File: //opt/imunify360/venv/lib64/python3.11/site-packages/im360/plugins/sensor/cphulk.py
#!/opt/imunify360/venv/bin/python3
import os
import logging
import re
import time
from defence360agent.contracts.plugins import LogStreamReader
from defence360agent.utils import RecurringCheckStop, recurring_check
from defence360agent.contracts.messages import MessageType
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class cpHulkSensor(LogStreamReader):
    """Sensor that turns cPHulk "Login Blocked" log records into incidents.

    Every line read from the log stream is matched against
    ``_PARSE_LOG_RECORD_REGEX``; each match is forwarded to the sink as a
    ``MessageType.SensorIncident``.
    """

    # Log file this sensor reads.
    source_file = "/usr/local/cpanel/logs/cphulkd.log"
    # Value sent as ``plugin_id`` with every incident.
    PLUGIN_ID = "cphulk"
    # Group 1: value of the "[Remote IP Address]=[...]" field.
    # Group 2: first number of the "(N/M failures)" counter.
    _PARSE_LOG_RECORD_REGEX = re.compile(
        r"Login Blocked: .+"
        r"\[Remote IP Address\]=\[(.+?)\].+"
        r"\((\d+)/\d+ failures\).+"
    )

    async def create_sensor(self, loop, sink):
        """Create the sensor only when the log directory exists.

        Delegates to the parent ``create_sensor`` if the directory holding
        ``source_file`` exists. Otherwise logs an informational message and
        returns ``None`` without creating anything.
        """
        logdir = os.path.dirname(self.source_file)
        if not os.path.exists(logdir):
            logger.info(
                "%r won't be monitored: %r doesn't exist",
                self.source_file,
                logdir,
            )
            return None
        return await super().create_sensor(loop, sink)

    @recurring_check(0)
    async def _infinite_read_and_proceed(self, stream_reader):
        """Read one line from *stream_reader* and report it if it matches.

        Raises:
            RecurringCheckStop: when the stream returns an empty read (EOF).
        """
        raw_line = await stream_reader.readline()
        if not raw_line:
            # eof
            raise RecurringCheckStop()

        # Log lines are not guaranteed to be valid UTF-8. Substitute
        # undecodable bytes instead of raising UnicodeDecodeError so that a
        # single malformed byte cannot cause a blocked-login record to be
        # missed. The captured IP and counter are ASCII and are unaffected.
        line = raw_line.decode(errors="replace")

        match = self._PARSE_LOG_RECORD_REGEX.search(line)
        if match is None:
            return

        attackers_ip, failures = match.group(1), match.group(2)
        # Need field "name" for incident validation
        incident = MessageType.SensorIncident(
            message=match.group(0),
            attackers_ip=attackers_ip,
            retries=int(failures),
            plugin_id=self.PLUGIN_ID,
            rule=None,
            name="Login Blocked by cpHulk",
            timestamp=time.time(),
        )
        await self._sink.process_message(incident)