File: //opt/imunify360/venv/lib/python3.11/site-packages/im360/subsys/features/abstract_feature.py
import glob
import logging
import os
from abc import ABCMeta, abstractmethod
from math import isclose
from typing import List
import psutil
logger = logging.getLogger(__name__)
class FeatureStatus:
ERROR = "error"
INSTALLED = "installed"
INSTALLING = "installing"
REMOVING = "removing"
NOT_INSTALLED = "not_installed"
MANAGED_BY_LVE = "managed_by_lve"
NOT_SUPPORTED_BY_CL_SOLO = "not-supported-by-cl-solo"
def ea4_only(func):
"""
If Easy Apache 4 not installed, then raising an error
:raises FeatureError
:param func: install or remove func
:return func:
"""
async def wrapper(*args, **kwargs):
if not os.path.isfile("/etc/cpanel/ea4/is_ea4"):
raise FeatureError(
"Hardened PHP is compatible only with Easy Apache 4!"
)
return await func(*args, **kwargs)
return wrapper
class FeatureError(Exception):
"""Feature operation can't be performed error"""
pass
class FeatureNotice(FeatureError):
"""Feature operation can't be performed notice"""
pass
class AbstractFeature(metaclass=ABCMeta):
NAME = "AbstractFeature"
INSTALL_LOG_FILE_MASK = None # type: str
REMOVE_LOG_FILE_MASK = None # type: str
_CMD_LIST = [] # type: List[str]
def __init__(self, sink=None):
assert self.INSTALL_LOG_FILE_MASK, "variable isn't set!"
assert self.REMOVE_LOG_FILE_MASK, "variable isn't set!"
self._sink = sink
async def init(self):
self.is_installed = await self.check_installed()
return self
@property
def installation_live_log(self):
return self._get_live_log(self.INSTALL_LOG_FILE_MASK)
@property
def removal_live_log(self):
return self._get_live_log(self.REMOVE_LOG_FILE_MASK)
@classmethod
def _log_still_used(cls, log_file):
"""Checks if any processes are using log file."""
try:
with open(log_file + ".pid") as pf:
pid, creation_time = pf.read().strip().split()
return isclose(
psutil.Process(int(pid)).create_time(),
float.fromhex(creation_time),
rel_tol=1e-12,
)
except (OSError, ValueError, psutil.NoSuchProcess):
return False
@staticmethod
def _ls_logs(log_mask):
"""
:param str log_mask: regexp of log file path
:return: list of files found by log_mask
"""
return glob.glob(log_mask)
@classmethod
def _get_live_log(cls, file_mask):
"""
Returns path of log file, which used by some process.
If log file used by process, assuming that installation/removal
is in the progress
:param str file_mask: regexp of log file path
:return: str path of log, used by some process
"""
return next(filter(cls._log_still_used, cls._ls_logs(file_mask)), None)
async def check_installed(self) -> bool:
if self.installation_live_log:
return False
if self.removal_live_log:
return True
return await self._check_installed_impl()
@abstractmethod
async def _check_installed_impl(self) -> bool:
return False
@abstractmethod
async def install(self) -> str:
"""
:return str: path to log file with installation process
:raise FeatureError: when feature is already installed,
concurrent operation is in progress, feature is not applicable
for given setup, etc.
"""
raise NotImplementedError()
@abstractmethod
async def remove(self) -> str:
raise NotImplementedError()
@staticmethod
def raise_if_shouldnt_install_now(func):
"""
Checks before operation if similar or mutually exclusive operation
is in the progress. Checks if there are condition why operation
can't be performed.
:raises FeatureError: if operation couldn't be performed
:returns str msg: log path if already ongoing operation
:returns continue function isntall/remove: if operation is permitted
"""
async def wrapper(self):
# check if the operation is in progress
if self.removal_live_log:
raise FeatureError("Wait until uninstalling is finished!")
elif self.is_installed:
raise FeatureNotice(
"{} is already installed".format(self.NAME)
)
return self.installation_live_log or await func(self)
return wrapper
@staticmethod
def raise_if_shouldnt_remove_now(func):
"""
:raises FeatureError: if operation couldn't be performed
:returns str msg: log path if already ongoing operation
:returns continue function isntall/remove: if operation is permitted
"""
async def wrapper(self):
# check if the operation is in progress
if self.installation_live_log:
raise FeatureError("Wait until installation is finished!")
elif not self.is_installed:
raise FeatureNotice(
"Can't delete {}, because it's not installed".format(
self.NAME
)
)
return self.removal_live_log or await func(self)
return wrapper
async def status(self):
if self.installation_live_log:
msg = "{} is installing".format(self.NAME)
status = FeatureStatus.INSTALLING
elif self.removal_live_log:
msg = "{} is removing".format(self.NAME)
status = FeatureStatus.REMOVING
elif await self.check_installed():
msg = "{} is installed".format(self.NAME)
status = FeatureStatus.INSTALLED
else:
msg = "{} is not installed".format(self.NAME)
status = FeatureStatus.NOT_INSTALLED
return {
"items": {
"message": msg,
"status": status,
}
}