File: //opt/imunify360/venv/lib/python3.11/site-packages/im360/subsys/waf_rules_configurator.py
import json
import logging
import os
from asyncio import CancelledError
from packaging.version import Version
from pathlib import Path
from defence360agent.utils import (
BACKUP_EXTENSION,
CheckRunError,
)
from im360.files import MODSEC, Index
from defence360agent.subsys import web_server
from im360.subsys.panels.base import APACHE
from im360.subsys.panels.generic.panel import GenericPanel
from .modsec_app_version_detector import map_components_versions_to_tags
from .panels.hosting_panel import HostingPanel
from defence360agent.subsys.web_server import (
safe_update_config,
graceful_restart,
)
logger = logging.getLogger(__name__)
COMPONENTS_VERSION_DB = (
"/var/lib/cloudlinux-app-version-detector/components_versions.sqlite3"
)
RULES_CONF_PATTERN = "<IfModule security2_module>\n{}\n</IfModule>"
MAPPING_FILE = "tags_matching.json"
class NotSupportedWebserverError(Exception):
pass
async def is_webserver_supported() -> bool:
"""Apache >= 2.4 is supported and security2_module installed.
- litespeed is not supported
- nginx is not supported
Apache is expected to be running, otherwise False is returned
In case of any error, False is returned also
"""
hp = HostingPanel()
webserver = await hp.get_web_server()
if isinstance(hp, GenericPanel):
# on Generic panel we expect that apache version >= 2.4 will be used
# no check apache running and version explicitly
return webserver == APACHE
if webserver != APACHE:
return False
try:
version = await web_server.apache_version()
modules = await web_server.apache_modules()
except CancelledError:
raise
except Exception as exc:
logger.error("Error occurs while getting Apache version: %s", exc)
return False
return version >= Version("2.4") and b"security2_module" in modules
async def update_waf_rules_config():
"""
Update modsec config file with enabled tags for specific directories.
"""
if not await is_webserver_supported():
raise NotSupportedWebserverError(
"WAF rules configurator supports only apache webserver with "
"version >= 2.4 and ModSecurity 2"
)
config_path = Path(HostingPanel().get_app_specific_waf_config())
new_config = _rules_config()
is_uptodate = (
config_path.exists() and config_path.read_text() == new_config
)
if not is_uptodate:
if await safe_update_config(config_path, new_config):
logger.info("WAF Rules Set Config was successfully updated")
else:
logger.info("WAF Rules Set Config is already up to date")
def _rules_config():
mapping_path = os.path.join(Index.files_path(MODSEC), MAPPING_FILE)
with open(mapping_path, encoding="utf-8") as f:
tags = json.load(f)
rules_list = map_components_versions_to_tags(COMPONENTS_VERSION_DB, tags)
# sort rules list to get the equal hash for the same set of rules
rules_config_text = RULES_CONF_PATTERN.format(
"\n".join(sorted(rules_list))
)
return rules_config_text
async def try_restore_config_from_backup():
"""
In case if Agent starts and config backup is present — then
it is required to restore it: original .conf files must be replaced by
the backup file and then WS restart command must be applied.
In this case we assume that backed up configuration file is correct
and none config checks are performed.
"""
try:
config_path = HostingPanel().get_app_specific_waf_config()
except NotImplementedError:
return
if os.path.isfile(config_path + BACKUP_EXTENSION):
os.rename(config_path + BACKUP_EXTENSION, config_path)
try:
await graceful_restart()
except CheckRunError:
logger.exception(
"Web server failed to start with a backed up config"
)