pddnsc/pddnsc/core.py

286 lines
9.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import httpx
import asyncio
from .base import BaseFilterProvider, BaseSourceProvider, BaseOutputProvider, IPAddreses
from .plugins import use_plugins
from typing import Optional, NamedTuple
class NeededAddrs(NamedTuple):
ipv4: bool
ipv6: bool
@classmethod
def from_config(cls, config: dict) -> "NeededAddrs":
need_ipv4 = config.get("require_ipv4", False)
need_ipv6 = config.get("require_ipv6", False)
if "require_ipv4" not in config and "require_ipv6" not in config:
need_ipv4 = need_ipv6 = True
return cls(need_ipv4, need_ipv6)
def is_valid_addreses(addrs: IPAddreses, config: dict) -> bool:
"""Проверка валидности IP адресов
Args:
addrs (IPAddreses): IP адреса - результат одного из источников
config (dict): общая конфигурация
Returns:
bool: валиден или нет
"""
result = addrs.ipv4 or addrs.ipv6
if config.get("require_ipv4"):
result = result and addrs.ipv4
if config.get("require_ipv6"):
result = result and addrs.ipv6
return bool(result)
def is_ipv4_ok(ipv4: str, config: dict) -> bool:
"""Проверка IPv4 адреса, подходит или нет
Args:
ipv4 (str): IPv4 адрес
config (dict): общая конфигурация
Returns:
bool: подходит или нет
"""
return bool(ipv4) if NeededAddrs.from_config(config).ipv4 else True
def is_ipv6_ok(ipv6: str, config: dict) -> bool:
"""Проверка IPv6 адреса, подходит или нет
Args:
ipv6 (str): IPv6 адрес
config (dict): общая конфигурация
Returns:
bool: подходит или нет
"""
return bool(ipv6) if NeededAddrs.from_config(config).ipv6 else True
async def get_ip_addresses(config: dict) -> Optional[IPAddreses]:
"""Получение всех IP адресов из всех источников
Args:
config (dict): общая конфигурация
Returns:
Optional[IPAddreses]: результат получения, либо None
"""
unit_mode = config.get("unit_mode", False)
if unit_mode:
result = await get_ip_addresses_unit(config)
else:
result = await get_ip_addresses_any(config)
return result
async def get_ip_addresses_any(config: dict) -> Optional[IPAddreses]:
"""Получение всех IP адресов из всех источников (режим any)
Получает разные адреса от любых источников
Args:
config (dict): общая конфигурация
Returns:
Optional[IPAddreses]: результат получения, либо None
"""
need = NeededAddrs.from_config(config)
providers = BaseSourceProvider.registred.values()
ip_addresses, is_done = None, False
last_src, last_ipv4, last_ipv6 = "", "", ""
ok_ipv4, ok_ipv6 = False, False
drop = []
pending = []
if need.ipv4:
pending += [
asyncio.create_task(p.fetch_v4(), name=f"{p.name}.v4") for p in providers
]
if need.ipv6:
pending += [
asyncio.create_task(p.fetch_v6(), name=f"{p.name}.v6") for p in providers
]
while not is_done and pending:
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
for x in done:
ip_addr = x.result()
if x.get_name().endswith(".v6"):
last_ipv6 = last_ipv6 if ok_ipv6 else ip_addr
else:
last_ipv4 = last_ipv4 if ok_ipv4 else ip_addr
if config.get("debug"):
print("debug:", "get", x.get_name(), ip_addr)
last_src = x.get_name().rsplit(".", maxsplit=1)[0]
ok_ipv4 = is_ipv4_ok(last_ipv4, config)
ok_ipv6 = is_ipv6_ok(last_ipv6, config)
pending_v4 = [*filter(lambda i: i.get_name().endswith(".v4"), pending)]
pending_v6 = [*filter(lambda i: i.get_name().endswith(".v6"), pending)]
if ok_ipv4 and pending_v4:
drop += pending_v4
pending = pending_v6
for i in pending_v4:
i.cancel()
pending_v4 = []
if ok_ipv6 and pending_v6:
drop += pending_v6
pending = pending_v4
for i in pending_v6:
i.cancel()
pending_v6 = []
if not (ok_ipv4 and ok_ipv6):
continue
ip_addresses = IPAddreses(last_src, last_ipv4, last_ipv6)
if is_valid_addreses(ip_addresses, config):
is_done = True
break
ip_addresses = None
if not (ok_ipv4 and ok_ipv6):
ip_addresses = IPAddreses(last_src, last_ipv4, last_ipv6)
drop = drop + list(pending)
if drop:
gather = asyncio.gather(*drop)
gather.cancel()
try:
await gather
except asyncio.CancelledError:
pass
return ip_addresses
async def get_ip_addresses_unit(config: dict) -> Optional[IPAddreses]:
"""Получение всех IP адресов из всех источников (режим unit)
Получает адреса только от одного источника
Args:
config (dict): общая конфигурация
Returns:
Optional[IPAddreses]: результат получения, либо None
"""
providers = BaseSourceProvider.registred.values()
ip_addresses = None
is_done = False
pending = [asyncio.create_task(p.fetch_all(), name=p.name) for p in providers]
while not is_done and pending:
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
for x in done:
ip_addresses = x.result()
if is_valid_addreses(ip_addresses, config):
is_done = True
break
ip_addresses = None
if pending:
gather = asyncio.gather(*pending)
gather.cancel()
try:
await gather
except asyncio.CancelledError:
pass
return ip_addresses
async def check_ip_addresses(ip_addresses: IPAddreses) -> bool:
"""Проверка результата получения IP адресов с помощью фильтров
Args:
ip_addresses (IPAddreses): IP адреса
Returns:
bool: корректны ли адреса (изменились ли они),
надо ли продолжать обработку и отправлять их на сервер
"""
providers = BaseFilterProvider.registred.values()
result = True
failed = ""
pending = [
asyncio.create_task(p.check(*ip_addresses), name=p.name) for p in providers
]
while result and pending:
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
for x in done:
result = x.result()
if not result:
failed = x.get_name()
if pending:
gather = asyncio.gather(*pending)
gather.cancel()
try:
await gather
except asyncio.CancelledError:
pass
if not result:
print("failed filter:", failed)
return result
async def send_ip_addreses(ip_addresses: IPAddreses):
"""Отправка адресов на все плагины вывода
Args:
ip_addresses (IPAddreses): IP адреса
"""
providers = BaseOutputProvider.registred.values()
await asyncio.gather(
*(
asyncio.create_task(p.set_addrs(*ip_addresses), name=p.name)
for p in providers
)
)
def print_debug_info(config: dict):
"""Вывод всех зарегистрированных плагинов и другой отладочной информации"""
def format_registred(name, base):
result = f"{name}:\n"
result += f" classes: {[*base._childs]}\n"
result += f" .values: {[*map(str, base._childs.values())]}\n"
result += f" providers: {[*base.registred]}\n"
result += f" .values: {[*map(str, base.registred.values())]}\n"
return result
if config.get("debug", False):
print("DEBUG ->")
bases = BaseSourceProvider, BaseFilterProvider, BaseOutputProvider
for info in map(format_registred, "sources filters outputs".split(), bases):
print(info)
print("DEBUG <-")
async def app(
config: dict, ipv4t: httpx.AsyncHTTPTransport, ipv6t: httpx.AsyncHTTPTransport
):
"""Запуск приложения
Args:
config (dict): общая конфигурация
ipv4t (httpx.AsyncHTTPTransport): транспорт IPv4
ipv6t (httpx.AsyncHTTPTransport): транспорт IPv6
"""
use_plugins(config, ipv4t, ipv6t)
print_debug_info(config)
ip_addreses = await get_ip_addresses(config)
if ip_addreses is None:
print("no IP addresses")
return
if not await check_ip_addresses(ip_addreses):
print("stop by filters")
return
await send_ip_addreses(ip_addreses)
print("done")