File: //opt/imunify360/venv/lib/python3.11/site-packages/im360/plugins/resident/shared_disabled_rules.py
from asyncio import Event
from logging import getLogger
from defence360agent.contracts.plugins import MessageSource
from defence360agent.model.simplification import run_in_executor
from defence360agent.subsys import web_server
from defence360agent.utils import recurring_check
from im360.model.incident import DisabledRule
from im360.subsys.panels.hosting_panel import HostingPanel
from im360.subsys.shared_disabled_rules import DisabledRulesWatcher
# Module-level logger, named after this module via __name__.
log = getLogger(__name__)
class SharedDisabledRules(MessageSource):
    """Message source that re-applies globally disabled rules on change.

    A ``DisabledRulesWatcher`` is given this object's event ``set`` method
    as its ``on_change_cb``. A background task waits on that event, then
    reloads the globally disabled "modsec" rules, passes them to the
    hosting panel and requests a graceful web server restart.
    """

    def __init__(self, *args, **kwargs):
        """Initialise empty state; nothing runs until ``create_source``."""
        super().__init__(*args, **kwargs)
        # Populated by create_source().
        self.__task = None
        self.__watcher = None
        self.__loop = None
        # Set through the watcher callback, awaited by the update task.
        self.__event = Event()
        self.__panel = HostingPanel()

    async def create_source(self, loop, unused_sink):
        """Create the watcher and schedule the update task on *loop*.

        Returns immediately when the update task already exists.
        *unused_sink* is accepted but never read.
        """
        already_started = self.__task is not None
        if already_started:
            return
        self.__loop = loop
        notify_changed = self.__event.set
        self.__watcher = DisabledRulesWatcher(
            loop,
            on_change_cb=notify_changed,
        )
        update_coro = self.__update_rules()
        self.__task = loop.create_task(update_coro)

    async def shutdown(self):
        """Close the watcher and cancel the update task, if present."""
        watcher = self.__watcher
        if watcher is not None:
            watcher.close()
        task = self.__task
        if task is not None:
            task.cancel()

    @recurring_check(0)
    async def __update_rules(self):
        """Wait for a change notification, then apply the disabled rules.

        NOTE(review): presumably ``recurring_check(0)`` re-invokes this
        coroutine repeatedly -- confirm against its implementation.
        """
        changed = self.__event
        try:
            await changed.wait()
        finally:
            # Reset the flag even when the wait is interrupted, so a later
            # wait blocks until the event is set again.
            changed.clear()

        def load_rules():
            return DisabledRule.get_global_disabled("modsec")

        rules = await run_in_executor(self.__loop, load_rules)
        await self.__panel.sync_global_disabled_rules(rules)
        await web_server.graceful_restart()
        # WARN: This log is for rpm-tests, do not remove it!
        loaded_count = self.__watcher.count()
        log.info("%d shared disabled rules loaded.", loaded_count)