File: //opt/imunify360/venv/lib64/python3.11/site-packages/im360/plugins/resident/update_files.py
import logging
from defence360agent.contracts.messages import MessageType
from defence360agent.contracts.plugins import MessageSink, expect
from im360 import files
from im360.plugins.sensor.generic import send_to_agent_socket
logger = logging.getLogger(__name__)
PHP_IMMUNITY_FILENAME = "/usr/share/i360-php-opts/autorules.yaml"
class UpdateFilesOnServerRequest(MessageSink):
async def create_sink(self, loop):
self._loop = loop
@expect(MessageType.UpdateFiles, list_type=files.PHP_IMMUNITY)
@expect(MessageType.UpdateFiles, list_type=files.PHP_IMMUNITY_V2)
@expect(MessageType.UpdateFiles, list_type=files.PROACTIVE)
@expect(MessageType.UpdateFiles, list_type=files.MODSEC)
@expect(MessageType.UpdateFiles, list_type=files.IP_RECORD)
async def process_update_request_force(self, message):
logger.info(
"%s files will be force updated on server request",
message.list_type,
)
# the error has been logged in files.update already
send_to_agent_socket(
["update"], params={"subj": message.list_type, "force": True}
)