File: //opt/imunify360/venv/lib/python3.11/site-packages/im360/plugins/resident/mod_sec_blocker.py
"""Generate alert on modsec incident if it triggers limits."""
import logging
import time
from collections import deque
from typing import Any, Optional
from defence360agent.contracts.plugins import (
MessageSink,
MessageSource,
expect,
)
from im360.contracts.config import (
ModsecBlockByCustomRules as CustomRulesConfig,
)
from im360.contracts.config import ModsecBlockBySeverity as SeverityConfig
from im360.contracts.config import ModsecSensor
from defence360agent.contracts.messages import MessageType
# Module-level logger named after this module (`__name__`).
logger = logging.getLogger(__name__)
class ModSecBlockBySeverity(MessageSink, MessageSource):
    """Escalate repeated ModSecurity incidents to alerts.

    Every `SensorIncident` coming from the ModSecurity sensor plugin is
    counted per grouping key.  Once a key collects enough events within the
    configured period, a `SensorAlert` built from the triggering incident is
    pushed to the sink.
    """

    async def create_sink(self, loop):
        """Store the event loop and start with an empty event history."""
        self._loop = loop
        # Dictionary whose keys are grouping tuples (built in
        # `_process_incident`) and whose values are double-ended queues
        # holding the `time.monotonic()` timestamps of recent events.
        # NOTE(review): an entry is removed only when its alert fires, so keys
        # that never reach their limit stay here for the plugin's lifetime --
        # consider pruning stale entries.
        self.events = {}

    async def create_source(self, loop, sink):
        """Remember the sink that generated alerts are sent to."""
        self._sink = sink

    def _is_over_limit(self, keys: Any, limit: int, period: int) -> bool:
        """Record one event for `keys` and tell whether the limit is reached.

        Returns True when the last `limit` events recorded for `keys`
        (including this one) all happened within `period` seconds; the
        history of `keys` is then dropped so counting starts afresh.
        Returns False otherwise.

        A `limit` that is None or lower than 1 cannot describe a window of
        events; it is reported with a warning and never triggers.
        """
        if limit is None or limit < 1:
            # Previously: IndexError for 0, ValueError for negatives and an
            # ever-growing deque for None.
            logger.warning(
                "Ignoring invalid events limit %r for %r", limit, keys
            )
            return False

        timestamps = self.events.get(keys)
        if timestamps is None or timestamps.maxlen != limit:
            # `maxlen` is fixed at construction time, so a deque created
            # under a different limit must be rebuilt; otherwise its length
            # may never equal the new `limit` and the key would stop
            # alerting.  `deque(iterable, maxlen)` keeps the newest items.
            timestamps = deque(timestamps or (), maxlen=limit)
            self.events[keys] = timestamps

        # The bounded deque discards the oldest timestamp by itself once
        # full, so it always holds at most the `limit` most recent events.
        timestamps.append(time.monotonic())

        if len(timestamps) < limit:
            return False
        # The window is full: compare the newest event with the oldest kept.
        if timestamps[-1] - timestamps[0] > period:
            return False

        del self.events[keys]
        return True

    @expect(MessageType.SensorIncident, plugin_id=ModsecSensor.PLUGIN_ID)
    async def check_incident(self, message):
        """Send a `SensorAlert` to the sink if `message` trips a limit."""
        reason = self._process_incident(message)
        if reason:
            logger.info(
                "Creating alert from %s with reason: %s", message, reason
            )
            await self._sink.process_message(
                MessageType.SensorAlert.from_incident(message)
            )

    def _process_incident(
        self, message: MessageType.SensorIncident
    ) -> Optional[str]:
        """Process an incident, update limits and return a string describing a
        reason to create alert from message.

        Exactly one of the checks below is applied, in this order of
        precedence: access-denied counting, custom per-rule limits,
        severity-based limits.

        If no alert should be created returns None."""
        ip = message.get("attackers_ip")
        # A missing rule becomes the string "None" and is grouped as such.
        rule = str(message.get("rule"))
        severity = message.get("severity")
        access_denied = message.get("access_denied")

        if access_denied and SeverityConfig.ENABLED:
            if self._is_over_limit(
                (ip, access_denied),
                SeverityConfig.DENIED_NUM_LIMIT,
                SeverityConfig.CHECK_PERIOD,
            ):
                return "multiple access denied triggered"
        elif CustomRulesConfig.RULES and rule in CustomRulesConfig.RULES:
            if self._is_over_limit(
                (ip, rule),
                CustomRulesConfig.get_limit(rule),
                CustomRulesConfig.get_timeout(rule),
            ):
                return "custom rule {} triggered".format(rule)
        elif (
            SeverityConfig.ENABLED
            # NOTE(review): the truthiness test also skips severity 0 --
            # confirm whether that value can occur and should be counted.
            and severity
            and severity <= SeverityConfig.SEVERITY_LIMIT
        ):
            if self._is_over_limit(
                (ip, rule),
                SeverityConfig.MAX_REPETITION,
                SeverityConfig.CHECK_PERIOD,
            ):
                return "severity rule {} triggered".format(rule)
        return None