# File: //opt/imunify360/venv/lib/python3.11/site-packages/im360/plugins/resident/fix_ip_address.py
from ipaddress import IPv4Network, IPv6Network, ip_network
from logging import getLogger
import aiodns
from defence360agent.contracts.messages import MessageType, Reject
from defence360agent.contracts.plugins import MessageSink, expect
# Module-level logger, named after this module.
logger = getLogger(__name__)
class FixIPAddress(MessageSink):
    """Normalize the ``attackers_ip`` field of incoming messages.

    Every handled message must contain an ``attackers_ip`` key.  Its value
    is converted to an ``ipaddress`` network object; IPv6 values are then
    replaced by a /64 network and the pre-shrink value is kept under
    ``attackers_ip_orig``.
    """

    PROCESSING_ORDER = MessageSink.ProcessingOrder.PRE_PROCESS_MESSAGE

    async def create_sink(self, loop):
        """Remember the event loop and create a DNS resolver bound to it."""
        self._loop = loop
        self._resolver = aiodns.DNSResolver(loop=loop)

    async def _validate_attackers_ip(self, message):
        """Ensure ``message["attackers_ip"]`` is an IPv4/IPv6 network object.

        A value that already is an ``IPv4Network``/``IPv6Network`` is left
        untouched; anything else is parsed with ``ip_network`` (strict mode,
        so a value with host bits set is rejected).

        Returns:
            The same ``message``, updated in place.

        Raises:
            Reject: if the value cannot be parsed as an IP network.
        """
        attackers_ip = message["attackers_ip"]
        if isinstance(attackers_ip, (IPv6Network, IPv4Network)):
            return message
        try:
            net = ip_network(attackers_ip)
        except ValueError as error:
            # Chain the parsing error so the reason stays visible in
            # tracebacks.
            raise Reject("Attackers IP is invalid") from error
        message["attackers_ip"] = net
        return message

    async def _shrink_ipv6_addr(self, message):
        """Replace an IPv6 ``attackers_ip`` with a /64 network.

        The /64 network is the one containing the network address of the
        current value.  The value before shrinking is stored in
        ``message["attackers_ip_orig"]``.  IPv4 values are not modified.

        Returns:
            The same ``message``, updated in place.
        """
        attackers_ip = message["attackers_ip"]
        if attackers_ip.version == 6:
            # NOTE(review): the /64 is built from ``network_address``
            # regardless of the input prefix length, so an input network
            # wider than /64 is narrowed to its first /64 -- confirm that
            # this is intended.
            shrunk = IPv6Network(
                (int(attackers_ip.network_address), 64),
                strict=False,  # the address may have bits set below /64
            )
            message["attackers_ip_orig"] = attackers_ip
            message["attackers_ip"] = shrunk
        return message

    async def _process_message_imp(self, message):
        """Validate and normalize ``attackers_ip`` of a single message.

        Returns:
            The message after validation and IPv6 shrinking.

        Raises:
            Reject: if ``attackers_ip`` is missing or invalid.
        """
        if "attackers_ip" not in message:
            raise Reject("Got message without attackers_ip")
        # Order matters: shrinking relies on the value already being a
        # network object.
        for step in (self._validate_attackers_ip, self._shrink_ipv6_addr):
            message = await step(message)
        return message

    @expect(
        MessageType.CaptchaEvent,
        MessageType.ClientUnblock,
        MessageType.SensorAlert,
    )
    async def process_one_message(self, message):
        """Normalize ``attackers_ip`` of a single-event message."""
        return await self._process_message_imp(message)

    @expect(MessageType.SensorIncidentList)
    async def process_incident_list(self, incident_list):
        """Normalize every incident of the list, dropping rejected ones.

        Returns:
            A new ``MessageType.SensorIncidentList`` built from the
            incidents that passed validation.

        Raises:
            Reject: if no incident of the list passed validation.
        """
        filtered_list = []
        for incident in incident_list.payload:
            try:
                processed = await self._process_message_imp(incident)
            except Reject as error:
                # Lazy logging arguments: the message is only formatted
                # when the record is actually emitted.
                logger.info(
                    "Rejected: %s -> SensorIncident:%s", error, incident
                )
            else:
                filtered_list.append(processed)
        if not filtered_list:
            raise Reject("SensorIncidentList will be skipped!")
        return MessageType.SensorIncidentList(filtered_list)