File: //opt/imunify360/venv/lib/python3.11/site-packages/im360/plugins/resident/pam.py
"""PAM module management plugin.
Exports manually whitelisted IP addresses to PAM whitelist;
"""
import asyncio
import contextlib
import logging
from defence360agent.contracts.messages import MessageType
from defence360agent.contracts.plugins import MessageSink, expect
from defence360agent.utils import recurring_check
from im360.model import custom_lists
from im360.model.firewall import IPList
from im360.subsys import ossec, pam
logger = logging.getLogger(__name__)
class PAM(MessageSink):
_EXPORT_DELAY = 10 # seconds
def __init__(self):
self._tasks = []
self._whitelist_update_required = asyncio.Event()
self._status_check_required = asyncio.Event()
self._loop = None
async def create_sink(self, loop) -> None:
self._loop = loop
self._tasks.append(loop.create_task(self._exporter()))
IPList.Signals.added.connect(self._on_signal, IPList.WHITE)
IPList.Signals.deleted.connect(self._on_signal, IPList.WHITE)
IPList.Signals.cleared.connect(self._on_signal, IPList.WHITE)
IPList.Signals.updated.connect(self._on_signal, IPList.WHITE)
self._whitelist_update_required.set()
def _on_signal(self, listname: str, **kwargs) -> None:
self._whitelist_update_required.set()
@recurring_check(_EXPORT_DELAY)
async def _exporter(self) -> None:
await self._whitelist_update_required.wait()
self._whitelist_update_required.clear()
status = await pam.get_status()
if all(
s == pam.PamServiceStatusValue.disabled for s in status.values()
):
# pam protection is disabled for all services
return # nothing to do
q = (
IPList.select(IPList.ip)
.where(IPList.listname == IPList.WHITE, IPList.manual)
.tuples()
)
networks = [ip for [ip] in q]
networks.extend(await custom_lists.CustomWhitelist.load())
try:
await pam.export_ip_whitelist(networks)
except FileNotFoundError as exc:
logger.warning("Failed to export IP whitelist for PAM: %s", exc)
async def shutdown(self) -> None:
IPList.Signals.added.disconnect(self._on_signal)
IPList.Signals.deleted.disconnect(self._on_signal)
IPList.Signals.cleared.disconnect(self._on_signal)
IPList.Signals.updated.disconnect(self._on_signal)
for task in self._tasks:
if task is not None:
task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await task
@expect(MessageType.UpdateCustomLists)
async def on_custom_lists_update(
self, message: MessageType.UpdateCustomLists
):
self._whitelist_update_required.set()