File: //opt/imunify360/venv/lib64/python3.11/site-packages/im360/plugins/resident/aggregate.py
import asyncio
from collections import defaultdict
from logging import getLogger
from defence360agent.contracts.plugins import (
MessageSink,
MessageSource,
expect,
)
from defence360agent.utils import USER_IDENTITY_FIELD, recurring_check
from im360.contracts import config
from defence360agent.contracts.messages import MessageType
from im360.plugins.resident.persistent_storage import PersistentStorage
from im360.subsys.modsec_audit_log import ADVANCED, ENGINE_MODE, STATUS_CODE
# Module-level logger named after this module.
logger = getLogger(__name__)

# Keys identifying the kind of buffered incident; each one is mapped to an
# aggregate-settings class in AGGREGATE_SETTINGS below.
DEFAULT_INCIDENT = "INCIDENT"
# Used by Aggregate.accumulate for incidents that carry no "attackers_ip".
LOCAL_INCIDENT = "LOCAL_INCIDENT"
# Used by Aggregate.accumulate for local incidents that are neither severe
# enough nor tagged with one of PersistentStorage.IGNORED_INCIDENT_TAGS_TO_SAVE.
UNREPORTABLE_LOCAL_INCIDENT = "UNREPORTABLE_LOCAL_INCIDENT"
class IncidentAggregateSettings:
    """Describe how incidents are grouped and which fields are retained.

    ``RESULT_MESSAGE`` is the message class that wraps the aggregated list,
    ``_AGGREGATE_BY`` names the fields that form the grouping key and
    ``_FIELDS_TO_SAVE`` names the extra fields copied into the aggregate
    when they are present in an incident.
    """

    RESULT_MESSAGE = MessageType.SensorIncidentList

    # Incidents that agree on all of these fields are merged together.
    _AGGREGATE_BY = (
        "attackers_ip",
        "name",
        "plugin_id",
        "rule",
        "severity",
        "domain",
    )

    # Additional fields carried over to the aggregate (only if present).
    _FIELDS_TO_SAVE = (
        "message",
        "timestamp",
        # ModSecurity optional fields
        "inbound_anomality_score",
        "outbound_anomality_score",
        "vendor",
        "modsec_version",
        "tag",
        "access_denied",
        USER_IDENTITY_FIELD,
        ADVANCED,
        STATUS_CODE,
        ENGINE_MODE,
        "transaction_id",
    )

    @classmethod
    def extract_key(cls, msg):
        """Return the grouping key of *msg* as a tuple.

        The tuple holds the value of every ``_AGGREGATE_BY`` field, in
        declaration order; a field missing from *msg* contributes ``None``.
        """
        return tuple(map(msg.get, cls._AGGREGATE_BY))

    @classmethod
    def extract_fields(cls, msg):
        """Return the subset of *msg* that is kept in the aggregate.

        Every ``_AGGREGATE_BY`` field is always present in the result
        (``None`` when absent from *msg*); ``_FIELDS_TO_SAVE`` fields are
        included only when *msg* actually contains them.
        """
        kept = {}
        for name in cls._AGGREGATE_BY:
            kept[name] = msg.get(name)
        for name, value in msg.items():
            if name in cls._FIELDS_TO_SAVE:
                kept[name] = value
        return kept
class LocalIncidentAggregateSettings(IncidentAggregateSettings):
    """Aggregate settings whose result is a ``LocalIncidentList`` message.

    Grouping key and retained fields are inherited unchanged.
    """

    RESULT_MESSAGE = MessageType.LocalIncidentList
class UnreportableLocalIncidentAggregateSettings(IncidentAggregateSettings):
    """Aggregate settings whose result is an
    ``UnreportableLocalIncidentList`` message.

    Grouping key and retained fields are inherited unchanged.
    """

    RESULT_MESSAGE = MessageType.UnreportableLocalIncidentList
# Maps the key under which Aggregate buffers incidents to the settings class
# that controls how that buffer is aggregated and which list message type is
# emitted for it.
AGGREGATE_SETTINGS = {
    DEFAULT_INCIDENT: IncidentAggregateSettings,
    LOCAL_INCIDENT: LocalIncidentAggregateSettings,
    UNREPORTABLE_LOCAL_INCIDENT: UnreportableLocalIncidentAggregateSettings,
}
class Aggregate(MessageSink, MessageSource):
    """Buffer sensor incidents and periodically emit them as aggregated lists.

    Incoming ``SensorIncident`` messages are collected per incident kind.
    A recurring task merges the buffered incidents that share the same
    aggregation key and hands one list message per kind to the sink.
    """

    # We intentionally derive from MessageSink for the @expect magic to work.

    # Send the aggregated incident lists no more often than the
    # specified timeout (in seconds).
    AGGREGATE_TIMEOUT = 60

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Handle of the periodic sender task; set in create_source().
        self._timer = None

    async def create_sink(self, loop):
        """Remember the event loop and start with an empty message buffer."""
        self._loop = loop
        # Buffer key (incident kind) -> list of not yet aggregated messages.
        self._msg_buf = {}

    async def create_source(self, loop, sink):
        """Remember the target sink and start the periodic sender task.

        The task is created on the loop stored by create_sink(), so
        create_sink() has to be called first.
        """
        self._sink = sink
        self._timer = self._loop.create_task(self._send_by_timer())

    async def shutdown(self):
        """Cancel the periodic sender task, if any, and wait for it."""
        if self._timer is not None:
            # Reset the attribute first so a repeated call is a no-op.
            self._timer, t = None, self._timer
            t.cancel()
            # NOTE(review): awaiting a cancelled task re-raises
            # CancelledError unless the recurring_check wrapper swallows
            # it -- confirm against defence360agent.utils.
            await t

    @expect(MessageType.SensorIncident)
    async def accumulate(self, message):
        """Put *message* into the buffer of the matching incident kind.

        An incident without ``attackers_ip`` is treated as local: it is
        reportable when its severity reaches
        ``config.LocalIncidentReporting.MIN_SEVERITY`` or when it carries
        one of ``PersistentStorage.IGNORED_INCIDENT_TAGS_TO_SAVE``, and
        unreportable otherwise.  Any other message is buffered under its
        own ``method`` value.
        """
        if (
            isinstance(message, MessageType.SensorIncident)
            and message.get("attackers_ip") is None
        ):
            # If the local incident is not severe enough, it is not
            # reported ...
            severe = (
                message.get("severity", 0)
                >= config.LocalIncidentReporting.MIN_SEVERITY
            )
            # ... unless it carries one of the special tags (e.g. noshow),
            # which makes the incident reportable regardless of severity.
            reportable = severe or any(
                tag in PersistentStorage.IGNORED_INCIDENT_TAGS_TO_SAVE
                for tag in message.get("tag") or []
            )
            method = (
                LOCAL_INCIDENT if reportable else UNREPORTABLE_LOCAL_INCIDENT
            )
        else:
            method = message["method"]
        self._msg_buf.setdefault(method, []).append(message)

    @recurring_check(AGGREGATE_TIMEOUT)
    async def _send_by_timer(self):
        """Aggregate every non-empty buffer and pass the results to the sink.

        A buffer whose key has no entry in ``AGGREGATE_SETTINGS`` cannot be
        aggregated; it is logged and dropped so that it neither grows
        without bound nor prevents the remaining buffers from being sent.
        """
        tasks = []
        for msg_type, msgs in self._msg_buf.items():
            if not msgs:
                continue
            settings = AGGREGATE_SETTINGS.get(msg_type)
            if settings is None:
                logger.error(
                    "Dropping %d incident(s) of unknown type %r",
                    len(msgs),
                    msg_type,
                )
                msgs.clear()
                continue
            message = settings.RESULT_MESSAGE(self._aggregate(msg_type, msgs))
            # The buffer is emptied in place before the sink is awaited.
            msgs.clear()
            tasks.append(self._sink.process_message(message))
        if tasks:
            await asyncio.gather(*tasks)

    def _aggregate(self, msg_type, msgs):
        """Merge *msgs* that share an aggregation key.

        Returns a list with one dict per distinct key.  ``retries`` is the
        sum of the ``retries`` of the merged messages (a message without
        the field counts as 1); every other field holds the value taken
        from the last message merged into that key.
        """
        aggregate_dict = defaultdict(dict)
        AggregateSettings = AGGREGATE_SETTINGS[msg_type]
        for msg in msgs:
            key = AggregateSettings.extract_key(msg)
            aggregated = aggregate_dict[key]
            aggregated.setdefault("retries", 0)
            aggregated["retries"] += msg.get("retries", 1)
            aggregated.update(AggregateSettings.extract_fields(msg))
        return list(aggregate_dict.values())