From 4d134162228cbbd49b969b8a4633bc5b0a0485d4 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Sat, 2 Mar 2024 17:20:44 +0300 Subject: [PATCH] separate core from cli #6 --- pddnsc/cli.py | 291 +------------------------------------------------ pddnsc/core.py | 285 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 287 insertions(+), 289 deletions(-) create mode 100644 pddnsc/core.py diff --git a/pddnsc/cli.py b/pddnsc/cli.py index a5cd3ea..62b0de8 100644 --- a/pddnsc/cli.py +++ b/pddnsc/cli.py @@ -1,297 +1,10 @@ """ модуль запуска """ import httpx -import asyncio import toml -from .base import BaseFilterProvider, BaseSourceProvider, BaseOutputProvider, IPAddreses -from .plugins import use_plugins -from typing import Optional, NamedTuple +import asyncio - -class NeededAddrs(NamedTuple): - ipv4: bool - ipv6: bool - - @classmethod - def from_config(cls, config: dict) -> "NeededAddrs": - need_ipv4 = config.get("require_ipv4", False) - need_ipv6 = config.get("require_ipv6", False) - if "require_ipv4" not in config and "require_ipv6" not in config: - need_ipv4 = need_ipv6 = True - return cls(need_ipv4, need_ipv6) - - -def is_valid_addreses(addrs: IPAddreses, config: dict) -> bool: - """Проверка валидности IP адресов - - Args: - addrs (IPAddreses): IP адреса - результат одного из источников - config (dict): общая конфигурация - - Returns: - bool: валиден или нет - """ - result = addrs.ipv4 or addrs.ipv6 - if config.get("require_ipv4"): - result = result and addrs.ipv4 - if config.get("require_ipv6"): - result = result and addrs.ipv6 - return bool(result) - - -def is_ipv4_ok(ipv4: str, config: dict) -> bool: - """Проверка IPv4 адреса, подходит или нет - - Args: - ipv4 (str): IPv4 адрес - config (dict): общая конфигурация - - Returns: - bool: подходит или нет - """ - return bool(ipv4) if NeededAddrs.from_config(config).ipv4 else True - - -def is_ipv6_ok(ipv6: str, config: dict) -> bool: - """Проверка IPv6 адреса, подходит или нет - - Args: - ipv6 (str): IPv6 адрес - config (dict): общая конфигурация - - Returns: - bool: подходит или нет - """ - return bool(ipv6) if NeededAddrs.from_config(config).ipv6 else True - - -async def get_ip_addresses(config: dict) -> Optional[IPAddreses]: - """Получение всех IP адресов из всех источников - - Args: - config (dict): общая конфигурация - - Returns: - Optional[IPAddreses]: результат получения, либо None - """ - - unit_mode = config.get("unit_mode", False) - if unit_mode: - result = await get_ip_addresses_unit(config) - else: - result = await get_ip_addresses_any(config) - return result - - -async def get_ip_addresses_any(config: dict) -> Optional[IPAddreses]: - """Получение всех IP адресов из всех источников (режим any) - - Получает разные адреса от любых источников - - Args: - config (dict): общая конфигурация - - Returns: - Optional[IPAddreses]: результат получения, либо None - """ - need = NeededAddrs.from_config(config) - providers = BaseSourceProvider.registred.values() - ip_addresses, is_done = None, False - last_src, last_ipv4, last_ipv6 = "", "", "" - ok_ipv4, ok_ipv6 = False, False - drop = [] - pending = [] - if need.ipv4: - pending += [ - asyncio.create_task(p.fetch_v4(), name=f"{p.name}.v4") for p in providers - ] - if need.ipv6: - pending += [ - asyncio.create_task(p.fetch_v6(), name=f"{p.name}.v6") for p in providers - ] - while not is_done and pending: - done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) - for x in done: - ip_addr = x.result() - if x.get_name().endswith(".v6"): - last_ipv6 = last_ipv6 if ok_ipv6 else ip_addr - else: - last_ipv4 = last_ipv4 if ok_ipv4 else ip_addr - if config.get("debug"): - print("debug:", "get", x.get_name(), ip_addr) - last_src = x.get_name().rsplit(".", maxsplit=1)[0] - ok_ipv4 = is_ipv4_ok(last_ipv4, config) - ok_ipv6 = is_ipv6_ok(last_ipv6, config) - pending_v4 = [*filter(lambda i: i.get_name().endswith(".v4"), pending)] - pending_v6 = [*filter(lambda i: i.get_name().endswith(".v6"), pending)] - if ok_ipv4 and pending_v4: - drop += pending_v4 - pending = pending_v6 - for i in pending_v4: - i.cancel() - pending_v4 = [] - if ok_ipv6 and pending_v6: - drop += pending_v6 - pending = pending_v4 - for i in pending_v6: - i.cancel() - pending_v6 = [] - if not (ok_ipv4 and ok_ipv6): - continue - ip_addresses = IPAddreses(last_src, last_ipv4, last_ipv6) - if is_valid_addreses(ip_addresses, config): - is_done = True - break - ip_addresses = None - - if not (ok_ipv4 and ok_ipv6): - ip_addresses = IPAddreses(last_src, last_ipv4, last_ipv6) - - drop = drop + list(pending) - if drop: - gather = asyncio.gather(*drop) - gather.cancel() - try: - await gather - except asyncio.CancelledError: - pass - return ip_addresses - - -async def get_ip_addresses_unit(config: dict) -> Optional[IPAddreses]: - """Получение всех IP адресов из всех источников (режим unit) - - Получает адреса только от одного источника - - Args: - config (dict): общая конфигурация - - Returns: - Optional[IPAddreses]: результат получения, либо None - """ - providers = BaseSourceProvider.registred.values() - ip_addresses = None - is_done = False - pending = [asyncio.create_task(p.fetch_all(), name=p.name) for p in providers] - while not is_done and pending: - done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) - for x in done: - ip_addresses = x.result() - if is_valid_addreses(ip_addresses, config): - is_done = True - break - ip_addresses = None - - if pending: - gather = asyncio.gather(*pending) - gather.cancel() - try: - await gather - except asyncio.CancelledError: - pass - return ip_addresses - - -async def check_ip_addresses(ip_addresses: IPAddreses) -> bool: - """Проверка результата получения IP адресов с помощью фильтров - - Args: - ip_addresses (IPAddreses): IP адреса - - Returns: - bool: корректны ли адреса (изменились ли они), - надо ли продолжать обработку и отправлять их на сервер - """ - providers = BaseFilterProvider.registred.values() - result = True - failed = "" - pending = [ - asyncio.create_task(p.check(*ip_addresses), name=p.name) for p in providers - ] - while result and pending: - done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) - for x in done: - result = x.result() - if not result: - failed = x.get_name() - - if pending: - gather = asyncio.gather(*pending) - gather.cancel() - try: - await gather - except asyncio.CancelledError: - pass - - if not result: - print("failed filter:", failed) - - return result - - -async def send_ip_addreses(ip_addresses: IPAddreses): - """Отправка адресов на все плагины вывода - - Args: - ip_addresses (IPAddreses): IP адреса - """ - providers = BaseOutputProvider.registred.values() - await asyncio.gather( - *( - asyncio.create_task(p.set_addrs(*ip_addresses), name=p.name) - for p in providers - ) - ) - - -def print_debug_info(config: dict): - """Вывод всех зарегистрированных плагинов и другой отладочной информации""" - debug = config.get("debug", False) - if debug: - print("DEBUG info:") - print( - f"source classes: {[*BaseSourceProvider._childs]}, {[*map(str, BaseSourceProvider._childs.values())]}" - ) - print( - f"filter classes: {[*BaseFilterProvider._childs]}, {[*map(str, BaseFilterProvider._childs.values())]}" - ) - print( - f"output classes: {[*BaseOutputProvider._childs]}, {[*map(str, BaseOutputProvider._childs.values())]}" - ) - print( - f"source providers: {[*BaseSourceProvider.registred]}, {[*map(str, BaseSourceProvider.registred.values())]}" - ) - print( - f"filter providers: {[*BaseFilterProvider.registred]}, {[*map(str, BaseFilterProvider.registred.values())]}" - ) - print( - f"output providers: {[*BaseOutputProvider.registred]}, {[*map(str, BaseOutputProvider.registred.values())]}" - ) - - -async def app( - config: dict, ipv4t: httpx.AsyncHTTPTransport, ipv6t: httpx.AsyncHTTPTransport -): - """Запуск приложения - - Args: - config (dict): общая конфигурация - ipv4t (httpx.AsyncHTTPTransport): транспорт IPv4 - ipv6t (httpx.AsyncHTTPTransport): транспорт IPv6 - """ - use_plugins(config, ipv4t, ipv6t) - - print_debug_info(config) - - ip_addreses = await get_ip_addresses(config) - if ip_addreses is None: - print("no IP addresses") - return - if not await check_ip_addresses(ip_addreses): - print("stop by filters") - return - await send_ip_addreses(ip_addreses) - print("done") +from pddnsc.core import app async def main(): diff --git a/pddnsc/core.py b/pddnsc/core.py new file mode 100644 index 0000000..437b179 --- /dev/null +++ b/pddnsc/core.py @@ -0,0 +1,285 @@ +import httpx +import asyncio +from .base import BaseFilterProvider, BaseSourceProvider, BaseOutputProvider, IPAddreses +from .plugins import use_plugins +from typing import Optional, NamedTuple + + +class NeededAddrs(NamedTuple): + ipv4: bool + ipv6: bool + + @classmethod + def from_config(cls, config: dict) -> "NeededAddrs": + need_ipv4 = config.get("require_ipv4", False) + need_ipv6 = config.get("require_ipv6", False) + if "require_ipv4" not in config and "require_ipv6" not in config: + need_ipv4 = need_ipv6 = True + return cls(need_ipv4, need_ipv6) + + +def is_valid_addreses(addrs: IPAddreses, config: dict) -> bool: + """Проверка валидности IP адресов + + Args: + addrs (IPAddreses): IP адреса - результат одного из источников + config (dict): общая конфигурация + + Returns: + bool: валиден или нет + """ + result = addrs.ipv4 or addrs.ipv6 + if config.get("require_ipv4"): + result = result and addrs.ipv4 + if config.get("require_ipv6"): + result = result and addrs.ipv6 + return bool(result) + + +def is_ipv4_ok(ipv4: str, config: dict) -> bool: + """Проверка IPv4 адреса, подходит или нет + + Args: + ipv4 (str): IPv4 адрес + config (dict): общая конфигурация + + Returns: + bool: подходит или нет + """ + return bool(ipv4) if NeededAddrs.from_config(config).ipv4 else True + + +def is_ipv6_ok(ipv6: str, config: dict) -> bool: + """Проверка IPv6 адреса, подходит или нет + + Args: + ipv6 (str): IPv6 адрес + config (dict): общая конфигурация + + Returns: + bool: подходит или нет + """ + return bool(ipv6) if NeededAddrs.from_config(config).ipv6 else True + + +async def get_ip_addresses(config: dict) -> Optional[IPAddreses]: + """Получение всех IP адресов из всех источников + + Args: + config (dict): общая конфигурация + + Returns: + Optional[IPAddreses]: результат получения, либо None + """ + + unit_mode = config.get("unit_mode", False) + if unit_mode: + result = await get_ip_addresses_unit(config) + else: + result = await get_ip_addresses_any(config) + return result + + +async def get_ip_addresses_any(config: dict) -> Optional[IPAddreses]: + """Получение всех IP адресов из всех источников (режим any) + + Получает разные адреса от любых источников + + Args: + config (dict): общая конфигурация + + Returns: + Optional[IPAddreses]: результат получения, либо None + """ + need = NeededAddrs.from_config(config) + providers = BaseSourceProvider.registred.values() + ip_addresses, is_done = None, False + last_src, last_ipv4, last_ipv6 = "", "", "" + ok_ipv4, ok_ipv6 = False, False + drop = [] + pending = [] + if need.ipv4: + pending += [ + asyncio.create_task(p.fetch_v4(), name=f"{p.name}.v4") for p in providers + ] + if need.ipv6: + pending += [ + asyncio.create_task(p.fetch_v6(), name=f"{p.name}.v6") for p in providers + ] + while not is_done and pending: + done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) + for x in done: + ip_addr = x.result() + if x.get_name().endswith(".v6"): + last_ipv6 = last_ipv6 if ok_ipv6 else ip_addr + else: + last_ipv4 = last_ipv4 if ok_ipv4 else ip_addr + if config.get("debug"): + print("debug:", "get", x.get_name(), ip_addr) + last_src = x.get_name().rsplit(".", maxsplit=1)[0] + ok_ipv4 = is_ipv4_ok(last_ipv4, config) + ok_ipv6 = is_ipv6_ok(last_ipv6, config) + pending_v4 = [*filter(lambda i: i.get_name().endswith(".v4"), pending)] + pending_v6 = [*filter(lambda i: i.get_name().endswith(".v6"), pending)] + if ok_ipv4 and pending_v4: + drop += pending_v4 + pending = pending_v6 + for i in pending_v4: + i.cancel() + pending_v4 = [] + if ok_ipv6 and pending_v6: + drop += pending_v6 + pending = pending_v4 + for i in pending_v6: + i.cancel() + pending_v6 = [] + if not (ok_ipv4 and ok_ipv6): + continue + ip_addresses = IPAddreses(last_src, last_ipv4, last_ipv6) + if is_valid_addreses(ip_addresses, config): + is_done = True + break + ip_addresses = None + + if not (ok_ipv4 and ok_ipv6): + ip_addresses = IPAddreses(last_src, last_ipv4, last_ipv6) + + drop = drop + list(pending) + if drop: + gather = asyncio.gather(*drop) + gather.cancel() + try: + await gather + except asyncio.CancelledError: + pass + return ip_addresses + + +async def get_ip_addresses_unit(config: dict) -> Optional[IPAddreses]: + """Получение всех IP адресов из всех источников (режим unit) + + Получает адреса только от одного источника + + Args: + config (dict): общая конфигурация + + Returns: + Optional[IPAddreses]: результат получения, либо None + """ + providers = BaseSourceProvider.registred.values() + ip_addresses = None + is_done = False + pending = [asyncio.create_task(p.fetch_all(), name=p.name) for p in providers] + while not is_done and pending: + done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) + for x in done: + ip_addresses = x.result() + if is_valid_addreses(ip_addresses, config): + is_done = True + break + ip_addresses = None + + if pending: + gather = asyncio.gather(*pending) + gather.cancel() + try: + await gather + except asyncio.CancelledError: + pass + return ip_addresses + + +async def check_ip_addresses(ip_addresses: IPAddreses) -> bool: + """Проверка результата получения IP адресов с помощью фильтров + + Args: + ip_addresses (IPAddreses): IP адреса + + Returns: + bool: корректны ли адреса (изменились ли они), + надо ли продолжать обработку и отправлять их на сервер + """ + providers = BaseFilterProvider.registred.values() + result = True + failed = "" + pending = [ + asyncio.create_task(p.check(*ip_addresses), name=p.name) for p in providers + ] + while result and pending: + done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) + for x in done: + result = x.result() + if not result: + failed = x.get_name() + + if pending: + gather = asyncio.gather(*pending) + gather.cancel() + try: + await gather + except asyncio.CancelledError: + pass + + if not result: + print("failed filter:", failed) + + return result + + +async def send_ip_addreses(ip_addresses: IPAddreses): + """Отправка адресов на все плагины вывода + + Args: + ip_addresses (IPAddreses): IP адреса + """ + providers = BaseOutputProvider.registred.values() + await asyncio.gather( + *( + asyncio.create_task(p.set_addrs(*ip_addresses), name=p.name) + for p in providers + ) + ) + + +def print_debug_info(config: dict): + """Вывод всех зарегистрированных плагинов и другой отладочной информации""" + + def format_registred(name, base): + result = f"{name}:\n" + result += f" classes: {[*base._childs]}\n" + result += f" .values: {[*map(str, base._childs.values())]}\n" + result += f" providers: {[*base.registred]}\n" + result += f" .values: {[*map(str, base.registred.values())]}\n" + return result + + if config.get("debug", False): + print("DEBUG ->") + bases = BaseSourceProvider, BaseFilterProvider, BaseOutputProvider + for info in map(format_registred, "sources filters outputs".split(), bases): + print(info) + print("DEBUG <-") + + +async def app( + config: dict, ipv4t: httpx.AsyncHTTPTransport, ipv6t: httpx.AsyncHTTPTransport +): + """Запуск приложения + + Args: + config (dict): общая конфигурация + ipv4t (httpx.AsyncHTTPTransport): транспорт IPv4 + ipv6t (httpx.AsyncHTTPTransport): транспорт IPv6 + """ + use_plugins(config, ipv4t, ipv6t) + + print_debug_info(config) + + ip_addreses = await get_ip_addresses(config) + if ip_addreses is None: + print("no IP addresses") + return + if not await check_ip_addresses(ip_addreses): + print("stop by filters") + return + await send_ip_addreses(ip_addreses) + print("done")