separate core from cli #6
All checks were successful
Docker Image CI / test (push) Successful in 31s
Docker Image CI / push (push) Successful in 39s

This commit is contained in:
Dmitry Belyaev 2024-03-02 17:20:44 +03:00
parent 2bf42162ef
commit 4d13416222
Signed by: b4tman
GPG Key ID: 41A00BF15EA7E5F3
2 changed files with 287 additions and 289 deletions

View File

@ -1,297 +1,10 @@
""" модуль запуска """
import httpx
import asyncio
import toml
from .base import BaseFilterProvider, BaseSourceProvider, BaseOutputProvider, IPAddreses
from .plugins import use_plugins
from typing import Optional, NamedTuple
import asyncio
class NeededAddrs(NamedTuple):
ipv4: bool
ipv6: bool
@classmethod
def from_config(cls, config: dict) -> "NeededAddrs":
need_ipv4 = config.get("require_ipv4", False)
need_ipv6 = config.get("require_ipv6", False)
if "require_ipv4" not in config and "require_ipv6" not in config:
need_ipv4 = need_ipv6 = True
return cls(need_ipv4, need_ipv6)
def is_valid_addreses(addrs: IPAddreses, config: dict) -> bool:
"""Проверка валидности IP адресов
Args:
addrs (IPAddreses): IP адреса - результат одного из источников
config (dict): общая конфигурация
Returns:
bool: валиден или нет
"""
result = addrs.ipv4 or addrs.ipv6
if config.get("require_ipv4"):
result = result and addrs.ipv4
if config.get("require_ipv6"):
result = result and addrs.ipv6
return bool(result)
def is_ipv4_ok(ipv4: str, config: dict) -> bool:
"""Проверка IPv4 адреса, подходит или нет
Args:
ipv4 (str): IPv4 адрес
config (dict): общая конфигурация
Returns:
bool: подходит или нет
"""
return bool(ipv4) if NeededAddrs.from_config(config).ipv4 else True
def is_ipv6_ok(ipv6: str, config: dict) -> bool:
"""Проверка IPv6 адреса, подходит или нет
Args:
ipv6 (str): IPv6 адрес
config (dict): общая конфигурация
Returns:
bool: подходит или нет
"""
return bool(ipv6) if NeededAddrs.from_config(config).ipv6 else True
async def get_ip_addresses(config: dict) -> Optional[IPAddreses]:
"""Получение всех IP адресов из всех источников
Args:
config (dict): общая конфигурация
Returns:
Optional[IPAddreses]: результат получения, либо None
"""
unit_mode = config.get("unit_mode", False)
if unit_mode:
result = await get_ip_addresses_unit(config)
else:
result = await get_ip_addresses_any(config)
return result
async def get_ip_addresses_any(config: dict) -> Optional[IPAddreses]:
"""Получение всех IP адресов из всех источников (режим any)
Получает разные адреса от любых источников
Args:
config (dict): общая конфигурация
Returns:
Optional[IPAddreses]: результат получения, либо None
"""
need = NeededAddrs.from_config(config)
providers = BaseSourceProvider.registred.values()
ip_addresses, is_done = None, False
last_src, last_ipv4, last_ipv6 = "", "", ""
ok_ipv4, ok_ipv6 = False, False
drop = []
pending = []
if need.ipv4:
pending += [
asyncio.create_task(p.fetch_v4(), name=f"{p.name}.v4") for p in providers
]
if need.ipv6:
pending += [
asyncio.create_task(p.fetch_v6(), name=f"{p.name}.v6") for p in providers
]
while not is_done and pending:
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
for x in done:
ip_addr = x.result()
if x.get_name().endswith(".v6"):
last_ipv6 = last_ipv6 if ok_ipv6 else ip_addr
else:
last_ipv4 = last_ipv4 if ok_ipv4 else ip_addr
if config.get("debug"):
print("debug:", "get", x.get_name(), ip_addr)
last_src = x.get_name().rsplit(".", maxsplit=1)[0]
ok_ipv4 = is_ipv4_ok(last_ipv4, config)
ok_ipv6 = is_ipv6_ok(last_ipv6, config)
pending_v4 = [*filter(lambda i: i.get_name().endswith(".v4"), pending)]
pending_v6 = [*filter(lambda i: i.get_name().endswith(".v6"), pending)]
if ok_ipv4 and pending_v4:
drop += pending_v4
pending = pending_v6
for i in pending_v4:
i.cancel()
pending_v4 = []
if ok_ipv6 and pending_v6:
drop += pending_v6
pending = pending_v4
for i in pending_v6:
i.cancel()
pending_v6 = []
if not (ok_ipv4 and ok_ipv6):
continue
ip_addresses = IPAddreses(last_src, last_ipv4, last_ipv6)
if is_valid_addreses(ip_addresses, config):
is_done = True
break
ip_addresses = None
if not (ok_ipv4 and ok_ipv6):
ip_addresses = IPAddreses(last_src, last_ipv4, last_ipv6)
drop = drop + list(pending)
if drop:
gather = asyncio.gather(*drop)
gather.cancel()
try:
await gather
except asyncio.CancelledError:
pass
return ip_addresses
async def get_ip_addresses_unit(config: dict) -> Optional[IPAddreses]:
"""Получение всех IP адресов из всех источников (режим unit)
Получает адреса только от одного источника
Args:
config (dict): общая конфигурация
Returns:
Optional[IPAddreses]: результат получения, либо None
"""
providers = BaseSourceProvider.registred.values()
ip_addresses = None
is_done = False
pending = [asyncio.create_task(p.fetch_all(), name=p.name) for p in providers]
while not is_done and pending:
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
for x in done:
ip_addresses = x.result()
if is_valid_addreses(ip_addresses, config):
is_done = True
break
ip_addresses = None
if pending:
gather = asyncio.gather(*pending)
gather.cancel()
try:
await gather
except asyncio.CancelledError:
pass
return ip_addresses
async def check_ip_addresses(ip_addresses: IPAddreses) -> bool:
"""Проверка результата получения IP адресов с помощью фильтров
Args:
ip_addresses (IPAddreses): IP адреса
Returns:
bool: корректны ли адреса (изменились ли они),
надо ли продолжать обработку и отправлять их на сервер
"""
providers = BaseFilterProvider.registred.values()
result = True
failed = ""
pending = [
asyncio.create_task(p.check(*ip_addresses), name=p.name) for p in providers
]
while result and pending:
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
for x in done:
result = x.result()
if not result:
failed = x.get_name()
if pending:
gather = asyncio.gather(*pending)
gather.cancel()
try:
await gather
except asyncio.CancelledError:
pass
if not result:
print("failed filter:", failed)
return result
async def send_ip_addreses(ip_addresses: IPAddreses):
"""Отправка адресов на все плагины вывода
Args:
ip_addresses (IPAddreses): IP адреса
"""
providers = BaseOutputProvider.registred.values()
await asyncio.gather(
*(
asyncio.create_task(p.set_addrs(*ip_addresses), name=p.name)
for p in providers
)
)
def print_debug_info(config: dict):
"""Вывод всех зарегистрированных плагинов и другой отладочной информации"""
debug = config.get("debug", False)
if debug:
print("DEBUG info:")
print(
f"source classes: {[*BaseSourceProvider._childs]}, {[*map(str, BaseSourceProvider._childs.values())]}"
)
print(
f"filter classes: {[*BaseFilterProvider._childs]}, {[*map(str, BaseFilterProvider._childs.values())]}"
)
print(
f"output classes: {[*BaseOutputProvider._childs]}, {[*map(str, BaseOutputProvider._childs.values())]}"
)
print(
f"source providers: {[*BaseSourceProvider.registred]}, {[*map(str, BaseSourceProvider.registred.values())]}"
)
print(
f"filter providers: {[*BaseFilterProvider.registred]}, {[*map(str, BaseFilterProvider.registred.values())]}"
)
print(
f"output providers: {[*BaseOutputProvider.registred]}, {[*map(str, BaseOutputProvider.registred.values())]}"
)
async def app(
config: dict, ipv4t: httpx.AsyncHTTPTransport, ipv6t: httpx.AsyncHTTPTransport
):
"""Запуск приложения
Args:
config (dict): общая конфигурация
ipv4t (httpx.AsyncHTTPTransport): транспорт IPv4
ipv6t (httpx.AsyncHTTPTransport): транспорт IPv6
"""
use_plugins(config, ipv4t, ipv6t)
print_debug_info(config)
ip_addreses = await get_ip_addresses(config)
if ip_addreses is None:
print("no IP addresses")
return
if not await check_ip_addresses(ip_addreses):
print("stop by filters")
return
await send_ip_addreses(ip_addreses)
print("done")
from pddnsc.core import app
async def main():

285
pddnsc/core.py Normal file
View File

@ -0,0 +1,285 @@
import httpx
import asyncio
from .base import BaseFilterProvider, BaseSourceProvider, BaseOutputProvider, IPAddreses
from .plugins import use_plugins
from typing import Optional, NamedTuple
class NeededAddrs(NamedTuple):
ipv4: bool
ipv6: bool
@classmethod
def from_config(cls, config: dict) -> "NeededAddrs":
need_ipv4 = config.get("require_ipv4", False)
need_ipv6 = config.get("require_ipv6", False)
if "require_ipv4" not in config and "require_ipv6" not in config:
need_ipv4 = need_ipv6 = True
return cls(need_ipv4, need_ipv6)
def is_valid_addreses(addrs: IPAddreses, config: dict) -> bool:
"""Проверка валидности IP адресов
Args:
addrs (IPAddreses): IP адреса - результат одного из источников
config (dict): общая конфигурация
Returns:
bool: валиден или нет
"""
result = addrs.ipv4 or addrs.ipv6
if config.get("require_ipv4"):
result = result and addrs.ipv4
if config.get("require_ipv6"):
result = result and addrs.ipv6
return bool(result)
def is_ipv4_ok(ipv4: str, config: dict) -> bool:
"""Проверка IPv4 адреса, подходит или нет
Args:
ipv4 (str): IPv4 адрес
config (dict): общая конфигурация
Returns:
bool: подходит или нет
"""
return bool(ipv4) if NeededAddrs.from_config(config).ipv4 else True
def is_ipv6_ok(ipv6: str, config: dict) -> bool:
"""Проверка IPv6 адреса, подходит или нет
Args:
ipv6 (str): IPv6 адрес
config (dict): общая конфигурация
Returns:
bool: подходит или нет
"""
return bool(ipv6) if NeededAddrs.from_config(config).ipv6 else True
async def get_ip_addresses(config: dict) -> Optional[IPAddreses]:
"""Получение всех IP адресов из всех источников
Args:
config (dict): общая конфигурация
Returns:
Optional[IPAddreses]: результат получения, либо None
"""
unit_mode = config.get("unit_mode", False)
if unit_mode:
result = await get_ip_addresses_unit(config)
else:
result = await get_ip_addresses_any(config)
return result
async def get_ip_addresses_any(config: dict) -> Optional[IPAddreses]:
"""Получение всех IP адресов из всех источников (режим any)
Получает разные адреса от любых источников
Args:
config (dict): общая конфигурация
Returns:
Optional[IPAddreses]: результат получения, либо None
"""
need = NeededAddrs.from_config(config)
providers = BaseSourceProvider.registred.values()
ip_addresses, is_done = None, False
last_src, last_ipv4, last_ipv6 = "", "", ""
ok_ipv4, ok_ipv6 = False, False
drop = []
pending = []
if need.ipv4:
pending += [
asyncio.create_task(p.fetch_v4(), name=f"{p.name}.v4") for p in providers
]
if need.ipv6:
pending += [
asyncio.create_task(p.fetch_v6(), name=f"{p.name}.v6") for p in providers
]
while not is_done and pending:
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
for x in done:
ip_addr = x.result()
if x.get_name().endswith(".v6"):
last_ipv6 = last_ipv6 if ok_ipv6 else ip_addr
else:
last_ipv4 = last_ipv4 if ok_ipv4 else ip_addr
if config.get("debug"):
print("debug:", "get", x.get_name(), ip_addr)
last_src = x.get_name().rsplit(".", maxsplit=1)[0]
ok_ipv4 = is_ipv4_ok(last_ipv4, config)
ok_ipv6 = is_ipv6_ok(last_ipv6, config)
pending_v4 = [*filter(lambda i: i.get_name().endswith(".v4"), pending)]
pending_v6 = [*filter(lambda i: i.get_name().endswith(".v6"), pending)]
if ok_ipv4 and pending_v4:
drop += pending_v4
pending = pending_v6
for i in pending_v4:
i.cancel()
pending_v4 = []
if ok_ipv6 and pending_v6:
drop += pending_v6
pending = pending_v4
for i in pending_v6:
i.cancel()
pending_v6 = []
if not (ok_ipv4 and ok_ipv6):
continue
ip_addresses = IPAddreses(last_src, last_ipv4, last_ipv6)
if is_valid_addreses(ip_addresses, config):
is_done = True
break
ip_addresses = None
if not (ok_ipv4 and ok_ipv6):
ip_addresses = IPAddreses(last_src, last_ipv4, last_ipv6)
drop = drop + list(pending)
if drop:
gather = asyncio.gather(*drop)
gather.cancel()
try:
await gather
except asyncio.CancelledError:
pass
return ip_addresses
async def get_ip_addresses_unit(config: dict) -> Optional[IPAddreses]:
"""Получение всех IP адресов из всех источников (режим unit)
Получает адреса только от одного источника
Args:
config (dict): общая конфигурация
Returns:
Optional[IPAddreses]: результат получения, либо None
"""
providers = BaseSourceProvider.registred.values()
ip_addresses = None
is_done = False
pending = [asyncio.create_task(p.fetch_all(), name=p.name) for p in providers]
while not is_done and pending:
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
for x in done:
ip_addresses = x.result()
if is_valid_addreses(ip_addresses, config):
is_done = True
break
ip_addresses = None
if pending:
gather = asyncio.gather(*pending)
gather.cancel()
try:
await gather
except asyncio.CancelledError:
pass
return ip_addresses
async def check_ip_addresses(ip_addresses: IPAddreses) -> bool:
"""Проверка результата получения IP адресов с помощью фильтров
Args:
ip_addresses (IPAddreses): IP адреса
Returns:
bool: корректны ли адреса (изменились ли они),
надо ли продолжать обработку и отправлять их на сервер
"""
providers = BaseFilterProvider.registred.values()
result = True
failed = ""
pending = [
asyncio.create_task(p.check(*ip_addresses), name=p.name) for p in providers
]
while result and pending:
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
for x in done:
result = x.result()
if not result:
failed = x.get_name()
if pending:
gather = asyncio.gather(*pending)
gather.cancel()
try:
await gather
except asyncio.CancelledError:
pass
if not result:
print("failed filter:", failed)
return result
async def send_ip_addreses(ip_addresses: IPAddreses):
"""Отправка адресов на все плагины вывода
Args:
ip_addresses (IPAddreses): IP адреса
"""
providers = BaseOutputProvider.registred.values()
await asyncio.gather(
*(
asyncio.create_task(p.set_addrs(*ip_addresses), name=p.name)
for p in providers
)
)
def print_debug_info(config: dict):
"""Вывод всех зарегистрированных плагинов и другой отладочной информации"""
def format_registred(name, base):
result = f"{name}:\n"
result += f" classes: {[*base._childs]}\n"
result += f" .values: {[*map(str, base._childs.values())]}\n"
result += f" providers: {[*base.registred]}\n"
result += f" .values: {[*map(str, base.registred.values())]}\n"
return result
if config.get("debug", False):
print("DEBUG ->")
bases = BaseSourceProvider, BaseFilterProvider, BaseOutputProvider
for info in map(format_registred, "sources filters outputs".split(), bases):
print(info)
print("DEBUG <-")
async def app(
config: dict, ipv4t: httpx.AsyncHTTPTransport, ipv6t: httpx.AsyncHTTPTransport
):
"""Запуск приложения
Args:
config (dict): общая конфигурация
ipv4t (httpx.AsyncHTTPTransport): транспорт IPv4
ipv6t (httpx.AsyncHTTPTransport): транспорт IPv6
"""
use_plugins(config, ipv4t, ipv6t)
print_debug_info(config)
ip_addreses = await get_ip_addresses(config)
if ip_addreses is None:
print("no IP addresses")
return
if not await check_ip_addresses(ip_addreses):
print("stop by filters")
return
await send_ip_addreses(ip_addreses)
print("done")