File: //opt/imunify360/venv/lib/python3.11/site-packages/im360/plugins/sensor/webshield.py
import logging
import pwd
from im360.contracts.message_pb2 import SendInfo
from defence360agent.contracts.plugins import MessageSource
from defence360agent.contracts.messages import MessageType
from im360.plugins.sensor.generic_protobuf import Protobuf
from im360.plugins.sensor.unix_socket import create_unix_socket
logger = logging.getLogger(__name__)
class _Protocol(Protobuf):
    """Protobuf handler that accepts only parcels sent with the WEBSHIELD method."""

    def process_protobuf(self, parcel):
        """Convert a WEBSHIELD parcel into messages and schedule them on the sink.

        A ``CaptchaEvent`` is built from *parcel*.  When its ``event`` field
        equals ``PASSED``, a ``ClientUnblock`` message derived from it is
        scheduled first; the captcha event itself is always scheduled
        afterwards.  Nothing is scheduled when ``from_parcel`` returns
        ``None``.

        :param parcel: decoded protobuf parcel; its ``method`` field must be
            ``SendInfo.WEBSHIELD``.
        :raises ValueError: if the parcel was sent with any other method.
        """
        if parcel.method != SendInfo.WEBSHIELD:
            raise ValueError("Only WEBSHIELD method is supported")

        event = MessageType.CaptchaEvent.from_parcel(parcel)
        if event is None:
            # No captcha event could be built from this parcel.
            return

        outgoing = []
        if event.event == event.PASSED:
            # A passed captcha additionally produces an unblock message,
            # which must be handed to the sink before the event itself.
            unblock_msg = MessageType.ClientUnblock(event)
            unblock_msg["method"] = MessageType.ClientUnblock.DEFAULT_METHOD
            outgoing.append(unblock_msg)
        outgoing.append(event)

        # Delivery is fire-and-forget: each message gets its own task and
        # this method does not wait for the sink to finish.
        for message in outgoing:
            self._loop.create_task(self._sink.process_message(message))
class WebshieldSensor(MessageSource):
    """Message source that serves ``_Protocol`` connections on a unix socket."""

    # Filesystem path of the unix socket the server listens on.
    SOCKET_PATH = "/var/run/defence360agent/protobuf.sock"
    # System account whose uid is passed to create_unix_socket().
    USER = "imunify360-webshield"

    async def create_source(self, loop, sink):
        """Start the unix-socket server that feeds *sink*.

        Stores *loop* and *sink* on the instance, looks up the ``USER``
        account, obtains a socket for ``SOCKET_PATH`` from
        ``create_unix_socket`` and starts an asyncio unix server on it.  The
        resulting server object is kept in ``self._server``.

        :param loop: event loop used to create the server (and by the
            protocol to schedule tasks).
        :param sink: object whose ``process_message`` receives the messages
            produced by the protocol.
        :raises KeyError: if the ``USER`` account does not exist
            (raised by ``pwd.getpwnam``).
        """
        self._loop = loop
        self._sink = sink

        def _make_protocol():
            # Attributes are read at connection time, not at server start,
            # so each new connection sees the current loop and sink.
            return _Protocol(self.SOCKET_PATH, self._loop, self._sink)

        webshield_account = pwd.getpwnam(self.USER)
        # NOTE(review): 0o755 / 0o770 are presumably permission modes applied
        # by create_unix_socket -- confirm which applies to what in the helper.
        listening_sock = create_unix_socket(
            self.SOCKET_PATH,
            0o755,
            0o770,
            webshield_account.pw_uid,
        )
        self._server = await self._loop.create_unix_server(
            _make_protocol,
            sock=listening_sock,
        )