# File: //opt/imunify360/venv/lib64/python3.11/site-packages/im360/plugins/sensor/ossec.py
import json
import time
from logging import getLogger
from defence360agent.contracts.plugins import LogStreamReader
from defence360agent.utils import RecurringCheckStop, recurring_check
from im360.contracts.config import Webshield
from defence360agent.contracts.messages import MessageType
from im360.internals.strategy import Strategy
# Module-level logger, named after this module.
logger = getLogger(__name__)
# Severity threshold: JsonAlertReader._process_line ignores incidents whose
# severity is below this value. Captcha-shown events are dispatched before
# the threshold is checked, so they are forwarded regardless of severity.
_MIN_SEVERITY = 3
class JsonAlertReader(LogStreamReader):
    """Sensor plugin that turns OSSEC JSON alerts into agent messages.

    Every line read from the alerts log is expected to hold one JSON-encoded
    alert.  An alert becomes a ``MessageType.SensorIncident`` -- or a
    ``MessageType.CaptchaEvent`` when its rule id equals
    ``Webshield.SPLASH_CAPTCHA_SHOWN_LOG_ENTRY_RULE`` -- and is handed to
    ``self._sink``.
    """

    # NOTE(review): neither attribute is read inside this class; presumably
    # the LogStreamReader base / plugin framework consumes them -- confirm.
    STRATEGY = Strategy.PRIMARY_IDS_STRATEGY
    source_file = "/var/ossec/logs/alerts/alerts.json"

    @recurring_check(0)
    async def _infinite_read_and_proceed(self, stream_reader):
        """Read a single line from *stream_reader* and process it.

        :param stream_reader: object exposing an awaitable ``readline()``
            that returns bytes
        :raises RecurringCheckStop: when ``readline()`` returns nothing
            (presumably the signal for the ``recurring_check`` wrapper to
            stop re-invoking this coroutine -- confirm in
            ``defence360agent.utils``)
        """
        bytes_ = await stream_reader.readline()
        if not bytes_:
            raise RecurringCheckStop()
        # "surrogateescape" keeps undecodable bytes instead of raising, so a
        # line with invalid UTF-8 still reaches the parser.
        await self._process_line(bytes_.decode(errors="surrogateescape"))

    # Example of an ossec alert that an incident is extracted from:
    # {
    #   "rule":{
    #     "level":5,
    #     "group":"syslog,sshd,invalid_login,authentication_failed,",
    #     "comment":"Attempt to login using a non-existent user",
    #     "sidid":5710
    #   },
    #   "srcip":"10.101.1.18",
    #   "location":"/var/log/secure",
    #   "full_log":"May 3 08:50:35 i360-test-agent-el7-1 sshd[7597]:
    #               Failed password for invalid user admin
    #               from 10.101.1.18 port 45318 ssh2"
    # }

    def _process_tag(self, comment):
        """Split a rule comment into its leading tag list and the event name.

        Example: ``"[MODSEC,noshow] Permission denied error"`` yields
        ``(["MODSEC", "noshow"], "Permission denied error")``.

        :param comment: value of ``data["rule"]["comment"]``
        :return: ``(tags, name)``; when the comment does not start with a
            complete ``[...]`` group the result is ``([], comment)`` with
            the comment left untouched
        """
        # Tags only count when the bracket group is at the very head.
        if not comment.startswith("["):
            return [], comment
        raw_tags, closing, event_name = comment[1:].partition("]")
        if not closing:
            # Opening bracket without a closing one: not a tag group.
            return [], comment
        # Drop empty entries such as the one produced by a trailing comma.
        event_tags = [tag for t in raw_tags.split(",") if (tag := t.strip())]
        return event_tags, event_name.strip()

    async def _process_line(self, line):
        """Parse one alert line and forward the resulting message to the sink.

        Lines that are not valid JSON, or whose JSON does not have the
        expected structure (missing keys, non-object values, a non-string
        comment, a non-numeric level), are logged as broken and skipped.
        Alerts with severity below ``_MIN_SEVERITY`` are dropped, except
        captcha-shown events, which are always forwarded.

        :param line: one decoded line of the alerts log
        """
        try:
            data = json.loads(line)
            logger.debug("Message from ossec: %s", data)
            rule = data["rule"]
            event_tags, event_name = self._process_tag(rule["comment"])
            inc = MessageType.SensorIncident(
                plugin_id="ossec",
                rule=rule["sidid"],
                attackers_ip=data.get("srcip", None),
                timestamp=time.time(),
                message=data["full_log"],
                severity=rule["level"],
                name=event_name,
                tag=event_tags,
            )
            if inc["rule"] == Webshield.SPLASH_CAPTCHA_SHOWN_LOG_ENTRY_RULE:
                inc = MessageType.CaptchaEvent(inc)
                inc["method"] = MessageType.CaptchaEvent.DEFAULT_METHOD
                inc["event"] = MessageType.CaptchaEvent.REQUESTED
                inc["is_ajax"] = False
            elif inc.get("severity", 0) < _MIN_SEVERITY:
                logger.debug(
                    "Incident with severity below %s is ignored",
                    _MIN_SEVERITY,
                )
                inc = None
        except (json.JSONDecodeError, KeyError, TypeError, AttributeError):
            # TypeError / AttributeError cover input that is valid JSON but
            # not shaped like an alert (e.g. `null`, a list, a string where
            # an object is expected, a null level).
            logger.warning("Broken message in ossec json log: %s", line)
        else:
            # Dispatch outside the try so that errors raised by the sink are
            # not misreported as a broken log line.
            if inc is not None:
                await self._sink.process_message(inc)