File: //opt/imunify360/venv/lib/python3.11/site-packages/im360/plugins/resident/lists.py
import datetime
import ipaddress
import json
from peewee import DoesNotExist
from defence360agent.rpc_tools.utils import (
generate_warnings,
run_in_executor_decorator,
)
from defence360agent.contracts.plugins import (
MessageSink,
MessageSource,
expect,
)
from im360.api.ips import (
IgnoredByPortAPI,
IPApi,
PortAPI,
GroupIPSyncSender,
IPApiWithIdempotentAdd,
)
from defence360agent.contracts.messages import MessageType
from im360.model.firewall import (
IPList,
Purpose,
)
from im360.subsys.whitelist_rbl import create_rbl_whitelist
from im360.model.firewall import (
BlockedPort,
)
from defence360agent.rpc_tools import ValidationError
class ManageLists(MessageSink, MessageSource):
async def create_sink(self, loop):
self._loop = loop
async def create_source(self, loop, sink):
self._loop = loop
self._sink = sink
async def _delete_ip_from(self, items, listname):
"""Unblock from *listname*"""
to_ignore, to_unblock = [], []
not_affected = []
for ip in items:
net = ipaddress.ip_network(ip)
supernets = IPList.find_closest_ip_nets(
net, listname=[listname], limit=1
)
# all(a.subnet_of(b) for a, b in zip([net]+supernets, supernets))
if supernets:
if supernets[0].ip_network != net:
# net is a part of supernet(s)
to_ignore.append(net)
else: # the first supernet == net
to_unblock.append(net)
else: # net is not a subnet
not_affected.append(
{
"rec": net,
"listname": IPList.get_field("listname", ip=ip),
}
)
for ip in to_ignore:
# no way to unblock IP, because it a part of blocked supernets so,
# sending ClientUnblock in order to put it in ignore list
await self._sink.process_message(
MessageType.ClientUnblock(
attackers_ip=ip, plugin_id="imunify360"
)
)
affected, not_unblocked = await IPApi.unblock(
to_unblock, listname=listname
)
affected += to_ignore
not_affected += not_unblocked
return affected, not_affected
async def _ip_delete_local(self, items, listname):
group_ip_sender = await GroupIPSyncSender().collect(items)
affected, not_affected = await self._delete_ip_from(items, listname)
if listname == IPList.GRAY:
splash_affected, splash_not_affected = await self._delete_ip_from(
items, IPList.GRAY_SPLASHSCREEN
)
affected += [
addr for addr in splash_affected if addr not in affected
]
na = []
not_affected_lists = (not_affected, splash_not_affected)
for not_affected_list in not_affected_lists:
na += [
net
for net in not_affected_list
if (net["rec"] not in affected) and (net not in na)
]
not_affected = na
await group_ip_sender.filter(affected).send("del")
generate_warnings(
affected,
not_affected,
dest_listname=listname,
all_list=items,
success_warning="{}/{} ip(s) were successfully deleted",
failure_warning="Noop: unable to delete {} from {}",
in_another_list_warning="IP {} is already in {} list",
)
async def _ip_add_idempotent(
self,
listname,
items,
comment=None,
expiration=0,
full_access=None,
scope=None,
comment_autogenerated=False,
):
assert listname in [IPList.BLACK, IPList.WHITE, IPList.GRAY]
if comment is None:
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
auto_generated_comment = "Manually added on %s" % now
comment = auto_generated_comment
comment_autogenerated = True
group_ip_sender = GroupIPSyncSender()
if scope == IPList.SCOPE_LOCAL:
await group_ip_sender.collect(items)
affected, not_affected = await IPApiWithIdempotentAdd.block(
listname=listname,
items=items,
comment=comment,
full_access=full_access,
expiration=expiration,
scope=scope,
# "whitelist ip add" doesn't remove expired manually blacklisted
# subnets
keep_manual_expired_subnets=(listname == IPList.WHITE),
manual=True,
comment_autogenerated=comment_autogenerated,
)
if scope == IPList.SCOPE_LOCAL:
await group_ip_sender.filter(affected).send("del")
await (await GroupIPSyncSender().collect(affected)).send("add")
generate_warnings(
affected,
not_affected,
dest_listname=listname,
all_list=items,
success_warning="{}/{} ip(s) were successfully added",
failure_warning="Noop: unable to add {} to {}",
in_another_list_warning="IP {} is already in {} list",
)
@expect(MessageType.IPListsUpdate)
async def update_local_iplist(self, message):
action = message["action"]
transport = message.get("transport")
response = {}
if transport:
message.pop("transport")
try:
if action == "add":
await self.ip_list_local_add(message)
elif action == "delete":
await self.ip_list_local_delete(message)
except ValidationError as exc:
response = {"error": "ValidationError", "message": str(exc)}
except Exception as exc:
response = {"error": "Exception", "message": str(exc)}
finally:
self._respond_to_transport(transport, json.dumps(response))
def _respond_to_transport(self, transport, message):
"""Writes the provided message to the transport if it exists"""
if transport:
transport.write(message.encode() + b"\n")
async def ip_list_local_add(self, message):
"""replacements for old
whitelist/graylist/blacklist ip list/add/delete/edit,
With changes:
new add will include functionality of old add/edit/move"""
list_name = Purpose.listname(message["purpose"])
_ips = message["items"].pop("items")
await self._ip_add_idempotent(
listname=list_name,
items=[ipaddress.ip_network(_ip) for _ip in _ips],
comment=message["items"].get("comment"),
expiration=message["items"].get("expiration", 0),
full_access=message["items"].get("full_access"),
scope=message["items"].get("scope"),
)
async def ip_list_local_delete(self, message):
"""Used for removing record from IPList table, same as old
rpc calls: `[white/black/gray]list ip delete`, but now also
splachscreen is allowed to delete
"""
list_name = Purpose.listname(message["purpose"])
await self._ip_delete_local(
listname=list_name,
items=[ipaddress.ip_network(_ip) for _ip in message["items"]],
)
@expect(MessageType.UpdateCustomLists)
async def update_custom_lists(self, message):
await create_rbl_whitelist()
@expect(MessageType.BlockedPortUpdate)
async def blocked_port_update(self, message):
transport = message.pop("transport", None)
response = {}
try:
if message.action == "add":
await self._blocked_port_add(
message["items"], message["ips"], message["comment"]
)
elif message.action == "delete":
await self._blocked_port_delete(message["items"])
elif message.action == "edit":
await self._blocked_port_edit(
message["items"], message["comment"]
)
except ValidationError as exc:
response = {"error": "ValidationError", "message": str(exc)}
except Exception as exc:
response = {"error": "Exception", "message": str(exc)}
finally:
self._respond_to_transport(transport, json.dumps(response))
@expect(MessageType.BlockedPortIPUpdate)
async def blocked_port_ip_update(self, message):
transport = message.pop("transport", None)
response = {}
try:
ips = [ipaddress.ip_network(ip) for ip in message["ips"]]
if message.action == "add":
await self._ignored_by_port_add_ip(
message["items"], ips, message["comment"]
)
elif message.action == "delete":
await self._ignored_by_port_delete_ip(message["items"], ips)
elif message.action == "edit":
await self._ignored_by_port_edit_ip(
message["items"], ips, message["comment"]
)
except ValidationError as exc:
response = {"error": "ValidationError", "message": str(exc)}
except Exception as exc:
response = {"error": "Exception", "message": str(exc)}
finally:
self._respond_to_transport(transport, json.dumps(response))
async def _blocked_port_add(self, items, ips, comment=None):
affected, not_affected = await PortAPI.block(items, comment=comment)
for ip in [ipaddress.ip_network(ip) for ip in ips]:
for port, proto in affected:
await IgnoredByPortAPI.block([ip], port=port, proto=proto)
return generate_warnings(
affected,
not_affected,
dest_listname=None,
all_list=items,
success_warning="{}/{} port(s) were successfully added",
failure_warning="Noop: unable to add {}",
)
async def _blocked_port_delete(self, items):
affected, not_affected = await PortAPI.unblock(items)
return generate_warnings(
affected,
not_affected,
dest_listname=None,
all_list=items,
success_warning="{}/{} port(s) were successfully deleted",
failure_warning="Noop: unable to delete {}",
)
async def _blocked_port_edit(self, items, comment):
affected, not_affected = await PortAPI.edit(items, comment)
return generate_warnings(
affected,
not_affected,
dest_listname=None,
all_list=items,
success_warning="{}/{} port(s) were successfully edited",
failure_warning="Noop: unable to edit {}",
)
async def _ignored_by_port_add_ip(self, items, ips, comment=None):
for port, proto in items:
if not await self.check_port_proto(port, proto):
raise ValidationError(
"Port and proto does not exist {}:{}".format(port, proto)
)
affected, not_affected = [], []
for port, proto in items:
_affected, _not_affected = await IgnoredByPortAPI.block(
items=ips, port=port, proto=proto, comment=comment
)
affected.extend(_affected)
not_affected.extend(_not_affected)
return generate_warnings(
affected,
not_affected,
dest_listname=None,
all_list=ips,
success_warning="{}/{} ip(s) were successfully added",
failure_warning="Noop: unable to add {}",
)
async def _ignored_by_port_edit_ip(self, items, ips, comment=None):
for port, proto in items:
if not await self.check_port_proto(port, proto):
raise ValidationError(
"Port and proto does not exist {}:{}".format(port, proto)
)
affected, not_affected = [], []
for port, proto in items:
_affected, _not_affected = await IgnoredByPortAPI.edit(
items=ips, port=port, proto=proto, comment=comment
)
affected.extend(_affected)
not_affected.extend(_not_affected)
return generate_warnings(
affected,
not_affected,
dest_listname=None,
all_list=ips,
success_warning="{}/{} ip(s) were successfully edited",
failure_warning="Noop: unable to edit {}",
)
async def _ignored_by_port_delete_ip(self, items, ips):
for port, proto in items:
if not await self.check_port_proto(port, proto):
raise ValidationError(
"Port and proto does not exist {}:{}".format(port, proto)
)
affected, not_affected = [], []
for port, proto in items:
_affected, _not_affected = await IgnoredByPortAPI.unblock(
items=ips, port=port, proto=proto
)
affected.extend(_affected)
not_affected.extend(_not_affected)
return generate_warnings(
affected,
not_affected,
dest_listname=None,
all_list=ips,
success_warning="{}/{} ip(s) were successfully deleted",
failure_warning="Noop: unable to delete {}",
)
@run_in_executor_decorator
def check_port_proto(self, port, proto):
try:
BlockedPort.get(port=port, proto=proto)
except DoesNotExist:
return False
return True