File: //opt/imunify360/venv/lib/python3.11/site-packages/im360/plugins/resident/db_auto_cleanup.py
"""
Automatically cleanups Incidents from db, if they're
contains in db more that config.IncidentLogging.NUM_DAYS
"""
from datetime import timedelta
from logging import getLogger
from humanize import naturaldelta
from defence360agent.contracts.plugins import MessageSink
from defence360agent.model.simplification import (
remove_old_and_truncate,
run_in_executor,
)
from defence360agent.utils import recurring_check
from im360.contracts import config as im360config
from im360.model.firewall import IPList
from im360.model.incident import Incident
logger = getLogger(__name__)
class DbCleanup(MessageSink):
GRAYLIST_EXPIRED_KEEP_NUM_DAYS = 3
GRAYLIST_CLEANUP_EXPIRED_PERIOD = int(timedelta(hours=1).total_seconds())
AUTO_WHITELIST_CHECK_DELAY = int(timedelta(hours=1).total_seconds())
async def create_sink(self, loop):
self._loop = loop
self._taskpool = (
self._loop.create_task(self._recurring_incidents_cleanup()),
self._loop.create_task(self._recurring_whitelist_delete_expired()),
self._loop.create_task(self._recurring_graylist_cleanup()),
)
async def shutdown(self):
for task in self._taskpool:
task.cancel()
await task
@recurring_check(im360config.IncidentLogging.FREQUENCY)
async def _recurring_incidents_cleanup(self):
await self._events_cleanup(
Incident,
im360config.IncidentLogging.NUM_DAYS,
im360config.IncidentLogging.LIMIT,
)
async def _events_cleanup(self, table, num_days, limit):
items_deleted = await run_in_executor(
self._loop,
lambda: remove_old_and_truncate(
table=table, num_days=num_days, max_count=limit
),
)
logger.info(
"Deleted %s records from table '%s' during auto cleanup",
items_deleted,
table._meta.table_name,
)
@recurring_check(AUTO_WHITELIST_CHECK_DELAY)
async def _recurring_whitelist_delete_expired(self):
await self._whitelist_delete_expired()
async def _whitelist_delete_expired(self):
deleted = await run_in_executor(
self._loop, lambda: IPList.delete_expired(IPList.WHITE)
)
logger.info("Deleted %s expired records from whitelist", deleted)
@recurring_check(GRAYLIST_CLEANUP_EXPIRED_PERIOD)
async def _recurring_graylist_cleanup(self):
await self._bglist_cleanup()
async def _bglist_cleanup(self):
items_deleted = await run_in_executor(
self._loop,
lambda: IPList.cleanup_expired_from_bglist(
num_days=self.GRAYLIST_EXPIRED_KEEP_NUM_DAYS
),
)
logger.info(
"Deleted %s expired graylisted/blacklisted IPs "
"(older than %s) during auto cleanup",
items_deleted,
naturaldelta(timedelta(days=self.GRAYLIST_EXPIRED_KEEP_NUM_DAYS)),
)