File: //opt/imunify360/venv/lib/python3.11/site-packages/im360/subsys/shared_disabled_rules.py
import os
from asyncio import AbstractEventLoop, Event
from collections.abc import Callable
from logging import getLogger
from pathlib import Path

from defence360agent.utils import recurring_check
from imav.malwarelib.subsys.ainotify import Event as IEvent, Inotify, Watcher
# Module-level logger, named after this module.
log = getLogger(__name__)

# Plugin IDs accepted in the first field of a disabled-rule line.
_PLUGIN_NAMES = ("cphulk", "lfd", "modsec", "ossec")

# Rules file used whenever a caller does not pass an explicit path.
_DEFAULT_PATH = Path("/etc/imunify360/rules/disabled-rules")

# NOTE(review): not referenced by the code visible in this module;
# presumably a timeout in seconds -- confirm before changing or removing.
_WAIT_DIR_TIMEOUT = 10
class _RuleParsingError(Exception):
    """Signal that a line of the disabled-rules file is malformed."""
def _parse_rule(line: str) -> tuple[str, int]:
    """Extract the plugin ID and the numeric rule ID from one rule line.

    The line is split on ``:`` into exactly three fields.  The first field
    is stripped and lower-cased, then checked against ``_PLUGIN_NAMES``.
    The second field is handed to ``int()`` unchanged.  The third field is
    not inspected.

    Args:
        line: A single line of the rules file.

    Returns:
        A ``(plugin_id, rule_id)`` pair.

    Raises:
        _RuleParsingError: The line has no ``:``, has fewer than three
            fields, names an unknown plugin, or carries a rule ID that
            ``int()`` rejects.
    """
    if ":" not in line:
        raise _RuleParsingError("Delimiter ':' is not found in rule:")

    # At most two splits, so any further colons stay in the last field.
    parts = line.split(":", maxsplit=2)
    if len(parts) != 3:
        raise _RuleParsingError(
            f"Wrong amount of fields, 3 expected but {len(parts)} found:"
        )
    raw_plugin, raw_rule_id, _ = parts

    plugin = raw_plugin.strip().lower()
    if plugin not in _PLUGIN_NAMES:
        raise _RuleParsingError(f"Unknown plugin ID value '{plugin!s}':")

    try:
        return plugin, int(raw_rule_id)
    except ValueError as error:
        raise _RuleParsingError(
            f"Invalid rule ID value '{raw_rule_id!s}':"
        ) from error
def _load_rules(path: Path) -> dict[str, set[int]]:
    """Read the disabled-rules file and group the rule IDs by plugin.

    Blank lines are skipped.  A line that cannot be parsed is logged and
    skipped; it does not stop the remaining lines from being loaded.

    Args:
        path: Location of the rules file.

    Returns:
        A mapping of plugin ID to the set of rule IDs listed for it.  The
        mapping is empty when *path* is not a regular file, or when the
        file disappears before it can be opened.
    """
    rules_file = None
    if path.is_file():
        try:
            # errors="replace": an undecodable byte would otherwise raise
            # UnicodeDecodeError from the iteration below, outside the
            # per-line handler, and abort the whole load.
            rules_file = path.open(mode="rt", errors="replace")
        except FileNotFoundError:
            # The file was removed between the check above and the open.
            rules_file = None

    if rules_file is None:
        log.debug(
            "Config '%s' with shared disabled rules is not found.",
            path,
        )
        return {}

    result: dict[str, set[int]] = {}
    with rules_file:
        for line_no, raw_line in enumerate(rules_file, start=1):
            if not (line := raw_line.strip()):
                continue
            try:
                plugin_id, rule_id = _parse_rule(line)
            except _RuleParsingError as error:
                log.warning(
                    "%s:%d: %s.",
                    path,
                    line_no,
                    str(error),
                )
            except Exception:
                # Best effort: an unexpected failure on one line must not
                # discard the rules parsed from the other lines.
                log.exception("%s:%d", path, line_no)
            else:
                result.setdefault(plugin_id, set()).add(rule_id)
    return result
def get_shared_disabled_modsec_rules_ids(*, path: Path = None) -> set[int]:
    """Return the rule IDs listed for the ``modsec`` plugin.

    Args:
        path: Rules file to read; ``_DEFAULT_PATH`` is used when omitted.

    Returns:
        The set of ``modsec`` rule IDs, or an empty set when none are
        listed.
    """
    rules_path = _DEFAULT_PATH if not path else path
    rules_by_plugin = _load_rules(rules_path)
    return rules_by_plugin.get("modsec", set())
class DisabledRulesWatcher:
    """Keep an in-memory copy of the disabled rules in sync with the file.

    The rules are loaded once at construction.  The parent directory of the
    rules file is then watched for ``CLOSE_WRITE``, ``MOVED_TO`` and
    ``DELETE`` events; an event whose name equals the rules file name
    triggers a reload and, if given, a call to the change callback.

    If the parent directory does not exist at construction time, an error
    is logged, nothing is watched and the rule set stays empty.
    """

    def __init__(
        self,
        loop: AbstractEventLoop,
        *,
        path: Path = None,
        on_change_cb: Callable[..., None] = None,
    ):
        """Load the rules and start watching the rules file.

        Args:
            loop: Event loop handed to the watcher and used to create the
                reload task.
            path: Rules file to track; ``_DEFAULT_PATH`` when omitted.
            on_change_cb: Optional callable invoked without arguments after
                every reload.
        """
        self.__cb = on_change_cb
        self.__event = Event()
        self.__path = path or _DEFAULT_PATH
        # Event names are compared against this bytes value.  os.fsencode
        # yields the same bytes as an ASCII encode for ASCII names, but
        # does not raise UnicodeEncodeError for a non-ASCII file name.
        self.__name = os.fsencode(self.__path.name)
        self.__rules = {}
        self.__watcher = None
        self.__task = None
        self.__start(loop)

    def __start(self, loop: AbstractEventLoop):
        """Load the rules, register the directory watch, spawn the task."""
        if not (dir_path := self.__path.parent).is_dir():
            log.error(
                "Shared disabled rules directory '%s' is not exist.", dir_path
            )
            return
        self.__rules = _load_rules(self.__path)
        self.__watcher = Watcher(loop, coro_callback=self.__on_io_notify)
        self.__watcher.watch(
            # Filesystem encoding instead of ASCII, so a custom path with
            # non-ASCII components does not fail here.
            os.fsencode(dir_path),
            Inotify.CLOSE_WRITE | Inotify.MOVED_TO | Inotify.DELETE,
        )
        self.__task = loop.create_task(self.__process_events())

    async def __on_io_notify(self, io_event: IEvent):
        """Flag a pending reload when the event concerns the rules file."""
        # Squash many inotify events into one asyncio event.
        # This prevents the rules from being reloaded too often.
        if io_event.name == self.__name:
            self.__event.set()

    # NOTE(review): recurring_check(0) presumably re-invokes this coroutine
    # in a loop with no delay -- confirm against defence360agent.utils.
    @recurring_check(0)
    async def __process_events(self):
        """Wait for a change notification, then reload and notify."""
        try:
            await self.__event.wait()
        finally:
            # Clear even when the wait is interrupted, so a stale flag
            # cannot trigger a later reload.
            self.__event.clear()
        self.__rules = _load_rules(self.__path)
        if self.__cb is not None:
            self.__cb()

    def close(self):
        """Cancel the reload task and close the watcher, if started."""
        if self.__task is not None:
            self.__task.cancel()
        if self.__watcher is not None:
            self.__watcher.close()

    def match(self, plugin_id: str, rule_id: int) -> bool:
        """Return True if *rule_id* is listed as disabled for *plugin_id*."""
        return rule_id in self.__rules.get(plugin_id, set())

    def count(self) -> int:
        """Return the total number of loaded rule IDs across all plugins."""
        return sum(map(len, self.__rules.values()))