File: //opt/imunify360/venv/lib/python3.11/site-packages/im360/plugins/sensor/generic_protobuf.py
import asyncio
import logging
from google.protobuf.message import DecodeError
from im360.contracts.message_pb2 import SendInfo
from defence360agent.utils.buffer import SizeBuffer
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class Protobuf(asyncio.Protocol):
    """asyncio protocol that decodes incoming bytes into ``SendInfo`` messages.

    Received bytes are accumulated in a ``SizeBuffer``; every item the
    buffer yields is parsed as a ``SendInfo`` protobuf message and handed
    to :meth:`process_protobuf`.
    """

    def __init__(self, name, loop, sink):
        """Store the connection parameters and create an empty buffer.

        Args:
            name: identifier used in log messages when parsing fails.
            loop: stored on the instance; not used by this class itself.
            sink: stored on the instance; not used by this class itself.
        """
        self._buf = SizeBuffer()
        self._loop = loop
        self._name = name
        self._sink = sink

    def data_received(self, data):
        """Buffer *data* and dispatch every message the buffer yields.

        Items that fail to decode are logged and skipped; the remaining
        items are still processed.
        """
        self._buf.append(data)
        # NOTE(review): presumably SizeBuffer yields only complete,
        # size-delimited frames -- confirm against its implementation.
        for frame in self._buf:
            message = SendInfo()
            try:
                message.ParseFromString(frame)
            except DecodeError:
                # Raw payload goes to debug level only; the traceback is
                # reported together with the name of this protocol.
                logger.debug("Invalid protobuf message: %s", frame)
                logger.exception(
                    "Failed to parse protobuf message from %s", self._name
                )
                continue
            logger.debug("Received protobuf message: %s", message)
            self.process_protobuf(message)

    def process_protobuf(self, parcel):
        """Handle one successfully parsed message.

        Does nothing in this base implementation; subclasses can override
        it to act on *parcel*.
        """
        return None