File: //opt/imunify360/venv/lib64/python3.11/site-packages/im360/plugins/resident/ignored_rules.py
import logging
import re
from defence360agent.contracts.messages import MessageType, Reject
from defence360agent.contracts.plugins import MessageSink, expect
from im360.model.incident import DisabledRule
from im360.subsys.shared_disabled_rules import DisabledRulesWatcher
logger = logging.getLogger(__name__)
class FilterIgnoredRules(MessageSink):
PROCESSING_ORDER = MessageSink.ProcessingOrder.IGNORE_MESSAGE
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__watcher = None
async def create_sink(self, loop):
self.__watcher = DisabledRulesWatcher(loop)
async def shutdown(self):
if self.__watcher is not None:
self.__watcher.close()
@expect(MessageType.SensorAlert, MessageType.SensorIncident)
async def filter(self, msg):
# filtering third-party rules known to be high FP
try:
if isinstance(msg, MessageType.SensorAlert):
self._reject_non_i360_modsec_rules(msg)
self._filter_user_configured(msg)
self.__filter_distributed_rules(msg)
except KeyError as e:
logger.warning("Not enough fields in %s: %s", msg, e)
def _reject_non_i360_modsec_rules(self, msg):
if msg["plugin_id"] == "modsec" and not is_i360_rule(msg["rule"]):
raise Reject("Non Imunify360 modsec rule is ignored")
def _filter_user_configured(self, msg):
if DisabledRule.is_rule_ignored(
msg["plugin_id"], msg["rule"], msg.get("host", None)
):
raise Reject("Rule ignored by user settings")
def __filter_distributed_rules(self, msg):
if self.__watcher.match(msg["plugin_id"], msg["rule"]):
raise Reject("Rule ignored by shared disabled rules")
def is_i360_rule(rule_id):
"""Whether the *rule_id* belongs to Imunify360 modsec ruleset."""
return re.fullmatch(r"333\d{2}|(?:77|88)\d{6}", rule_id)