MOON
Server: Apache
System: Linux server30c.hostingraja.org 3.10.0-962.3.2.lve1.5.63.el7.x86_64 #1 SMP Fri Oct 8 12:03:35 UTC 2021 x86_64
User: jibhires (1887)
PHP: 8.1.30
Disabled: show_source, system, shell_exec, passthru, exec, popen, proc_open, allow_url_fopen, symlink, escapeshellcmd, pcntl_exec
Upload Files
File: //opt/imunify360/venv/lib64/python3.11/site-packages/im360/plugins/sensor/webshield.py
import logging
import pwd

from im360.contracts.message_pb2 import SendInfo
from defence360agent.contracts.plugins import MessageSource
from defence360agent.contracts.messages import MessageType
from im360.plugins.sensor.generic_protobuf import Protobuf
from im360.plugins.sensor.unix_socket import create_unix_socket

logger = logging.getLogger(__name__)


class _Protocol(Protobuf):
    """Protocol that turns WEBSHIELD parcels into messages for the sink.

    Every accepted parcel is decoded into a captcha event.  When the event
    reports a passed captcha, a client-unblock message built from it is
    scheduled first, followed by the captcha event itself.
    """

    # Strong references to in-flight sink tasks.  asyncio only keeps weak
    # references to tasks, so a fire-and-forget task that nobody else holds
    # may be garbage collected before it completes.  The set is shared by
    # all protocol instances; every task removes itself when it finishes.
    _sink_tasks = set()

    def _spawn_sink_task(self, message):
        """Schedule ``sink.process_message(message)`` without awaiting it."""
        task = self._loop.create_task(self._sink.process_message(message))
        self._sink_tasks.add(task)
        task.add_done_callback(self._sink_tasks.discard)

    def process_protobuf(self, parcel):
        """Forward the captcha event carried by *parcel* to the sink.

        Args:
            parcel: decoded protobuf message; its ``method`` field must be
                ``SendInfo.WEBSHIELD``.

        Raises:
            ValueError: if the parcel uses any other method.
        """
        if parcel.method != SendInfo.WEBSHIELD:
            raise ValueError("Only WEBSHIELD method is supported")
        captcha_event = MessageType.CaptchaEvent.from_parcel(parcel)
        if captcha_event is None:
            # from_parcel produced no event, so there is nothing to forward.
            return
        if captcha_event.event == captcha_event.PASSED:
            # A passed captcha also triggers an unblock of the client; it is
            # scheduled before the captcha event itself.
            unblock = MessageType.ClientUnblock(captcha_event)
            unblock["method"] = MessageType.ClientUnblock.DEFAULT_METHOD
            self._spawn_sink_task(unblock)
        self._spawn_sink_task(captcha_event)


class WebshieldSensor(MessageSource):
    """Message source that serves ``_Protocol`` on a unix socket."""

    # Filesystem path of the listening unix socket.
    SOCKET_PATH = "/var/run/defence360agent/protobuf.sock"
    # System account whose uid is passed to create_unix_socket().
    USER = "imunify360-webshield"

    async def create_source(self, loop, sink):
        """Create the unix socket and start serving ``_Protocol`` on it.

        Args:
            loop: event loop used to create the server and, through the
                protocol, to schedule message delivery.
            sink: object whose ``process_message`` receives the messages
                produced by the protocol.

        Raises:
            KeyError: if the ``USER`` account does not exist
                (raised by ``pwd.getpwnam``).
        """
        self._loop = loop
        self._sink = sink
        user_info = pwd.getpwnam(self.USER)
        # NOTE(review): 0o755 / 0o770 are presumably the directory and
        # socket permission modes -- confirm against create_unix_socket().
        sock = create_unix_socket(
            self.SOCKET_PATH, 0o755, 0o770, user_info.pw_uid
        )
        try:
            self._server = await self._loop.create_unix_server(
                lambda: _Protocol(self.SOCKET_PATH, self._loop, self._sink),
                sock=sock,
            )
        except Exception:
            # The server never took over the socket; close it here so the
            # descriptor is not leaked, then let the caller see the error.
            sock.close()
            raise