File: //opt/imunify360/venv/lib/python3.11/site-packages/im360/plugins/resident/cache_clear.py
from defence360agent.contracts.messages import MessageType
from defence360agent.contracts.plugins import MessageSink, expect
from im360 import files
from im360.ioc import services
class CacheClear(MessageSink):
    """Message sink that resets the whitelist caches exposed by ``services``.

    Each handler is registered through ``@expect`` for one message type and
    reacts by calling ``reset()`` on one or more cache objects; the message
    payload itself is never inspected.
    """

    async def create_sink(self, loop):
        """Do nothing: this sink performs no setup and does not use *loop*."""

    @staticmethod
    def _reset_all_whitelist_caches():
        """Call ``reset()`` on every whitelist cache, tree cache first."""
        services.global_whitelist_tree_cache.reset()
        services.common_whitelist_cache.reset()
        services.primary_whitelist_cache.reset()
        services.csf_whitelist_cache.reset()

    @expect(MessageType.FilesUpdated, files_type=files.WHITELISTS)
    async def clear_tree_cache(self, _):
        """Reset only the global whitelist tree cache.

        Registered for ``FilesUpdated`` messages with
        ``files_type=files.WHITELISTS``.
        """
        services.global_whitelist_tree_cache.reset()

    @expect(MessageType.FilesUpdated, files_type=files.GEO)
    async def clear_caches_after_countries_update(self, _):
        """Reset all whitelist caches.

        Registered for ``FilesUpdated`` messages with
        ``files_type=files.GEO``.
        """
        self._reset_all_whitelist_caches()

    @expect(MessageType.WhitelistCacheUpdate)
    async def clear_whitelist_cache(self, _):
        """Reset all whitelist caches.

        Registered for ``WhitelistCacheUpdate`` messages.
        """
        self._reset_all_whitelist_caches()