File: //opt/imunify360/venv/lib64/python3.11/site-packages/im360/plugins/sensor/generic.py
import socket
import json
from imav.plugins.generic_sensor import (
GenericSensor as GenericSensorBase,
)
from imav.plugins.generic_sensor import Protocol
from defence360agent.contracts.messages import MessageType
from defence360agent.utils import Scope
# Default timeout, in seconds, for agent socket operations; it is the default
# ``request_timeout`` of ``send_to_agent_socket``, which hands it to
# ``socket.settimeout``.
REQUEST_TIMEOUT = 240
# Filesystem paths of the agent's UNIX sockets, keyed by a short name.
# "root" is the default ``socket_path`` of ``send_to_agent_socket``.
# The "user" and "generic" entries are not referenced in this part of the
# file; judging by the paths they are presumably the RPC socket for non-root
# callers and the generic sensor's own socket -- TODO confirm.
SOCKET_PATHS = {
"root": "/var/run/defence360agent/simple_rpc.sock",
"user": "/var/run/defence360agent/non_root_simple_rpc.sock",
"generic": "/var/run/defence360agent/generic_sensor.sock.2",
}
def send_to_agent_socket(
    command: list,
    params: dict = None,
    socket_path=SOCKET_PATHS["root"],
    request_timeout=REQUEST_TIMEOUT,
    wait_for_response=False,
):
    """Send one newline-terminated JSON command to an agent UNIX socket.

    The request is ``{"command": command, "params": params}`` serialized
    with ``json.dumps``, followed by ``"\\n"``, and written to a
    ``SOCK_STREAM`` UNIX socket connected to *socket_path*.

    Args:
        command: command path, e.g. ``["malware", "generic", "event"]``.
        params: parameters sent with the command; ``None`` is sent as ``{}``.
        socket_path: filesystem path of the UNIX socket to connect to.
        request_timeout: timeout in seconds for each blocking socket
            operation (connect, send and every ``recv``).
        wait_for_response: when false, return right after the request has
            been sent; when true, read and decode the reply.

    Returns:
        ``None`` when *wait_for_response* is false. Otherwise the JSON
        document decoded from everything received up to and including the
        first chunk that contains a newline, or up to end-of-stream if the
        peer closes the connection first.

    Raises:
        OSError: connecting, sending or receiving failed (this includes
            timeouts).
        ValueError: the reply is not valid UTF-8 / JSON
            (``json.JSONDecodeError`` is raised for an empty reply).
    """
    if params is None:
        params = {}
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
        # The timeout is a property of the socket object, so setting it once
        # covers connect(), sendall() and every recv() below.
        sock.settimeout(request_timeout)
        sock.connect(socket_path)
        request = json.dumps({"command": command, "params": params}) + "\n"
        sock.sendall(request.encode())
        if not wait_for_response:
            return None

        # Collect chunks and join once instead of growing a bytes object,
        # which would copy the whole buffer on every recv().
        chunks = []
        while True:
            chunk = sock.recv(8192)
            chunks.append(chunk)
            # An empty chunk means the peer closed the connection; a newline
            # marks the end of the reply line.
            if not chunk or b"\n" in chunk:
                break
    return json.loads(b"".join(chunks).decode())
class _Protocol(Protocol):
    """Sensor protocol that hands malware and no-op messages to the agent.

    Extends the base ``Protocol`` with a method-name dispatch table and a
    ``_parse_msg`` override that forwards selected messages over the agent
    socket instead of returning them.
    """

    # Method name carried by an incoming message -> MessageType member.
    # NOTE(review): presumably consulted by the base Protocol to build the
    # message object; the lookup itself is not visible in this file.
    # "UPDATE_RULES" and "IPSET_UPDATE" both map to MessageType.IpsetUpdate.
    METHOD2MSGTYPE = {
        "ALERT": MessageType.SensorAlert,
        "HEALTH": MessageType.Health,
        "CAPTCHA": MessageType.CaptchaEvent,
        "INCIDENT": MessageType.SensorIncident,
        "UNBLOCK": MessageType.ClientUnblock,
        "NOOP": MessageType.Noop,
        "MALWARE_SCAN": MessageType.MalwareScan,
        "MALWARE_SCAN_TASK": MessageType.MalwareScanTask,
        "MALWARE_SCAN_COMPLETE": MessageType.MalwareScanComplete,
        "MALWARE_CLEAN_COMPLETE": MessageType.MalwareCleanComplete,
        "MALWARE_RESTORE_COMPLETE": MessageType.MalwareRestoreComplete,
        "MALWARE_CHECK_DETACHED_SCANS": MessageType.CheckDetachedScans,
        "SYNCLIST": MessageType.SynclistResponse,
        "IP_LISTS_UPDATE": MessageType.IPListsUpdate,
        "UPDATE_CUSTOM_LISTS": MessageType.UpdateCustomLists,
        "WHITELIST_CACHE_UPDATE": MessageType.WhitelistCacheUpdate,
        "IPSET_UPDATE": MessageType.IpsetUpdate,
        "FILES_UPDATE": MessageType.FilesUpdated,
        "BLOCKED_PORT_UPDATE": MessageType.BlockedPortUpdate,
        "BLOCKED_PORT_IP_UPDATE": MessageType.BlockedPortIPUpdate,
        "UPDATE_RULES": MessageType.IpsetUpdate,
    }

    def _parse_msg(self, msg):
        """Parse *msg* with the base class, diverting some messages.

        A parsed message whose ``"method"`` starts with ``"MALWARE"`` or is
        exactly ``"NOOP"`` is sent to the agent socket as the params of the
        ``["malware", "generic", "event"]`` command, and ``None`` is
        returned in its place. Any other result of the base parser
        (including a falsy one) is returned unchanged.
        """
        parsed = super()._parse_msg(msg)
        if not parsed:
            return parsed

        method = parsed.get("method", "")
        forwarded = method.startswith("MALWARE") or method == "NOOP"
        if not forwarded:
            return parsed

        # WARN: the message is moved to the agent; returning None means no
        # copy of it is handed back to the caller in the resident part.
        send_to_agent_socket(["malware", "generic", "event"], params=parsed)
        return None
class GenericSensor(GenericSensorBase):
    """Generic sensor configured for the IM360 scope.

    Only overrides class attributes of the base sensor: messages are parsed
    with ``_Protocol`` and the plugin scope is ``Scope.IM360``.
    """

    SCOPE = Scope.IM360
    PROTOCOL_CLASS = _Protocol