File: //opt/imunify360/venv/lib/python3.11/site-packages/im360/plugins/resident/ttl_graylist.py
"""
Append timeout to graylist ip
"""
import time
from datetime import timedelta
from logging import getLogger
from peewee import DoesNotExist
from defence360agent.contracts.plugins import expect, MessageSink
from defence360agent.model.simplification import run_in_executor
from defence360agent.contracts.messages import MessageType
from im360.internals.core.ipset.ip import IPSetGray
from im360.model.firewall import IPList
logger = getLogger(__name__)
class GraylistTimeout(MessageSink):
PROCESSING_ORDER = MessageSink.ProcessingOrder.GRAYLIST_TIMEOUT
_TIMEOUTS = (
timedelta(minutes=5),
timedelta(minutes=30),
timedelta(hours=3),
timedelta(hours=12),
timedelta(days=3),
timedelta(days=15),
timedelta(
days=timedelta(seconds=IPSetGray.GRAYLIST_DEFAULT_TIMEOUT).days
),
# 24 days
)
async def create_sink(self, loop):
self._loop = loop
@expect(MessageType.SensorAlert)
async def append_timeout(self, message):
try:
deep = await run_in_executor(
self._loop,
lambda: IPList.get(
ip=message["attackers_ip"], listname=IPList.GRAY
).deep,
)
except DoesNotExist:
deep = None
message["properties"] = self.next_timeout(deep)
return message
def next_timeout(self, deep=None):
"""
Calculate next timeout
:param deep: previous deep - block level
:return:
"""
if deep is None:
deep = 0
else:
deep = min(deep + 1, len(self._TIMEOUTS) - 1)
ttl = int(self._TIMEOUTS[deep].total_seconds())
return {
# TTL for debug
"ttl": ttl,
# All modules should be use expiration time
"expiration": int(time.time() + ttl),
# Blocking level
"deep": deep,
}