File: //opt/imunify360/venv/lib64/python3.11/site-packages/im360/plugins/resident/captcha_dos_detect.py
import ipaddress
import logging
import time
from collections import defaultdict
from typing import Union
from humanize import naturaldelta
from defence360agent.contracts import plugins
from defence360agent.contracts.messages import MessageType
from defence360agent.model.simplification import run_in_executor
from im360.contracts.config import CaptchaDOS as Config
from im360.model.firewall import IPList
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class CaptchaDosDetect(plugins.MessageSink, plugins.MessageSource):
    """
    Detect captcha request floods per IP and emit CaptchaDosAlert messages.

    Timestamps of non-AJAX ``CaptchaEvent.REQUESTED`` messages are recorded
    per attacker IP.  When a single IP has more than ``MAX_COUNT`` requests
    inside the trailing ``TIME_FRAME`` window, a ``CaptchaDosAlert`` is sent
    to the sink and the history collected for that IP is dropped.
    """

    def __init__(self):
        # ip -> list of event timestamps, in the order they were received
        self._events = defaultdict(list)
        self._init_config()

    async def create_sink(self, loop):
        """Remember the event loop this plugin runs on."""
        self._loop = loop

    async def create_source(self, loop, sink):
        """Remember the event loop and the sink that alerts are sent to."""
        self._loop = loop
        self._sink = sink

    def _init_config(self):
        """Take a snapshot of the current CaptchaDOS settings."""
        self._enabled = Config.ENABLED
        self._time_frame = Config.TIME_FRAME
        self._timeout = Config.TIMEOUT
        self._max_count = Config.MAX_COUNT

    @plugins.expect(MessageType.ConfigUpdate)
    async def update_config(self, _):
        """Re-read the settings whenever a ConfigUpdate message arrives."""
        self._init_config()

    @plugins.expect(
        MessageType.CaptchaEvent, event=MessageType.CaptchaEvent.REQUESTED
    )
    async def process_captcha_event(self, message):
        """
        Record a captcha request and raise an alert on too many requests.

        AJAX requests are ignored, and nothing is done while the feature is
        disabled.

        :param message: captcha event; "attackers_ip" and "timestamp" are
            read, "is_ajax" is optional
        """
        if message.get("is_ajax") or not self._enabled:
            return

        ip = message["attackers_ip"]
        ip_events = self._events[ip]
        ip_events.append(message["timestamp"])

        # start of the sliding window that ends at the current event
        t0 = message["timestamp"] - self._time_frame
        timeframe_hits = [ts for ts in ip_events if t0 <= ts]
        retries = len(timeframe_hits)
        if retries > self._max_count:
            ttl = self._timeout
            expiration = await self.get_expiration(ip, ttl)
            # read the clock once so that the alert timestamp and the
            # human readable duration are based on the same instant
            now = time.time()
            captcha_dos_alert = MessageType.CaptchaDosAlert(
                attackers_ip=ip,
                expiration=expiration,
                ttl=ttl,
                retries=retries,
                timestamp=now,
                message=(
                    "Blacklisted for {} after {} captcha requests"
                ).format(naturaldelta(expiration - int(now)), retries),
            )
            await self._sink.process_message(captcha_dos_alert)
            # The entry may already be gone if another handler ran while
            # this coroutine was suspended in the awaits above, so do not
            # fail with KeyError after the alert has been sent.
            self._events.pop(ip, None)
        else:
            # make sure to keep only interesting items
            self._events[ip] = timeframe_hits

        self._cleanup_expired(t0)

    def _cleanup_expired(self, ts):
        """
        Remove IP addresses whose most recently recorded event is older
        than ts.  Entries without any recorded event are removed as well.
        """
        expired_ips = [
            ip
            for ip, timestamps in self._events.items()
            # an empty list has no last element: treat it as expired
            # instead of raising IndexError
            if not timestamps or timestamps[-1] < ts
        ]
        for ip in expired_ips:
            del self._events[ip]

    async def get_expiration(
        self, ip: Union[ipaddress.IPv4Network, ipaddress.IPv6Network], ttl: int
    ):
        """
        Generate new graylist expiration based on calculated dos expiration

        The closest gray list entry for the ip is looked up in an executor;
        if one exists, the later of its expiration and now + ttl is used.

        :param ip_network ip: address to look up in the gray list
        :param int ttl: default time to block for, in seconds
        :return: int expiration: new expiration for ip items in gray list
        """
        dos_expiration = int(time.time()) + ttl
        subnet = await run_in_executor(
            self._loop,
            lambda: IPList.find_closest_ip_nets(
                ip, listname=[IPList.GRAY], limit=1
            ),
        )
        if not subnet:
            return dos_expiration
        return max(dos_expiration, subnet[0].expiration)