File: //opt/imunify360/venv/lib/python3.11/site-packages/im360/simple_rpc/resident_socket.py
import socket
import json
from defence360agent.contracts.config import (
GENERIC_SENSOR_SOCKET_PATH,
)
async def send_to_socket(msg, timeout=15, wait_for_response=True):
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
sock.settimeout(timeout)
result = b""
data = True
try:
sock.connect(GENERIC_SENSOR_SOCKET_PATH)
sock.sendall(json.dumps(msg).encode() + b"\n")
if not wait_for_response:
return {}
while data:
sock.settimeout(timeout)
data = sock.recv(8192)
result += data
if data.find(b"\n") != -1:
return json.loads(result.decode())
return json.loads(result.decode())
except (
ConnectionRefusedError,
FileNotFoundError,
):
return "Failed to send to socket, check your socket active"
except socket.timeout:
return (
result.decode()
if result
else "Failed to send to socket in time"
)
except json.JSONDecodeError:
return "Failed to decode json answer from a plugin"