File: //opt/imunify360/venv/lib64/python3.11/site-packages/im360/plugins/resident/group_ip_sync.py
import asyncio
from contextlib import suppress
from ipaddress import ip_network
from logging import getLogger
from defence360agent.contracts.messages import MessageType
from defence360agent.contracts.plugins import MessageSink, expect
from defence360agent.utils import recurring_check
from im360.api.ips import IPApi
from im360.model.firewall import IPList
# Module-level logger, named after this module.
logger = getLogger(__name__)
class GroupIPSyncPlugin(MessageSink):
    """Apply ``GroupIPSyncPush`` messages to the local IP lists.

    Incoming messages are placed on an internal queue and handled one at
    a time by a background task that is started in :meth:`create_sink`.
    """

    def __init__(self):
        # All three stay None until create_sink() has been called.
        self.loop = None
        self.message_queue_task = None
        self.queue = None

    async def create_sink(self, loop):
        """Create the message queue and start the consumer task on *loop*."""
        self.loop = loop
        self.queue = asyncio.Queue()
        self.message_queue_task = self.loop.create_task(self.process())

    async def shutdown(self):
        """Cancel the consumer task and wait until it has finished.

        Does nothing when the task was never started, i.e. when
        :meth:`create_sink` has not been called on this instance.
        """
        task = self.message_queue_task
        if task is None:
            return
        task.cancel()
        # Awaiting a cancelled task re-raises CancelledError; that is the
        # expected outcome here, so it is swallowed.
        with suppress(asyncio.CancelledError):
            await task

    @expect(MessageType.GroupIPSyncPush)
    async def handle_group_ip_push(self, message):
        """Enqueue *message* for the consumer task."""
        await self.queue.put(message)

    async def _do_process(self, message):
        """Apply a single sync message.

        Steps run in this order: optional drop of all group-scoped
        records (only when ``message["drop_ips"]`` is exactly ``True``),
        then every entry of ``message["add"]``, then every entry of
        ``message["del"]``.
        """
        if message["drop_ips"] is True:
            await self._drop_group_ips()
        for item in message["add"]:
            await self._add_item(item)
        for item in message["del"]:
            await self._del_item(item)

    async def _drop_group_ips(self):
        """Unblock every ``IPList`` record whose scope is ``SCOPE_GROUP``."""
        logger.info("Received drop_ips=true in GROUP_SYNC")
        # The query result is materialized before the loop starts;
        # presumably because unblock() modifies the same table.
        group_records = list(
            IPList.select().where(IPList.scope == IPList.SCOPE_GROUP),
        )
        for record in group_records:
            await IPApi.unblock([record.ip_network], record.listname)
            logger.info(
                "IP %s removed from %s list via GROUP_SYNC.drop_ips",
                record.ip,
                record.listname,
            )

    async def _add_item(self, item):
        """Block one ``add`` entry, replacing a conflicting record if any.

        When the first attempt reports the IP as not affected and names
        the list that holds it, the IP is removed from that list first.
        The block is then attempted a second time.
        """
        _, not_affected = await self._do_block(item)
        if not_affected:
            # Exactly one network is passed to block(), so only the first
            # not-affected entry is relevant.
            conflict = not_affected[0]
            previous_list = conflict["listname"]
            if previous_list is not None:
                await IPApi.unblock([conflict["rec"]], previous_list)
                # Report the list the IP was actually removed from; it is
                # not necessarily the list it is being added to.
                logger.info(
                    "IP %s removed from %s list via GROUP_SYNC.add"
                    " to be replaced",
                    item["ip"],
                    previous_list,
                )
            # NOTE(review): the retry is assumed to run for every
            # not-affected result, not only after an unblock -- confirm
            # against the original indentation.
            await self._do_block(item)
        # NOTE(review): the outcome of the retry is not inspected, so this
        # line is logged even if the second attempt was not applied.
        logger.info(
            "IP %s added to %s list via GROUP_SYNC.add",
            item["ip"],
            item["list"],
        )

    async def _del_item(self, item):
        """Unblock one ``del`` entry from the list it names."""
        await IPApi.unblock([ip_network(item["ip"])], item["list"])
        logger.info(
            "IP %s removed from %s list via GROUP_SYNC.del",
            item["ip"],
            item["list"],
        )

    @recurring_check(0)
    async def process(self):
        """Consume queued messages forever, one at a time.

        NOTE(review): ``recurring_check(0)`` presumably restarts this
        coroutine after an unhandled exception -- confirm in
        ``defence360agent.utils``.
        """
        while True:
            message = await self.queue.get()
            try:
                await self._do_process(message)
            finally:
                # Every get() is balanced by task_done(), also on failure.
                self.queue.task_done()

    @staticmethod
    def _do_block(item):
        """Return the awaitable ``IPApi.block`` call for one ``add`` entry.

        The entry must provide the keys ``ip``, ``list``, ``comment``,
        ``expiration`` and ``full_access``. The record is created with
        ``scope=IPList.SCOPE_GROUP``.
        """
        return IPApi.block(
            [ip_network(item["ip"])],
            listname=item["list"],
            comment=item["comment"],
            expiration=item["expiration"],
            full_access=item["full_access"],
            scope=IPList.SCOPE_GROUP,
            # item will not be removed via synclist if it is manually added
            # it is actually manually added on another server
            manual=True,
        )