File: //opt/imunify360/venv/lib64/python3.11/site-packages/im360/plugins/sensor/modsec.py
#!/opt/imunify360/venv/bin/python3
import time
from collections import deque
from logging import getLogger
from defence360agent.contracts.messages import MessageType
from defence360agent.contracts.plugins import LogStreamReader
from defence360agent.subsys.panels.base import PanelException
from defence360agent.utils import RecurringCheckStop, recurring_check
from defence360agent.utils.common import MINUTE, rate_limit
from im360.subsys import modsec_audit_log
from im360.subsys.panels.hosting_panel import HostingPanel
# Identifier string for this plugin (not referenced elsewhere in this file;
# presumably consumed by the plugin machinery -- confirm).
PLUGIN_ID = "modsec"
# Module-level logger named after this module.
logger = getLogger(__name__)
# logger.warning wrapped by rate_limit with period=5 * MINUTE.
# NOTE(review): despite the "exception" in the name, this emits a WARNING
# record via logger.warning, not logger.exception.
# NOTE(review): presumably calls made within the period are dropped rather
# than queued -- confirm against defence360agent.utils.common.rate_limit.
throttled_log_exception = rate_limit(period=5 * MINUTE)(logger.warning)
class ModsecSensor(LogStreamReader):
    """Read the ModSecurity audit log line by line and emit sensor incidents.

    Each line obtained from the stream reader is fed to a
    ``modsec_audit_log`` parser; every incident the parser returns is
    stamped with the current time and a ``domain`` taken from the request's
    Host header, then forwarded to the sink as a
    ``MessageType.SensorIncident``.
    """

    # Parse errors collected since the last successful parse (or EOF); each
    # entry is a ``(context_bytes, exception)`` pair. Capped at 100 entries.
    # NOTE(review): these deques are class attributes, so they are shared by
    # every instance of this class -- confirm only one instance ever exists.
    error_buffer = deque(maxlen=100)
    # The most recent raw lines read (up to 10), attached to parse errors
    # as context.
    context_buffer = deque(maxlen=10)

    async def create_sensor(self, loop, sink):
        """Choose the audit log parser, then delegate to the base class.

        The parser depends on what the hosting panel reports:

        * no audit log path       -> no parser (``None``)
        * path, but no log dir    -> ``SerialLogParser``
        * path and a log dir      -> ``RevolverParser`` for that directory
        """
        log_file, log_dir = self._get_log_paths()
        self.source_file = log_file

        parser = None
        if log_file is not None:
            if log_dir is None:
                parser = modsec_audit_log.SerialLogParser()
            else:
                parser = modsec_audit_log.RevolverParser(
                    audit_logdir_path=log_dir
                )
        # NOTE(review): when the parser is None, _infinite_read_and_proceed
        # would fail on ``self._parser.feed``; presumably the base class
        # never starts reading without a source file -- confirm.
        self._parser = parser

        await super().create_sensor(loop, sink)

    def _get_log_paths(self):
        """Return ``(audit_log_path, audit_logdir_path)`` from the panel.

        An element is ``None`` when the corresponding panel call raises
        ``PanelException``; the failure is logged as a warning.
        """
        panel = HostingPanel()
        getters = (panel.get_audit_log_path, panel.get_audit_logdir_path)

        def _call_or_none(getter):
            # A panel failure is not fatal: report it and fall back to None.
            try:
                return getter()
            except PanelException as exc:
                logger.warning("%s failed: %s", getter, exc)
                return None

        return tuple(_call_or_none(getter) for getter in getters)

    def _report_buffered_parse_errors(self):
        """Log the accumulated parse errors (rate-limited) and drop them."""
        if not self.error_buffer:
            return
        throttled_log_exception(self.formatted_error_buffer())
        self.error_buffer.clear()

    @recurring_check(0)
    async def _infinite_read_and_proceed(self, stream_reader):
        """Read one line, parse it and forward any resulting incidents.

        Raises ``RecurringCheckStop`` when the stream reports end of file.
        Parse errors are buffered together with the recent raw lines and
        reported once parsing succeeds again or the stream ends.
        """
        try:
            line = await stream_reader.readline()
        except ValueError as exc:
            # If limit was reached but no full line was consumed
            logger.warning(exc)
            return

        if not line:
            # End of file: report whatever is pending, reset state and stop.
            self._report_buffered_parse_errors()
            self.context_buffer.clear()
            raise RecurringCheckStop()

        self.context_buffer.append(line)
        try:
            incidents = self._parser.feed(line)
        except (
            modsec_audit_log.ParseError,
            modsec_audit_log.MalformedFileError,
        ) as exc:
            # Keep the error with the raw lines that led up to it.
            self.error_buffer.append((b"".join(self.context_buffer), exc))
            return

        # Parsing succeeded, so report errors buffered by earlier lines.
        self._report_buffered_parse_errors()
        if not incidents:
            return

        # All incidents produced by the same line share one timestamp.
        now = time.time()
        for incident in incidents:
            incident["timestamp"] = now
            header_pairs = incident.get("advanced", {}).get("headers", [])
            headers = dict(header_pairs)
            incident["domain"] = headers.get("Host") or headers.get("host")
            message = MessageType.SensorIncident(incident)
            await self._sink.process_message(message)

    def formatted_error_buffer(self):
        """Return the buffered errors as text, one error per line."""
        return "\n".join(
            f"{error} in the context of {context}"
            for context, error in self.error_buffer
        )