File: //opt/imunify360/venv/lib/python3.11/site-packages/im360/plugins/resident/config_watcher.py
import logging
import time
from defence360agent.contracts import config
from defence360agent.contracts.messages import MessageType
from defence360agent.plugins.config_watcher import (
ConfigWatcher as BaseConfigWatcher,
POLLING_INTERVAL,
)
from defence360agent.utils import (
await_for,
recurring_check,
retry_on,
Scope,
)
from im360.plugins.sensor.generic import send_to_agent_socket
logger = logging.getLogger(__name__)
async def log_and_sleep(e, i):
logger.warning("Got exception: %s. Attempt: %s", e, i)
await_for(5)
class ConfigWatcher(BaseConfigWatcher):
SCOPE = Scope.IM360
@retry_on(
ConnectionRefusedError,
on_error=log_and_sleep,
max_tries=5,
silent=True,
log=logger,
)
async def _do_rpc(self):
send_to_agent_socket(
command=["config", "update"],
params={"data": "{}"},
wait_for_response=False,
)
@recurring_check(POLLING_INTERVAL)
async def _check_config(self):
if config.any_layer_modified_since(self._last_notify_time):
# notify about the update
message = MessageType.ConfigUpdate(
conf=self._config, timestamp=time.time()
)
await self._do_rpc()
config.Merger.update_merged_config()
await self._sink.process_message(message)
# update the time here, in case ConfigUpdate might stuck
# in the queue for longer than the polling interval
self._last_notify_time = message["timestamp"]