File: //opt/imunify360/venv/lib/python3.11/site-packages/im360/plugins/sensor/unix_socket.py
import os
import socket
from contextlib import suppress
from defence360agent.utils import run_with_umask
def create_unix_socket(
    path: str, dir_mode: int, sock_mode: int, uid: int = 0, gid: int = 0
) -> socket.socket:
    """Create a unix stream socket bound at *path* and return it.

    Missing parent directories of *path* are created with mode *dir_mode*.
    A stale file already present at *path* is removed before binding.
    After binding, the socket file's mode is set to *sock_mode*.

    Ownership is changed to *uid*/*gid* only when at least one of them is
    non-zero; with the defaults (0, 0) no ``chown`` is issued and the file
    keeps the ownership it was created with.

    The returned socket is bound but ``listen()`` has not been called on it.

    Raises:
        OSError: if creating directories, removing a stale entry, binding,
            or changing ownership/mode fails.  The socket object is closed
            before the exception propagates, so no descriptor is leaked.
    """
    parent_dir = os.path.dirname(path)
    # A bare filename has no directory component; os.makedirs("") would
    # raise FileNotFoundError, so only create directories when there are any.
    if parent_dir:
        # NOTE(review): run_with_umask(0) presumably clears the process umask
        # for the duration of the block so that dir_mode is applied as given
        # -- confirm against defence360agent.utils.
        with run_with_umask(0):
            os.makedirs(parent_dir, mode=dir_mode, exist_ok=True)

    # bind() fails if the path already exists, so drop any leftover file.
    with suppress(FileNotFoundError):
        os.unlink(path)

    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        sock.bind(path)
        if uid != 0 or gid != 0:
            os.chown(path, uid, gid)
        os.chmod(path, sock_mode)
    except BaseException:
        # Do not leak the descriptor when setup fails part-way through.
        sock.close()
        raise
    return sock