File: //opt/imunify360/venv/lib64/python3.11/site-packages/im360/run.py
#!/opt/imunify360/venv/bin/python3
import asyncio
from logging import getLogger
from peewee import OperationalError
from im360.contracts.config import IPSET_LISTS_PATH
from imav.malwarelib.subsys.malware import (
subscribe_to_malware_action,
HackerTrapHitsSaver,
)
from imav.internals.lazy_load import AVSource
from imav import server
from defence360agent.subsys import systemd_notifier
from defence360agent.api import health
from defence360agent.contracts.config import Model
from defence360agent.contracts.license import LicenseCLN
from defence360agent.contracts.plugins import BasePlugin
from defence360agent.internals.cln import subscribe_to_license_changes
from defence360agent.internals.iaid import IndependentAgentIDAPI
from defence360agent.internals.lazy_load import CoreSource
from defence360agent.model import instance
from imav.run import AV_PLUGINS_PACKAGES
from defence360agent.utils import importer, Scope
from im360 import rpc_handlers
from im360.application.settings import configure
from im360.internals.core import ip_versions
from im360.internals.lazy_load import IM360Source
from im360.subsys import features
IM360_PLUGINS_PACKAGES = ("im360.plugins",)
logger = getLogger(__name__)
def get_plugins() -> set:
"""Return plugins in unspecified order."""
importer.load_packages(
CoreSource.MESSAGES + AVSource.MESSAGES + IM360Source.MESSAGES
)
importer.load_packages(AV_PLUGINS_PACKAGES + IM360_PLUGINS_PACKAGES)
# use lexicographical order (but don't rely on it in code)
return sorted(
[
plugin
for plugin in BasePlugin.get_active_plugins()
if plugin.SCOPE is not Scope.AV
],
key=lambda item: f"{item.__module__}.{item.__name__}",
)
async def update_health_sensor():
if LicenseCLN.is_valid():
health.sensor.registered()
else:
health.sensor.unregistered()
async def init_actions():
# Any uncaught exceptions here prevents the agent from
# starting. Non-critical (agent can continue) functionality should
# either be moved to a plugin or caught&log *all* its exceptions.
# Also nothing should block here indefinitely (any operation that
# may block should have a timeout).
ip_versions.init()
subscribe_to_license_changes(features.update_repos)
subscribe_to_license_changes(features.update_im_email)
subscribe_to_license_changes(update_health_sensor)
instance.db.execute_sql(
"ATTACH '{}' AS proactive".format(Model.PROACTIVE_PATH)
)
max_attempts = 5
# DB is created and migration applied in resident part
# so we need to retry until it's ready
# 5 attempts with 5 seconds sleep between them max 25 seconds
for attempt in range(1, max_attempts + 1):
try:
instance.db.execute_sql(
"ATTACH '{}' AS resident".format(Model.RESIDENT_PATH)
)
instance.db.execute_sql(
"ATTACH '{}' AS ipsetlists".format(IPSET_LISTS_PATH)
)
break
except OperationalError:
if attempt >= max_attempts:
raise
await asyncio.sleep(5)
subscribe_to_malware_action("delete", HackerTrapHitsSaver.add_hit)
subscribe_to_malware_action("cleanup", HackerTrapHitsSaver.add_hit)
subscribe_to_license_changes(IndependentAgentIDAPI.reactivate)
IndependentAgentIDAPI.add_initial_task()
rpc_handlers.init()
def run():
systemd_notifier.notify(systemd_notifier.AgentState.READY)
configure()
plugins = get_plugins()
server.start(plugins, init_actions)
if __name__ == "__main__":
run()
logger.info("agent stopped")