File: //opt/imunify360/venv/lib/python3.11/site-packages/im360/plugins/sensor/search_bots.py
import ipaddress
import logging
import time
import aiodns
from defence360agent.contracts.messages import MessageType, Reject
from defence360agent.contracts.plugins import MessageSink, expect
from defence360agent.model.simplification import run_in_executor
from im360.api.ips import IPApi
from im360.ioc import services
from im360.model.firewall import IPList, WhitelistedCrawlerDomain
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class WhitelistSearchBots(MessageSink):
    """Protects search bots.

    This plugin processes incoming SensorAlert messages. To verify if a
    SensorAlert is about a whitelisted search engine, it does the following:

    * extracts attacker's IP address;
    * performs reverse DNS (PTR) lookup on it;
    * checks that the resulting name belongs to one of the whitelisted
      crawler domains;
    * performs forward DNS (A / AAAA) lookup on the resulting name;
    * if one of the resulting IP addresses matches attacker's, it is a
      search bot: the IP is added to the white list with an expiration
      time and the SensorAlert is rejected (no action taken on it).
    """

    PROCESSING_ORDER = MessageSink.ProcessingOrder.IGNORE_MESSAGE
    # How long (in seconds) a verified crawler IP stays in the white list.
    WHITELIST_TIMEOUT = 24 * 3600
    # DNS record type used for the forward lookup, keyed by address class.
    _FORWARD_QUERY_TYPES = {
        ipaddress.IPv4Address: "A",
        ipaddress.IPv6Address: "AAAA",
    }
    _whitelist_cache = services.primary_whitelist_cache
    # Class-level defaults so that shutdown() is safe even when
    # create_sink() has never been called.
    _loop = None
    _resolver = None

    async def create_sink(self, loop):
        """Remember the event loop and create the DNS resolver bound to it."""
        self._loop = loop
        self._resolver = aiodns.DNSResolver(loop=loop)

    async def shutdown(self):
        """Cancel the resolver (if one was created) and drop it."""
        if self._resolver is not None:
            self._resolver.cancel()
            self._resolver = None

    @staticmethod
    def _is_crawler_hostname(hostname, domains):
        """Tell whether *hostname* belongs to one of *domains*.

        The comparison is case-insensitive and ignores a trailing root dot.
        An entry written with a leading dot (".example.com") matches any
        name ending with it, exactly as a plain suffix test would.  An
        entry without a leading dot ("example.com") matches only the
        domain itself or its subdomains, so a look-alike name such as
        "evilexample.com" is NOT accepted.  Empty entries are ignored:
        an empty suffix would otherwise match every hostname.
        """
        name = (hostname or "").rstrip(".").lower()
        if not name:
            return False
        for domain in domains:
            suffix = (domain or "").strip().rstrip(".").lower()
            if not suffix:
                continue
            if suffix.startswith("."):
                if name.endswith(suffix):
                    return True
            elif name == suffix or name.endswith("." + suffix):
                return True
        return False

    @staticmethod
    def _resolves_to(records, host):
        """Tell whether any forward DNS record in *records* points at *host*.

        Addresses are compared as parsed ``ipaddress`` objects so that
        differences in textual formatting do not cause a false mismatch;
        values that cannot be parsed fall back to a string comparison.
        """
        expected = str(host)
        for record in records:
            try:
                if ipaddress.ip_address(record.host) == host:
                    return True
            except ValueError:
                if record.host == expected:
                    return True
        return False

    async def _whitelist_with_timeout(self, ip, domain):
        """Add *ip* to the white list for WHITELIST_TIMEOUT seconds.

        Nothing is done when the whitelist cache already contains *ip*.
        *domain* is only used to build the comment of the new entry.
        """
        if not await self._whitelist_cache.contains(ip):
            expiration = int(self.WHITELIST_TIMEOUT + time.time())
            comment = "Search crawler ({})".format(domain)
            await IPApi.block(
                [ip], IPList.WHITE, expiration=expiration, comment=comment
            )
            logger.info(
                "Added %s to the Whitelist for %s seconds",
                ip,
                self.WHITELIST_TIMEOUT,
            )

    @expect(MessageType.SensorAlert)
    async def check_alert(self, message):
        """Reject the alert when the attacker is a verified search crawler.

        Raises:
            Reject: the attacker's IP passed the reverse + forward DNS
                verification against a whitelisted crawler domain.

        DNS errors are logged at debug level and otherwise ignored; in
        that case (and whenever verification fails) the method simply
        returns and the alert is left untouched.
        """
        # Prefer "attackers_ip_orig" when it is present and non-empty.
        ip = message.get("attackers_ip_orig") or message["attackers_ip"]
        # A non-zero host mask means this is a network, not a single host,
        # so there is no single address to look up.
        if int(ip.hostmask):
            logger.debug("Can't make dns query for network: %s", message)
            return
        host = ip.network_address
        try:
            ptr = await self._resolver.query(host.reverse_pointer, "PTR")
            # The model query runs in an executor rather than inline in
            # this coroutine.
            domains = await run_in_executor(
                self._loop,
                lambda: [
                    row["domain"]
                    for row in WhitelistedCrawlerDomain.select(
                        WhitelistedCrawlerDomain.domain
                    ).dicts()
                ],
            )
            if self._is_crawler_hostname(ptr.name, domains):
                # Forward-confirm: the PTR name must resolve back to the
                # very same address, otherwise the PTR record is not
                # trusted.
                forward = await self._resolver.query(
                    ptr.name, self._FORWARD_QUERY_TYPES[type(host)]
                )
                if self._resolves_to(forward, host):
                    await self._whitelist_with_timeout(ip, ptr.name)
                    raise Reject("Ignoring alert for search bot: " + ptr.name)
        except aiodns.error.DNSError as e:
            # common case
            logger.debug("Got error checking ip in dns: %s", e)