File: //opt/imunify360/venv/lib/python3.11/site-packages/im360/plugins/sensor/dos_detector.py
import collections
import datetime
import ipaddress
import logging
from typing import Iterable, Tuple
import psutil
from defence360agent.contracts import plugins
from defence360agent.utils import recurring_check
from im360.contracts.config import DOS as DOS_config
from defence360agent.contracts.messages import MessageType
from im360.contracts.plugins import IDSAwareMessageSink
from im360.internals import strategy
from im360.utils import net
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class DOSSensor(plugins.Sensor, IDSAwareMessageSink):
    """
    Sensor reporting remote IPs that hold too many established connections.

    On activation a background task is created from
    ``recurring_check(DOS_config.INTERVAL)`` wrapped around
    :meth:`_check_connections`. Each check counts established connections
    per (remote IP, local port) pair and, for every pair above its limit,
    hands ``SensorAlert`` and ``SensorIncident`` messages to the sink.
    """

    STRATEGY = strategy.Strategy.PRIMARY_IDS_STRATEGY

    async def create_sensor(self, loop, sink):
        """Remember the event loop and the sink that receives messages."""
        self._loop = loop
        self._sink = sink

    async def create_sink(self, loop):
        """Remember the event loop and reset the background task handle."""
        self._loop = loop
        self._task = None

    async def activate(self):
        """Schedule the recurring connection check, then activate parents."""
        coro = recurring_check(DOS_config.INTERVAL)(self._check_connections)
        self._task = self._loop.create_task(coro())
        await super().activate()

    async def deactivate(self):
        """Stop the recurring connection check, then deactivate parents."""
        await self._cond_task_cancel()
        await super().deactivate()

    async def _cond_task_cancel(self):
        """Cancel the background task, if any, and wait until it finishes."""
        if self._task is not None:
            self._task.cancel()
            await self._task
            self._task = None

    @staticmethod
    def list_dos_ips() -> Iterable[Tuple[str, int, int]]:
        """
        Provides list of IPs that have more than allowed simultaneous
        connections to the server.

        Connections are counted per (remote IP, local port) pair. The limit
        for a port is looked up in ``DOS_config.PER_PORT`` by the port number
        as a string, falling back to ``DOS_config.DEFAULT_LIMIT``.

        :return: iterator over tuples (IP, local port, connections)
        """

        def non_local_established_connections():
            # One (remote IP, local port) item per established connection
            # whose remote address is not among the local addresses.
            local_ips = set(net.local_ip_addresses())
            for conn in psutil.net_connections():
                if conn.status == psutil.CONN_ESTABLISHED:
                    ip_addr = ipaddress.ip_address(conn.raddr.ip)
                    if getattr(ip_addr, "ipv4_mapped", None):
                        # IPv4-mapped IPv6 addresses
                        ip_addr = ip_addr.ipv4_mapped
                    if ip_addr not in local_ips:
                        yield str(ip_addr), conn.laddr.port

        connections = collections.Counter(non_local_established_connections())
        for (remote_ip, port), count in connections.items():
            per_port = DOS_config.PER_PORT.get(
                str(port), DOS_config.DEFAULT_LIMIT
            )
            # Strictly greater: exactly hitting the limit is still allowed.
            if count > per_port:
                logger.debug(
                    "DOS was discovered from ip %s on port %d"
                    " with %d connections",
                    remote_ip,
                    port,
                    count,
                )
                yield remote_ip, port, count

    async def shutdown(self):
        """Stop the recurring connection check."""
        await self._cond_task_cancel()

    async def _check_connections(self):
        """
        Run one detection pass and forward the findings to the sink.

        Does nothing when ``DOS_config.ENABLED`` is false. Otherwise, for
        every offender from :meth:`list_dos_ips`, one message dict is built
        and wrapped in both ``MessageType.SensorAlert`` and
        ``MessageType.SensorIncident``.
        """
        if not DOS_config.ENABLED:
            return
        logger.debug("Checking for a DoS connections")
        for ip, port, num_connections in self.list_dos_ips():
            message = self.generate_message(ip, port, num_connections)
            for msg_to_sink in (
                MessageType.SensorAlert,
                MessageType.SensorIncident,
            ):
                # Delivery is scheduled as a separate task and not awaited
                # here, so this pass does not wait for the sink.
                self._loop.create_task(
                    self._sink.process_message(msg_to_sink(message))
                )

    @staticmethod
    def generate_message(ip, port, num_connections):
        """
        Build the message payload describing one detected offender.

        :param ip: remote IP address, stored under ``attackers_ip``
        :param port: local port the connections were made to
        :param num_connections: number of open connections counted
        :return: dict with the ``cl_dos`` plugin id, the current timestamp,
            the given values and a human-readable ``message`` text
        """
        # Naive local time; used both for the numeric timestamp and for the
        # date prefix of the text message.
        today = datetime.datetime.now()
        return {
            "plugin_id": "cl_dos",
            "rule": None,
            "timestamp": today.timestamp(),
            "attackers_ip": ip,
            "connections": num_connections,
            "name": "DOS detection",
            "port": port,
            "message": (
                "{} Denial of Service attack was discovered "
                "from {}, on port {}. Open connections: {}".format(
                    # NOTE: "%-d" is a platform-dependent strftime directive
                    # (day of month without zero padding on glibc).
                    today.strftime("%b %-d %H:%M:%S"),
                    ip,
                    port,
                    num_connections,
                )
            ),
        }