File: //opt/imunify360/venv/lib/python3.11/site-packages/im360/plugins/service_manager.py
"""Services manager plugin.
It enables/disables various services based on imunify360 config changes.
"""
import asyncio
import logging
import shutil
from pathlib import Path
from random import randint
from tempfile import NamedTemporaryFile
from defence360agent.contracts import messages, plugins
from defence360agent import utils
from defence360agent.subsys import svcctl
from im360.contracts import config
from im360.simple_rpc.resident_socket import send_to_socket
from im360.subsys import webshield
from im360.subsys.webshield_mode import Mode as WebshieldMode
# Names exported by ``from ... import *``.
__all__ = ["ServiceManager"]
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Template the UAL cron job is rendered from; ServiceManager fills in the
# ``random_minute`` and ``report_interval`` placeholders via ``str.format``.
UAL_CRON_TEMPLATE_PATH = Path(
"/opt/imunify360/venv/share/imunify360/imunify360-ual.cron.template"
)
# Location of the rendered cron job. ServiceManager removes this file when
# the Unified Access Logger is enabled in the config and (re)writes it when
# the logger is disabled.
UAL_CRON_PATH = Path("/etc/cron.d/imunify360-ual.cron")
class ServiceManager(plugins.MessageSink):
    """Service manager plugin: stop/start services based on config changes.

    Each ``ConfigUpdate`` message triggers one sequential pass over the
    managed services (WebShield, DosProtector, UnifiedAccessLogger, AuditD,
    Scanlogd).  For every service the running state is compared with the
    state requested by the config and the service is enabled or disabled
    when the two differ.
    """

    # Desired AuditD state, read from the ("LOGGER", "syscall_monitor")
    # config option.
    AUDITD_SHOULD_BE_RUNNING = config.FromConfig("LOGGER", "syscall_monitor")

    def __init__(self, *, unitctl=None):
        """Prepare the list of service checks and their unit controllers.

        :param unitctl: optional unit controller; when truthy it is used
            for *every* managed unit instead of the controllers returned by
            ``svcctl`` (presumably a test hook -- confirm with callers).
        """
        # serializes overlapping config updates
        self._lock = asyncio.Lock()
        # checks are awaited one by one, in this order
        self._services = [
            self._ensure_consistent_webshield_state,
            self._ensure_consistent_dos_protector_state,
            self._ensure_consistent_ual_state,
            self._ensure_consistend_auditd_state,
            self._ensure_consistent_scanlogd_state,
        ]
        self._units = {
            "dos_protector": unitctl
            or svcctl.imunify360_dos_protector_service(),
            "ual": unitctl or svcctl.imunify360_ual_service(),
            "auditd": unitctl or svcctl.imunify360_auditd_service(),
            "scanlogd": unitctl or svcctl.imunify360_scanlogd_service(),
        }
        # last config seen per service, used to detect changes that
        # require a reload
        self._configs = {"dos_protector": {}}

    async def create_sink(self, loop):
        """Do nothing: the initial state is handled by ``on_config_update``."""
        # on startup ConfigUpdate message is sent to all plugins
        pass

    async def _ensure_consistent_services_state(self):
        """Await every registered service check sequentially."""
        for service in self._services:
            await service()

    @plugins.expect(messages.MessageType.ConfigUpdate)
    async def on_config_update(
        self, message_ignored: messages.MessageType.ConfigUpdate
    ):
        """Re-check all managed services; the message content is not used."""
        async with self._lock:  # handle concurrent config updates
            await self._ensure_consistent_services_state()

    @utils.log_error_and_ignore()
    @utils.retry_on(webshield.Error, max_tries=2)
    async def _ensure_consistent_webshield_state(self):
        """Make WebShield's running state and settings match the config.

        When the running state differs from ``config.Webshield.ENABLE`` the
        service is enabled/disabled and an ``UPDATE_RULES`` request is sent
        to the resident socket.  When WebShield should be running, the
        splash screen, panel protection and mode settings are applied too.
        """
        should_be_running = config.Webshield.ENABLE
        if (await webshield.is_running()) is not should_be_running:
            if should_be_running:
                logger.info(
                    "WebShield is enabled in the config but it is not running."
                    " Enabling it..."
                )
                # enable on boot & start it right now
                await webshield.service_enable(now=True)
                logger.info("Enabled WebShield")
            else:
                logger.info(
                    "WebShield is not enabled in the config but it is running."
                    " Disabling it..."
                )
                # disable on boot & stop it right now
                await webshield.service_disable(now=True)
                logger.info("Disabled WebShield")
            # fire-and-forget: the response is not awaited
            await send_to_socket(
                msg={
                    "method": "UPDATE_RULES",
                    "purpose": "webshield state change",
                },
                wait_for_response=False,
            )
        if should_be_running:
            await webshield.splashscreen_set_state(
                config.Webshield.SPLASH_SCREEN
            )
            await webshield.cpanelprotection_set_state(
                config.Webshield.PANEL_PROTECTION
            )
            mode_supported = await webshield.mode_switch_supported()
            if mode_supported and not WebshieldMode.mode_is_correct():
                await webshield.set_mode(config.Webshield.MODE)

    @utils.log_error_and_ignore()
    async def __ensure_service_status(
        self, unitctl, service_name, should_be_running, reload=False
    ):
        """Enable, disable or reload a unit so it matches the wanted state.

        :param unitctl: unit controller providing ``is_active``, ``enable``,
            ``disable`` and ``reload`` coroutines
        :param service_name: human readable name used in log messages
        :param should_be_running: desired state; compared by identity with
            the result of ``unitctl.is_active()``
        :param reload: reload the unit when it is already running as
            requested (e.g. because its config has changed)
        """
        is_running = await unitctl.is_active()
        if is_running is not should_be_running:
            if should_be_running:
                logger.info(
                    "%s is enabled in the config but it is not"
                    " running. Enabling it...",
                    service_name,
                )
                # enable on boot & start it right now
                await unitctl.enable(now=True)
                logger.info("Enabled %s", service_name)
            else:
                logger.info(
                    "%s is not enabled in the config but it is"
                    " running. Disabling it...",
                    service_name,
                )
                # disable on boot & stop it right now
                await unitctl.disable(now=True)
                logger.info("Disabled %s", service_name)
        elif is_running and reload:
            # log first so the attempt is visible even if reload() raises
            logger.info("Reloading %s after config update...", service_name)
            await unitctl.reload()

    async def _ensure_consistent_dos_protector_state(self):
        """Sync DosProtector with ``config.EnhancedDOS``.

        The unit is reloaded when it keeps running but the EnhancedDOS
        settings differ from the ones seen on the previous check.
        """
        unitctl = self._units["dos_protector"]
        if not unitctl:
            # unsupported platform
            return
        old_config = self._configs["dos_protector"]
        new_config = config.EnhancedDOS.as_dict()
        should_be_running = config.EnhancedDOS.ENABLED
        await self.__ensure_service_status(
            unitctl,
            "DosProtector",
            should_be_running,
            reload=(old_config != new_config),
        )
        self._configs["dos_protector"] = new_config

    # Best-effort like the other checks: a filesystem error while handling
    # the cron job must not abort the checks that follow this one.
    @utils.log_error_and_ignore()
    async def _ensure_consistent_ual_state(self):
        """Sync UnifiedAccessLogger with the config.

        The cron job at ``UAL_CRON_PATH`` is removed before the service
        check when the logger should run, and (re)created after the check
        when it should not.
        """
        should_be_running = config.UnifiedAccessLogger.ENABLED
        unitctl = self._units["ual"]
        if should_be_running:
            UAL_CRON_PATH.unlink(missing_ok=True)
        await self.__ensure_service_status(
            unitctl, "UnifiedAccessLogger", should_be_running, reload=False
        )
        if not should_be_running:
            self._create_ual_cronjob()

    def _create_ual_cronjob(self):
        """Render the UAL cron template and install it at ``UAL_CRON_PATH``.

        The content is written to a temporary file first and then moved
        into place.  The temporary file is removed if any step fails.

        :raises OSError: if the template cannot be read or the cron file
            cannot be written
        """
        cronjob_content = UAL_CRON_TEMPLATE_PATH.read_text().format(
            random_minute=randint(0, 59), report_interval="5m"
        )
        temp_cronjob_path = None
        try:
            with NamedTemporaryFile("w", delete=False) as f:
                temp_cronjob_path = Path(f.name)
                f.write(cronjob_content)
            temp_cronjob_path.chmod(0o644)
            shutil.move(temp_cronjob_path, UAL_CRON_PATH)
        except BaseException:
            # delete=False means nobody else cleans the file up for us
            if temp_cronjob_path is not None:
                temp_cronjob_path.unlink(missing_ok=True)
            raise

    async def _ensure_consistent_scanlogd_state(self):
        """Sync Scanlogd with ``config.Scanlogd.ENABLE``."""
        should_be_running = config.Scanlogd.ENABLE
        unitctl = self._units["scanlogd"]
        await self.__ensure_service_status(
            unitctl, "Scanlogd", should_be_running, reload=False
        )

    # NOTE: the "consistend" spelling is kept so existing references to
    # this method keep working.
    async def _ensure_consistend_auditd_state(self):
        """Sync AuditD with ``AUDITD_SHOULD_BE_RUNNING``."""
        unitctl = self._units["auditd"]
        if not unitctl:
            # unsupported platform
            return
        await self.__ensure_service_status(
            unitctl, "AuditD", self.AUDITD_SHOULD_BE_RUNNING, reload=False
        )