# File: //opt/imunify360/venv/lib64/python3.11/site-packages/im360/simple_rpc/lists.py
import warnings
import os
from functools import wraps
from peewee import DoesNotExist, fn
from .resident_socket import send_to_socket
from defence360agent.contracts.config import PORT_BLOCKING_MODE_ALLOW
from defence360agent.rpc_tools import lookup
from defence360agent.rpc_tools.utils import (
generate_warnings,
run_in_executor_decorator,
)
from defence360agent.rpc_tools.validate import ValidationError
from defence360agent.utils import Scope, check_disabled_firewall
from im360.api.ips import (
IgnoredByPortAPI,
IPApi,
PortAPI,
GroupIPSyncSender,
IPApiWithIdempotentAdd,
)
from im360.contracts.config import Firewall, Webshield
from im360.model.country import CountryList
from im360.model.firewall import (
BlockedPort,
IPList,
IPListRecord,
IPListPurpose,
Purpose,
)
# Timeout handed to send_to_socket() for every request this module sends
# through ListsEndpoints._send_msg_to_socket.
IPListUpdateTimeout = 60 # seconds
def blocked_ports_allow_mode_only(func):
    """Restrict an async endpoint to the ALLOW port-blocking mode.

    The returned coroutine function raises ``PermissionError`` when
    ``Firewall.port_blocking_mode`` is not ``PORT_BLOCKING_MODE_ALLOW``;
    otherwise it awaits *func* with the given arguments and returns its
    result.
    """

    @wraps(func)
    async def guarded(*args, **kwargs):
        current_mode = Firewall.port_blocking_mode
        if current_mode != PORT_BLOCKING_MODE_ALLOW:
            raise PermissionError("Only for FIREWALL.port_blocking_mode=ALLOW")
        result = await func(*args, **kwargs)
        return result

    return guarded
def raise_acquired_validation_error(func):
    """Turn an error reply returned by *func* into a raised exception.

    The wrapped coroutine awaits *func* and inspects what it returned:

    * a ``str`` is raised as ``Exception`` with that string as message;
    * a ``dict`` whose ``"error"`` is ``"ValidationError"`` raises
      ``ValidationError`` with the dict's ``"message"``;
    * a ``dict`` whose ``"error"`` is ``"Exception"`` raises ``Exception``
      with the dict's ``"message"``;
    * anything else (including dicts with another or no ``"error"``)
      results in an empty dict being returned.
    """

    @wraps(func)
    async def checked(*args, **kwargs):
        reply = await func(*args, **kwargs)
        if isinstance(reply, str):
            raise Exception(reply)
        if isinstance(reply, dict):
            error_kind = reply.get("error")
            if error_kind == "ValidationError":
                raise ValidationError(reply.get("message"))
            if error_kind == "Exception":
                raise Exception(reply.get("message"))
        # Non-dict replies and dicts without a recognised error are
        # reported to the caller as an empty result.
        return {}

    return checked
def _create_graylist_filter(*, except_splash_screen=False, **kwargs):
    """Return *kwargs* extended with the ``listnames`` of the gray list.

    ``listnames`` always contains ``IPList.GRAY``;
    ``IPList.GRAY_SPLASHSCREEN`` is added when ``Webshield.SPLASH_SCREEN``
    is truthy and *except_splash_screen* is falsy. Any ``listnames`` value
    already present in *kwargs* is overwritten.
    """
    gray_lists = [IPList.GRAY]
    include_splash_screen = Webshield.SPLASH_SCREEN and not except_splash_screen
    if include_splash_screen:
        gray_lists.append(IPList.GRAY_SPLASHSCREEN)
    kwargs["listnames"] = gray_lists
    return kwargs
def migrate_warning(func):
    """Mark an async endpoint as deprecated in favour of ``ip-list``.

    Each call of the returned coroutine function first issues a
    ``DeprecationWarning`` and then awaits *func*, returning its result.
    """

    @wraps(func)
    async def deprecated_call(*args, **kwargs):
        notice = "!! Deprecated cli call, use `ip-list` command instead. !!"
        warnings.warn(notice, DeprecationWarning)
        result = await func(*args, **kwargs)
        return result

    return deprecated_call
def warn_disabled_firewall(func):
    """Warn when the firewall-disabled flag file exists, then run *func*.

    The returned coroutine function checks for
    ``/var/imunify360/firewall_disabled`` on every call; if the path exists
    a ``Warning`` is issued. *func* is awaited in either case and its
    result is returned.
    """

    @wraps(func)
    async def notifying(*args, **kwargs):
        flag_present = os.path.exists("/var/imunify360/firewall_disabled")
        if flag_present:
            warnings.warn(
                "Firewall disabled: Change would affect the DB but not ipsets",
                category=Warning,
            )
        result = await func(*args, **kwargs)
        return result

    return notifying
class ListsEndpoints(lookup.RootEndpoints):
SCOPE = Scope.IM360
def __init__(self, *args, **kwargs):
    """Initialise the base endpoints and the set of hidden IPList fields.

    All arguments are forwarded unchanged to the parent constructor.
    ``_hidden_fields`` is later passed as ``exclude_fields`` to
    ``IPList.fetch`` by the listing methods of this class.
    """
    super().__init__(*args, **kwargs)
    # for internal use only
    self._hidden_fields = {IPList.captcha_passed}
@migrate_warning
@lookup.bind("whitelist", "ip", "list")
@run_in_executor_decorator
def whitelist_ip_fetch(
    self, limit=None, offset=None, order_by=None, **kwargs
):
    """Deprecated listing of the white list.

    Returns ``(white_count, counts, records)``: ``counts`` is the mapping
    built by ``self._counts(**kwargs)``, ``white_count`` its ``"white"``
    entry, and ``records`` the result of ``IPList.fetch`` restricted to
    ``IPList.WHITE`` with the hidden fields excluded.
    """
    totals = self._counts(**kwargs)
    white_total = totals["white"]
    records = IPList.fetch(
        listnames=[IPList.WHITE],
        limit=limit,
        offset=offset,
        order_by=order_by,
        exclude_fields=self._hidden_fields,
        **kwargs,
    )
    return white_total, totals, records
@migrate_warning
@lookup.bind("blacklist", "ip", "list")
@run_in_executor_decorator
def blacklist_ip_fetch(self, limit=None, offset=None, **kwargs):
    """Deprecated listing of black-listed IPs.

    Returns ``(count, records)`` where ``count`` comes from
    ``IPList.fetch_count`` and ``records`` from ``IPList.fetch``, both
    restricted to ``IPList.BLACK`` and filtered by *kwargs*.
    """
    black_total = IPList.fetch_count([IPList.BLACK], **kwargs)
    records = IPList.fetch(
        listnames=[IPList.BLACK],
        limit=limit,
        offset=offset,
        exclude_fields=self._hidden_fields,
        **kwargs,
    )
    return black_total, records
@migrate_warning
@lookup.bind("graylist", "ip", "list")
@run_in_executor_decorator
def graylist_fetch(self, limit=None, offset=None, order_by=None, **kwargs):
    """Deprecated listing of the gray list.

    The optional ``no_splash_screen`` keyword is removed from *kwargs* and
    used as ``except_splash_screen`` for both the counts and the list
    filter. Returns ``(gray_count, counts, records)``.
    """
    skip_splash_screen = kwargs.pop("no_splash_screen", False)
    totals = self._counts(except_splash_screen=skip_splash_screen, **kwargs)
    gray_total = totals["gray"]
    fetch_filter = _create_graylist_filter(
        except_splash_screen=skip_splash_screen, **kwargs
    )
    records = IPList.fetch(
        limit=limit,
        offset=offset,
        order_by=order_by,
        exclude_fields=self._hidden_fields,
        **fetch_filter,
    )
    return gray_total, totals, records
@lookup.bind("blacklist")
@run_in_executor_decorator
def blacklist_fetch(
    self, limit, offset, manual=None, order_by=None, **kwargs
):
    """List black-listed countries followed by black-listed IPs.

    Country records and IP records are fetched in full, concatenated
    (countries first) and then sliced to ``[offset, offset + limit)``.
    Returns ``(black_count, counts, page)`` where ``counts`` comes from
    ``self._counts(manual=manual, **kwargs)``.
    """
    countries = CountryList.fetch(
        order_by=order_by,
        by_list=CountryList.BLACK,
        **kwargs,
    )
    addresses = IPList.fetch(
        listnames=[IPList.BLACK],
        order_by=order_by,
        manual=manual,
        exclude_fields=self._hidden_fields,
        **kwargs,
    )
    merged = countries + addresses
    page = merged[offset : offset + limit]
    totals = self._counts(manual=manual, **kwargs)
    return totals["black"], totals, page
@raise_acquired_validation_error
async def _send_msg_to_socket(self, msg):
    """Send *msg* via ``send_to_socket`` using ``IPListUpdateTimeout``.

    The raw reply is post-processed by ``raise_acquired_validation_error``.
    """
    reply = await send_to_socket(msg=msg, timeout=IPListUpdateTimeout)
    return reply
async def _send_ip_list_update(self, action, purpose, items):
    """Send an ``IP_LISTS_UPDATE`` message with the given action/purpose.

    *items* is placed verbatim under the ``"items"`` key of the message.
    """
    message = {
        "method": "IP_LISTS_UPDATE",
        "action": action,
        "purpose": purpose,
        # TODO: it would be nice to split
        # to separate "items" and "kwargs" keys
        "items": items,
    }
    return await self._send_msg_to_socket(message)
@migrate_warning
@warn_disabled_firewall
@lookup.bind("whitelist", "ip", "add")
async def whitelist_add(self, **kwargs):
    """Deprecated: request an "add" update with ``Purpose.WHITE``.

    ``items`` is taken out of *kwargs* and stringified; the remaining
    keywords are forwarded in the payload after ``"manual": True``.
    """
    addresses = list(map(str, kwargs.pop("items")))
    payload = {"items": addresses, "manual": True}
    payload.update(kwargs)
    return await self._send_ip_list_update(
        action="add", purpose=Purpose.WHITE, items=payload
    )
@migrate_warning
@warn_disabled_firewall
@lookup.bind("blacklist", "ip", "add")
async def blacklist_add(self, **kwargs):
    """Deprecated: request an "add" update with ``Purpose.DROP``.

    ``items`` is taken out of *kwargs* and stringified; the remaining
    keywords are forwarded in the payload after ``"manual": True``.
    """
    addresses = list(map(str, kwargs.pop("items")))
    payload = {"items": addresses, "manual": True}
    payload.update(kwargs)
    return await self._send_ip_list_update(
        action="add", purpose=Purpose.DROP, items=payload
    )
@migrate_warning
@warn_disabled_firewall
@lookup.bind("graylist", "ip", "add")
async def graylist_add(self, **kwargs):
    """Deprecated: request an "add" update with ``Purpose.CAPTCHA``.

    ``items`` is taken out of *kwargs* and stringified; the remaining
    keywords are forwarded in the payload after ``"manual": True``.
    """
    addresses = list(map(str, kwargs.pop("items")))
    payload = {"items": addresses, "manual": True}
    payload.update(kwargs)
    return await self._send_ip_list_update(
        action="add", purpose=Purpose.CAPTCHA, items=payload
    )
@migrate_warning
@warn_disabled_firewall
@lookup.bind("blacklist", "ip", "move")
async def blacklist_move(self, items):
    """Deprecated: request an "add" update with ``Purpose.DROP``.

    Only the stringified *items* are sent; no other keys are included in
    the payload.
    """
    payload = {"items": list(map(str, items))}
    return await self._send_ip_list_update(
        action="add", purpose=Purpose.DROP, items=payload
    )
@migrate_warning
@warn_disabled_firewall
@lookup.bind("whitelist", "ip", "move")
async def whitelist_move(self, **kwargs):
    """Deprecated: request an "add" update with ``Purpose.WHITE``.

    ``items`` is taken out of *kwargs* and stringified; the remaining
    keywords are forwarded in the payload as they are.
    """
    payload = {"items": list(map(str, kwargs.pop("items")))}
    payload.update(kwargs)
    return await self._send_ip_list_update(
        action="add", purpose=Purpose.WHITE, items=payload
    )
@migrate_warning
@warn_disabled_firewall
@lookup.bind("blacklist", "ip", "edit")
async def blacklist_edit(self, **kwargs):
    """Deprecated: editing is sent as an "add" update with ``Purpose.DROP``.

    ``items`` is taken out of *kwargs* and stringified; the remaining
    keywords are forwarded in the payload as they are.
    """
    payload = {"items": list(map(str, kwargs.pop("items")))}
    payload.update(kwargs)
    return await self._send_ip_list_update(
        action="add", purpose=Purpose.DROP, items=payload
    )
@migrate_warning
@warn_disabled_firewall
@lookup.bind("whitelist", "ip", "edit")
async def whitelist_edit(self, **kwargs):
    """Deprecated: editing is sent as an "add" update with ``Purpose.WHITE``.

    ``items`` is taken out of *kwargs* and stringified; the remaining
    keywords are forwarded in the payload as they are.
    """
    payload = {"items": list(map(str, kwargs.pop("items")))}
    payload.update(kwargs)
    return await self._send_ip_list_update(
        action="add", purpose=Purpose.WHITE, items=payload
    )
@migrate_warning
@warn_disabled_firewall
@lookup.bind("blacklist", "ip", "delete")
async def blacklist_delete(self, items):
    """Deprecated: request a "delete" update with ``Purpose.DROP``.

    *items* are sent as a plain list of strings.
    """
    addresses = list(map(str, items))
    return await self._send_ip_list_update(
        action="delete", purpose=Purpose.DROP, items=addresses
    )
@migrate_warning
@warn_disabled_firewall
@lookup.bind("whitelist", "ip", "delete")
async def whitelist_delete(self, items):
    """Deprecated: request a "delete" update with ``Purpose.WHITE``.

    *items* are sent as a plain list of strings.
    """
    addresses = list(map(str, items))
    return await self._send_ip_list_update(
        action="delete", purpose=Purpose.WHITE, items=addresses
    )
@migrate_warning
@warn_disabled_firewall
@lookup.bind("graylist", "ip", "delete")
async def graylist_delete(self, items):
    """Deprecated: request a "delete" update with ``Purpose.CAPTCHA``.

    *items* are sent as a plain list of strings.
    """
    addresses = list(map(str, items))
    return await self._send_ip_list_update(
        action="delete", purpose=Purpose.CAPTCHA, items=addresses
    )
@lookup.bind("blocked-port", "list")
@check_disabled_firewall
@blocked_ports_allow_mode_only
@run_in_executor_decorator
def get_port_proto(self, limit=None, offset=None, **kwargs):
    """List blocked ports.

    Returns ``(blocked_ports_count, counts, records)`` where ``counts`` is
    built by ``self._counts(**kwargs)`` and ``records`` by
    ``BlockedPort.fetch``.
    """
    totals = self._counts(**kwargs)
    blocked_total = totals["blocked-ports"]
    records = BlockedPort.fetch(limit=limit, offset=offset, **kwargs)
    return blocked_total, totals, records
@lookup.bind("blocked-port", "add")
@check_disabled_firewall
@blocked_ports_allow_mode_only
async def blocked_port_add(self, items, ips=None, comment=None):
    """Send a ``BLOCKED_PORT_UPDATE`` "add" message.

    *ips* is stringified; a falsy *ips* is sent as an empty list.
    """
    message = {
        "method": "BLOCKED_PORT_UPDATE",
        "action": "add",
        "items": items,
        "ips": list(map(str, ips or [])),
        "comment": comment,
    }
    return await self._send_msg_to_socket(message)
@lookup.bind("blocked-port", "delete")
@check_disabled_firewall
@blocked_ports_allow_mode_only
async def blocked_port_delete(self, items):
    """Send a ``BLOCKED_PORT_UPDATE`` "delete" message for *items*."""
    message = {
        "method": "BLOCKED_PORT_UPDATE",
        "action": "delete",
        "items": items,
    }
    return await self._send_msg_to_socket(message)
@lookup.bind("blocked-port", "edit")
@check_disabled_firewall
@blocked_ports_allow_mode_only
async def blocked_port_edit(self, items, comment):
    """Send a ``BLOCKED_PORT_UPDATE`` "edit" message with a new comment."""
    message = {
        "method": "BLOCKED_PORT_UPDATE",
        "action": "edit",
        "items": items,
        "comment": comment,
    }
    return await self._send_msg_to_socket(message)
@lookup.bind("blocked-port-ip", "add")
@check_disabled_firewall
@blocked_ports_allow_mode_only
async def ignored_by_port_add_ip(self, items, ips=None, comment=None):
    """Send a ``BLOCKED_PORT_IP_UPDATE`` "add" message.

    *ips* is stringified; a falsy *ips* is sent as an empty list.
    """
    message = {
        "method": "BLOCKED_PORT_IP_UPDATE",
        "action": "add",
        "items": items,
        "ips": list(map(str, ips or [])),
        "comment": comment,
    }
    return await self._send_msg_to_socket(message)
@lookup.bind("blocked-port-ip", "edit")
@check_disabled_firewall
@blocked_ports_allow_mode_only
async def ignored_by_port_edit_ip(self, items, ips, comment=None):
    """Send a ``BLOCKED_PORT_IP_UPDATE`` "edit" message.

    *ips* is required here and is sent as a list of strings.
    """
    message = {
        "method": "BLOCKED_PORT_IP_UPDATE",
        "action": "edit",
        "items": items,
        "ips": list(map(str, ips)),
        "comment": comment,
    }
    return await self._send_msg_to_socket(message)
@lookup.bind("blocked-port-ip", "delete")
@check_disabled_firewall
@blocked_ports_allow_mode_only
async def ignored_by_port_delete_ip(self, items, ips):
    """Send a ``BLOCKED_PORT_IP_UPDATE`` "delete" message.

    *ips* is sent as a list of strings.
    """
    message = {
        "method": "BLOCKED_PORT_IP_UPDATE",
        "action": "delete",
        "items": items,
        "ips": list(map(str, ips)),
    }
    return await self._send_msg_to_socket(message)
def _counts(self, manual=None, except_splash_screen=False, **kwargs):
    """Build the per-list record counts used by the listing endpoints.

    Returns a dict with the keys ``"white"``, ``"black"``, ``"gray"`` and
    ``"blocked-ports"``. *manual* is applied to the black IP count only;
    the ``"black"`` entry is the sum of black-listed IPs and black-listed
    countries.
    """
    white_total = IPList.fetch_count(listnames=[IPList.WHITE], **kwargs)
    black_ip_total = IPList.fetch_count(
        listnames=[IPList.BLACK], manual=manual, **kwargs
    )
    black_country_total = CountryList.fetch_count(
        by_list=CountryList.BLACK, **kwargs
    )
    gray_filter = _create_graylist_filter(
        except_splash_screen=except_splash_screen, **kwargs
    )
    gray_total = IPList.fetch_count(**gray_filter)
    blocked_ports_total = BlockedPort.fetch_count(**kwargs)
    return {
        "white": white_total,
        "black": black_ip_total + black_country_total,
        "gray": gray_total,
        "blocked-ports": blocked_ports_total,
    }
@lookup.bind("ip-list", "synced")
@check_disabled_firewall
@run_in_executor_decorator
def ip_list_synced(
    self, purpose=None, by_ip=None, limit=None, offset=None
):
    """List ``IPListRecord`` entries.

    Returns ``(count, counts_by_purpose, records)``: the count for the
    requested *purpose*/*by_ip*, the per-purpose counts from
    ``self._counts_synced`` and the fetched page of records.
    """
    matching_total = IPListRecord.fetch_count(purpose, by_ip)
    per_purpose = self._counts_synced(by_ip=by_ip)
    records = IPListRecord.fetch(
        purpose=purpose, by_ip=by_ip, limit=limit, offset=offset
    )
    return matching_total, per_purpose, records
@staticmethod
def _counts_synced(**kwargs):
    """Count ``IPListRecord`` entries for each purpose.

    Only the ``by_ip`` keyword is used; all others are ignored. The
    result maps each purpose value (WHITE, DROP, CAPTCHA, SPLASHSCREEN)
    to its count.
    """
    by_ip = kwargs.get("by_ip")
    purposes = (
        Purpose.WHITE,
        Purpose.DROP,
        Purpose.CAPTCHA,
        Purpose.SPLASHSCREEN,
    )
    return {
        item.value: IPListRecord.fetch_count(purpose=item.value, by_ip=by_ip)
        for item in purposes
    }
async def get_counts_local(
    self, list_names, except_splash_screen=False, **kwargs
):
    """Compute the totals reported by the ``ip-list local list`` call.

    Returns ``(max_count, counts)``. ``max_count`` is the number of IPList
    records in *list_names*, plus the number of black-listed countries when
    ``IPList.BLACK`` is among *list_names*. ``counts`` has two keys:
    ``"server"`` (white / drop / captcha / splashscreen totals) and
    ``"cloud"`` (the result of ``self._counts_synced``).
    """
    max_count = IPList.fetch_count(listnames=list_names, **kwargs)
    cloud_counts = self._counts_synced(**kwargs)
    # Countries are filtered only by these three keywords.
    country_filter = {
        key: kwargs.get(key)
        for key in ("by_country_code", "by_comment", "by_ip")
    }
    country_total = CountryList.fetch_count(
        by_list=CountryList.BLACK, **country_filter
    )

    white_total = IPList.fetch_count(listnames=[IPList.WHITE], **kwargs)
    drop_total = (
        IPList.fetch_count(listnames=[IPList.BLACK], **kwargs) + country_total
    )
    captcha_filter = _create_graylist_filter(
        except_splash_screen=except_splash_screen, **kwargs
    )
    captcha_total = IPList.fetch_count(**captcha_filter)
    splash_total = IPList.fetch_count(
        listnames=[IPList.GRAY_SPLASHSCREEN], **kwargs
    )

    counts = {
        "server": {
            "white": white_total,
            "drop": drop_total,
            "captcha": captcha_total,
            "splashscreen": splash_total,
        },
        "cloud": cloud_counts,
    }
    if IPList.BLACK in list_names:
        # Black-listed countries are listed together with black IPs.
        max_count = max_count + country_total
    return max_count, counts
@staticmethod
async def get_blacklisted_local_countries(
    limit=None, offset=None, order_by=None, **kwargs
):
    """Fetch black-listed country records with ``purpose`` instead of
    ``listname``.

    Only ``by_country_code``, ``by_comment`` and ``by_ip`` are read from
    *kwargs*; any other keyword is ignored.
    """
    # country can be only black listed
    # same as it was before in rpc `blacklist` call
    country_filter = {
        key: kwargs.get(key)
        for key in ("by_country_code", "by_comment", "by_ip")
    }
    records = CountryList.fetch(
        by_list=IPList.BLACK,
        order_by=order_by,
        limit=limit,
        offset=offset,
        **country_filter,
    )
    # map listname to purpose in response
    for record in records:
        listname = record.pop("listname")
        record["purpose"] = IPListPurpose.listname2purpose(listname).value
    return records
async def get_ip_local(
    self,
    list_names,
    limit=None,
    offset=None,
    except_splash_screen=False,
    order_by=None,
    columns=None,
    **kwargs
):
    """Fetch IPList records for *list_names*, reporting ``purpose``.

    When ``IPList.GRAY`` is requested, the splash screen is enabled and
    *except_splash_screen* is falsy, ``IPList.GRAY_SPLASHSCREEN`` is
    appended to *list_names* **in place**, so the caller's list is
    modified. *columns*, when given, is an iterable of IPList field names
    (case-insensitive); all other fields are excluded, in addition to the
    hidden fields of this instance.
    """
    splash_screen_wanted = (
        IPList.GRAY in list_names
        and Webshield.SPLASH_SCREEN
        and not except_splash_screen
    )
    if splash_screen_wanted:
        if IPList.GRAY_SPLASHSCREEN not in list_names:
            list_names.append(IPList.GRAY_SPLASHSCREEN)
        kwargs = _create_graylist_filter(
            except_splash_screen=except_splash_screen, **kwargs
        )
        # list names are passed explicitly to IPList.fetch below
        kwargs.pop("listnames", None)

    if columns:
        requested_fields = {getattr(IPList, name.lower()) for name in columns}
        fields_to_exclude = set(IPList._meta.sorted_fields) - requested_fields
    else:
        fields_to_exclude = set()
    all_excluded = fields_to_exclude.union(self._hidden_fields)

    records = IPList.fetch(
        listnames=list_names,
        exclude_fields=all_excluded,
        # country record will be added to result
        # and offset should be calculated on full set
        limit=limit,
        offset=offset,
        order_by=order_by,
        **kwargs,
    )
    # map listname to purpose in response
    if IPList.listname in fields_to_exclude:
        return records
    for record in records:
        listname = record.pop("listname")
        record["purpose"] = IPListPurpose.listname2purpose(listname).value
    return records
@lookup.bind("ip-list", "local", "list")
@warn_disabled_firewall
async def ip_list_local_list(
    self, purpose=None, limit=None, offset=None, order_by=None, **kwargs
):
    """Replacement for the old whitelist/graylist/blacklist ``ip list``.

    Per the original note on this call, a search by IP now also finds
    supernets and subnets.

    Keywords consumed here (removed from *kwargs* before the rest is
    forwarded as filters):

    * ``no_splash_screen`` - forwarded as ``except_splash_screen``;
    * ``by_type`` - ``"country"`` returns only black-listed countries,
      ``"ip"`` returns only IP records, anything else returns countries
      followed by IP records, paginated over the combined sequence;
    * ``columns`` - comma-separated IPList field names to keep in the IP
      records.

    Returns ``(max_count, counts, items)`` as produced by
    ``get_counts_local`` and the fetch helpers.
    """
    if purpose:
        list_names = [Purpose.listname(p) for p in purpose]
    else:
        list_names = [
            IPList.WHITE,
            IPList.BLACK,
            IPList.GRAY,
            IPList.GRAY_SPLASHSCREEN,
        ]
    except_splash_screen = kwargs.pop("no_splash_screen", False)
    by_type = kwargs.pop("by_type", None)
    # Parse the column filter once, for every branch: get_ip_local expects
    # an iterable of field names (a raw string would be iterated character
    # by character), and the count helpers must not receive it at all.
    columns = kwargs.pop("columns", None)
    if isinstance(columns, str):
        columns = [
            name.strip() for name in columns.split(",") if name.strip()
        ]

    if by_type == "country":
        max_count, counts = await self.get_counts_local(
            # UI interested in blacklisted country only
            list_names=[IPList.BLACK],
            except_splash_screen=except_splash_screen,
            **kwargs,
        )
        country_items = await self.get_blacklisted_local_countries(
            limit, offset, order_by=order_by, **kwargs
        )
        return max_count, counts, country_items

    if by_type == "ip":
        max_count, counts = await self.get_counts_local(
            list_names=list_names,
            except_splash_screen=except_splash_screen,
            **kwargs,
        )
        ip_items = await self.get_ip_local(
            list_names,
            limit=limit,
            offset=offset,
            order_by=order_by,
            except_splash_screen=except_splash_screen,
            columns=columns,
            **kwargs,
        )
        return max_count, counts, ip_items

    # if there is no filter by record type `by_type`,
    # add country records first
    country_items = await self.get_blacklisted_local_countries(
        limit=None, offset=None, order_by=order_by, **kwargs
    )
    country_total = len(country_items)
    # limit/offset default to None: treat a missing offset as 0 and a
    # missing limit as "no limit" instead of failing on the arithmetic.
    start = offset or 0
    if limit is None:
        ip_limit = None
        country_page = country_items[start:]
    else:
        # in [0 .. limit]
        ip_limit = max(0, min(limit, start + limit - country_total))
        country_page = country_items[start : start + limit]
    # in [0 .. offset]
    ip_offset = max(0, start - country_total)

    ip_items = await self.get_ip_local(
        list_names,
        limit=ip_limit,
        offset=ip_offset,
        order_by=order_by,
        except_splash_screen=except_splash_screen,
        columns=columns,
        **kwargs,
    )
    result_items = country_page + ip_items
    # get_ip_local may have appended GRAY_SPLASHSCREEN to list_names in
    # place, so the counts below are computed on that extended list.
    max_count, counts = await self.get_counts_local(
        list_names, except_splash_screen=except_splash_screen, **kwargs
    )
    return max_count, counts, result_items
@lookup.bind("ip-list", "local", "add")
@warn_disabled_firewall
async def ip_list_local_add(self, purpose, **kwargs):
    """Replacement for the old whitelist/graylist/blacklist
    ``ip add``/``edit``/``move`` calls.

    The new "add" covers the functionality of the old add, edit and move.
    ``items`` is taken out of *kwargs* and stringified; the remaining
    keywords are forwarded in the payload after ``"manual": True``.
    """
    addresses = list(map(str, kwargs.pop("items")))
    payload = {"items": addresses, "manual": True}
    payload.update(kwargs)
    return await self._send_ip_list_update(
        action="add", purpose=purpose, items=payload
    )
@lookup.bind("ip-list", "local", "delete")
@warn_disabled_firewall
async def ip_list_local_delete(self, purpose, items):
    """Remove records from the IPList table.

    Same as the old ``[white/black/gray]list ip delete`` rpc calls, but
    splash screen records may now be deleted as well. *items* are sent as
    a plain list of strings.
    """
    addresses = list(map(str, items))
    return await self._send_ip_list_update(
        action="delete", purpose=purpose, items=addresses
    )