MOON
Server: Apache
System: Linux server30c.hostingraja.org 3.10.0-962.3.2.lve1.5.63.el7.x86_64 #1 SMP Fri Oct 8 12:03:35 UTC 2021 x86_64
User: jibhires (1887)
PHP: 8.1.30
Disabled: show_source, system, shell_exec, passthru, exec, popen, proc_open, allow_url_fopen, symlink, escapeshellcmd, pcntl_exec
Upload Files
File: //opt/imunify360/venv/lib/python3.11/site-packages/im360/internals/core/ipset/__init__.py
import abc
from collections import namedtuple
from typing import Dict, FrozenSet, Iterable, List, Set

from im360.utils.validate import IP, IPVersion

from .base import IPSetAtomicRestoreBase
from .libipset import IPSetRestoreCmd

IP_SET_PREFIX = "i360"


IPSetCount = namedtuple("IPSetCount", ["name", "db_count", "ipset_count"])


def get_ipset_family(ip_version: IPVersion):
    assert ip_version in (
        IP.V4,
        IP.V6,
    ), f"ip version {ip_version} is incorrect"
    return "inet6" if ip_version == IP.V6 else "inet"


class IPSetCollectionResetMixin(abc.ABC):
    @abc.abstractmethod
    def get_all_ipset_instances(
        self, ip_version: IPVersion
    ) -> List[IPSetAtomicRestoreBase]:
        pass

    async def reset(self, ip_version: IPVersion, existing: Set[str]):
        for ip_set in self.get_all_ipset_instances(ip_version):
            if ip_set.gen_ipset_name_for_ip_version(ip_version) in existing:
                await ip_set.reset(ip_version)


class AbstractIPSet(IPSetCollectionResetMixin, abc.ABC):
    """Entity to manage a specific slice of iptables rules & ipsets.

    See ..RuleSet
    """

    @abc.abstractmethod
    def get_all_ipsets(self, ip_version: IPVersion) -> FrozenSet[str]:
        pass

    @abc.abstractmethod
    def get_rules(self, ip_version: IPVersion, **kwargs) -> Iterable[dict]:
        pass

    @abc.abstractmethod
    async def restore(self, ip_version: IPVersion) -> None:
        pass

    @abc.abstractmethod
    def gen_ipset_create_ops(self, ip_version: IPVersion) -> List[str]:
        pass

    def gen_ipset_destroy_ops(
        self, ip_version: IPVersion, existing: Set[str]
    ) -> Dict[str, IPSetRestoreCmd]:
        """Generate specific destroy `ipset restore` commands.

        Return None if no preference.
        """
        return {}

    @abc.abstractmethod
    async def get_ipsets_count(
        self, ip_version: IPVersion
    ) -> List[IPSetCount]:  # pragma: no cover
        pass