MOON
Server: Apache
System: Linux server30c.hostingraja.org 3.10.0-962.3.2.lve1.5.63.el7.x86_64 #1 SMP Fri Oct 8 12:03:35 UTC 2021 x86_64
User: jibhires (1887)
PHP: 8.1.30
Disabled: show_source, system, shell_exec, passthru, exec, popen, proc_open, allow_url_fopen, symlink, escapeshellcmd, pcntl_exec
Upload Files
File: //opt/imunify360/venv/lib64/python3.11/site-packages/im360/api/server/remote_iplist.py
import asyncio
import logging
import urllib
import json
import gzip
from http.client import RemoteDisconnected
from typing import Iterable

from defence360agent.api.server import API, APIError

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


class RemoteIPListAPI(API):
    """Client for the remote IP-list synchronisation endpoint.

    Requests are sent to ``cls._BASE_URL + cls.URL`` through ``cls.request``;
    both ``_BASE_URL`` and ``request`` are expected to come from the ``API``
    base class (not defined in this module).
    """

    URL = "/api/sync/v1/iplist"

    @classmethod
    async def sync(
        cls,
        server_id,
        purposes,
        sync_point,
        checksum,
        cln_license,
        *,
        lazy_responses=True
    ) -> Iterable[dict]:
        """Fetch IP-list updates from the server.

        A JSON body holding ``sync_point``, ``server_id``, ``purposes``,
        ``checksum`` and ``license`` is POSTed to the endpoint.  The blocking
        ``cls.request`` call runs in the event loop's default executor so the
        loop is not stalled.  The response is decoded as a gzip-compressed
        payload of newline-separated JSON documents.

        :param server_id: sent as ``server_id`` in the request body
        :param purposes: sent as ``purposes`` in the request body
        :param sync_point: sent as ``sync_point`` in the request body
        :param checksum: sent as ``checksum`` in the request body
        :param cln_license: sent as ``license`` in the request body
        :param lazy_responses: when true (the default) return a generator
            that decompresses and parses the payload only once it is
            iterated; when false return a fully parsed list
        :return: iterable of the decoded JSON documents; an empty list if
            ``asyncio.TimeoutError``, ``RemoteDisconnected``, ``ValueError``
            or ``APIError`` is raised while building/sending the request or
            (eager mode only) while parsing the response

        .. note::
            In lazy mode decoding happens during iteration, i.e. outside of
            this method's ``try`` block, so decoding errors propagate to
            whoever consumes the returned generator.
        """
        # ``import urllib`` alone does not load the ``urllib.request``
        # submodule; import it explicitly so this method does not depend on
        # some other module having imported it first.
        import urllib.request

        loop = asyncio.get_running_loop()
        try:
            payload = json.dumps(
                {
                    "sync_point": sync_point,
                    "server_id": server_id,
                    "purposes": purposes,
                    "checksum": checksum,
                    "license": cln_license,
                }
            ).encode()
            request = urllib.request.Request(
                cls._BASE_URL + cls.URL,
                method="POST",
                data=payload,
                headers={"Content-Type": "application/json"},
            )
            # NOTE(review): the meaning of the positional ``False`` is
            # defined by ``API.request`` and is not visible here; the result
            # is used below as raw (still gzip-compressed) bytes.
            raw_response = await loop.run_in_executor(
                None,
                cls.request,
                request,
                False,
            )
            # urllib does not support chunked read of chunked response
            records = cls._lazy_chunks_iterator(raw_response)
            if lazy_responses:
                return records
            # Materialise inside the ``try`` so parse errors are handled
            # below, exactly as in the lazy helper but eagerly.
            return list(records)
        except (
            asyncio.TimeoutError,
            RemoteDisconnected,
            ValueError,
            APIError,
        ) as e:
            logger.error("Failed to get iplist update from server: %s", e)
            return []

    @classmethod
    def _lazy_chunks_iterator(cls, raw_response: bytes) -> Iterable[dict]:
        """Yield the JSON documents contained in a gzip-compressed payload.

        This is a generator: nothing is decompressed or parsed until the
        first item is requested.  The whole payload is decompressed in one
        go, split on newlines, and every non-empty line is parsed with
        ``json.loads``.
        """
        for chunk in gzip.decompress(raw_response).split(b"\n"):
            if chunk:
                yield json.loads(chunk)