import httpx import asyncio from .base import BaseFilterProvider, BaseSourceProvider, BaseOutputProvider, IPAddreses from .plugins import use_plugins from typing import Optional, NamedTuple class NeededAddrs(NamedTuple): ipv4: bool ipv6: bool @classmethod def from_config(cls, config: dict) -> "NeededAddrs": need_ipv4 = config.get("require_ipv4", False) need_ipv6 = config.get("require_ipv6", False) if "require_ipv4" not in config and "require_ipv6" not in config: need_ipv4 = need_ipv6 = True return cls(need_ipv4, need_ipv6) def is_valid_addreses(addrs: IPAddreses, config: dict) -> bool: """Проверка валидности IP адресов Args: addrs (IPAddreses): IP адреса - результат одного из источников config (dict): общая конфигурация Returns: bool: валиден или нет """ result = addrs.ipv4 or addrs.ipv6 if config.get("require_ipv4"): result = result and addrs.ipv4 if config.get("require_ipv6"): result = result and addrs.ipv6 return bool(result) def is_ipv4_ok(ipv4: str, config: dict) -> bool: """Проверка IPv4 адреса, подходит или нет Args: ipv4 (str): IPv4 адрес config (dict): общая конфигурация Returns: bool: подходит или нет """ return bool(ipv4) if NeededAddrs.from_config(config).ipv4 else True def is_ipv6_ok(ipv6: str, config: dict) -> bool: """Проверка IPv6 адреса, подходит или нет Args: ipv6 (str): IPv6 адрес config (dict): общая конфигурация Returns: bool: подходит или нет """ return bool(ipv6) if NeededAddrs.from_config(config).ipv6 else True async def get_ip_addresses(config: dict) -> Optional[IPAddreses]: """Получение всех IP адресов из всех источников Args: config (dict): общая конфигурация Returns: Optional[IPAddreses]: результат получения, либо None """ unit_mode = config.get("unit_mode", False) if unit_mode: result = await get_ip_addresses_unit(config) else: result = await get_ip_addresses_any(config) return result async def get_ip_addresses_any(config: dict) -> Optional[IPAddreses]: """Получение всех IP адресов из всех источников (режим any) Получает разные адреса от любых источников Args: config (dict): общая конфигурация Returns: Optional[IPAddreses]: результат получения, либо None """ need = NeededAddrs.from_config(config) providers = BaseSourceProvider.registred.values() ip_addresses, is_done = None, False last_src, last_ipv4, last_ipv6 = "", "", "" ok_ipv4, ok_ipv6 = False, False drop = [] pending = [] if need.ipv4: pending += [ asyncio.create_task(p.fetch_v4(), name=f"{p.name}.v4") for p in providers ] if need.ipv6: pending += [ asyncio.create_task(p.fetch_v6(), name=f"{p.name}.v6") for p in providers ] while not is_done and pending: done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) for x in done: ip_addr = x.result() if x.get_name().endswith(".v6"): last_ipv6 = last_ipv6 if ok_ipv6 else ip_addr else: last_ipv4 = last_ipv4 if ok_ipv4 else ip_addr if config.get("debug"): print("debug:", "get", x.get_name(), ip_addr) last_src = x.get_name().rsplit(".", maxsplit=1)[0] ok_ipv4 = is_ipv4_ok(last_ipv4, config) ok_ipv6 = is_ipv6_ok(last_ipv6, config) pending_v4 = [*filter(lambda i: i.get_name().endswith(".v4"), pending)] pending_v6 = [*filter(lambda i: i.get_name().endswith(".v6"), pending)] if ok_ipv4 and pending_v4: drop += pending_v4 pending = pending_v6 for i in pending_v4: i.cancel() pending_v4 = [] if ok_ipv6 and pending_v6: drop += pending_v6 pending = pending_v4 for i in pending_v6: i.cancel() pending_v6 = [] if not (ok_ipv4 and ok_ipv6): continue ip_addresses = IPAddreses(last_src, last_ipv4, last_ipv6) if is_valid_addreses(ip_addresses, config): is_done = True break ip_addresses = None if not (ok_ipv4 and ok_ipv6): ip_addresses = IPAddreses(last_src, last_ipv4, last_ipv6) drop = drop + list(pending) if drop: gather = asyncio.gather(*drop) gather.cancel() try: await gather except asyncio.CancelledError: pass return ip_addresses async def get_ip_addresses_unit(config: dict) -> Optional[IPAddreses]: """Получение всех IP адресов из всех источников (режим unit) Получает адреса только от одного источника Args: config (dict): общая конфигурация Returns: Optional[IPAddreses]: результат получения, либо None """ providers = BaseSourceProvider.registred.values() ip_addresses = None is_done = False pending = [asyncio.create_task(p.fetch_all(), name=p.name) for p in providers] while not is_done and pending: done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) for x in done: ip_addresses = x.result() if is_valid_addreses(ip_addresses, config): is_done = True break ip_addresses = None if pending: gather = asyncio.gather(*pending) gather.cancel() try: await gather except asyncio.CancelledError: pass return ip_addresses async def check_ip_addresses(ip_addresses: IPAddreses) -> bool: """Проверка результата получения IP адресов с помощью фильтров Args: ip_addresses (IPAddreses): IP адреса Returns: bool: корректны ли адреса (изменились ли они), надо ли продолжать обработку и отправлять их на сервер """ providers = BaseFilterProvider.registred.values() result = True failed = "" pending = [ asyncio.create_task(p.check(*ip_addresses), name=p.name) for p in providers ] while result and pending: done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) for x in done: result = x.result() if not result: failed = x.get_name() if pending: gather = asyncio.gather(*pending) gather.cancel() try: await gather except asyncio.CancelledError: pass if not result: print("failed filter:", failed) return result async def send_ip_addreses(ip_addresses: IPAddreses): """Отправка адресов на все плагины вывода Args: ip_addresses (IPAddreses): IP адреса """ providers = BaseOutputProvider.registred.values() await asyncio.gather( *( asyncio.create_task(p.set_addrs(*ip_addresses), name=p.name) for p in providers ) ) def print_debug_info(config: dict): """Вывод всех зарегистрированных плагинов и другой отладочной информации""" def format_registred(name, base): result = f"{name}:\n" result += f" classes: {[*base._childs]}\n" result += f" .values: {[*map(str, base._childs.values())]}\n" result += f" providers: {[*base.registred]}\n" result += f" .values: {[*map(str, base.registred.values())]}\n" return result if config.get("debug", False): print("DEBUG ->") bases = BaseSourceProvider, BaseFilterProvider, BaseOutputProvider for info in map(format_registred, "sources filters outputs".split(), bases): print(info) print("DEBUG <-") async def app( config: dict, ipv4t: httpx.AsyncHTTPTransport, ipv6t: httpx.AsyncHTTPTransport ): """Запуск приложения Args: config (dict): общая конфигурация ipv4t (httpx.AsyncHTTPTransport): транспорт IPv4 ipv6t (httpx.AsyncHTTPTransport): транспорт IPv6 """ use_plugins(config, ipv4t, ipv6t) print_debug_info(config) ip_addreses = await get_ip_addresses(config) if ip_addreses is None: print("no IP addresses") return if not await check_ip_addresses(ip_addreses): print("stop by filters") return await send_ip_addreses(ip_addreses) print("done")