File: //opt/imunify360/venv/lib/python3.11/site-packages/im360/plugins/resident/persistent_storage.py
"""Update agent's db on receiving message & trigger other changes if necessary.
"""
import logging
from defence360agent.contracts.messages import Reject
from defence360agent.contracts.plugins import (
MessageSink,
MessageSource,
expect,
)
from defence360agent.model.simplification import run_in_executor
from defence360agent.contracts.messages import MessageType
from im360.internals import geo
from im360.model.firewall import IPList, is_expired
from im360.model.incident import Incident
from im360.utils.validate import IP
logger = logging.getLogger(__name__)
class PersistentStorage(MessageSink, MessageSource):
IGNORED_INCIDENT_TAGS_TO_SAVE = ("noshow",)
async def create_sink(self, loop):
self._loop = loop
async def create_source(self, loop, sink):
self._loop = loop
self._sink = sink
@staticmethod
def _filter_out(incidents):
"""
yield incidents that should be stored
into local db
"""
for inc in incidents:
# filter out incidents with 'noshow' tag
if any(
tag in PersistentStorage.IGNORED_INCIDENT_TAGS_TO_SAVE
for tag in inc.get("tag", [])
):
continue
# filter out incidents from whitelisted IP addresses
if inc.get("ip_whitelisted", False):
continue
yield inc
@expect(MessageType.SensorIncidentList)
@expect(MessageType.LocalIncidentList)
@expect(MessageType.UnreportableLocalIncidentList)
async def record_incident(self, message):
logger.debug(
'record_incident: "%s"',
message.get("description", "<no decription>"),
)
incident_list = []
is_local = isinstance(
message,
(
MessageType.UnreportableLocalIncidentList,
MessageType.LocalIncidentList,
),
)
with geo.reader() as geo_reader:
for incident in self._filter_out(message.list):
if is_local:
ip = None
country_code = None
else:
ip = IP.ip_net_to_string(incident.get("attackers_ip"))
country_code = geo_reader.get_id(ip)
incident_list.append(
{
"plugin": incident["plugin_id"],
"rule": incident["rule"],
"timestamp": incident["timestamp"],
"retries": incident["retries"],
"severity": incident["severity"],
"name": incident["name"],
"description": incident["message"],
"abuser": ip,
"country": country_code,
"domain": incident.get("domain"),
}
)
await run_in_executor(
self._loop, lambda: Incident.save_incident_list(incident_list)
)
@expect(MessageType.CaptchaDosAlert)
async def record_captcha_dos_alert(self, message):
"""Update iplists db tables on CaptchaDosAlert.
& Send BlockUnblockList, to update ipsets/webshield.
"""
if is_expired(message["expiration"]):
raise Reject("Expired CaptchaDos alert")
lists_or_error = await run_in_executor(
self._loop,
lambda: IPList.blacklist_graylisted_on_captcha_dos_alert(
ip=message["attackers_ip"],
expiration=message["expiration"],
comment=message["message"],
),
)
if isinstance(lists_or_error, Exception):
raise lists_or_error
await self._sink.process_message(
MessageType.BlockUnblockList(lists_or_error)
)