250 lines
8.6 KiB
Python
250 lines
8.6 KiB
Python
import asyncio
|
||
from abc import ABC, abstractmethod
|
||
from typing import NamedTuple, Optional
|
||
from netaddr import valid_ipv4, valid_ipv6
|
||
from httpx import AsyncHTTPTransport
|
||
|
||
|
||
class IPAddreses(NamedTuple):
|
||
"""Набор из названия источника и IP адресов, результат одного из источников"""
|
||
|
||
source_name: str
|
||
ipv4: str
|
||
ipv6: str
|
||
|
||
|
||
class BaseSourceProvider(ABC):
|
||
"""Базовый класс для провайдеров источников"""
|
||
|
||
_childs = {}
|
||
registred = {}
|
||
|
||
def __init__(
|
||
self,
|
||
name: str,
|
||
config: dict,
|
||
ipv4t: AsyncHTTPTransport,
|
||
ipv6t: AsyncHTTPTransport,
|
||
):
|
||
self.name, self.config, self.ipv4t, self.ipv6t = (
|
||
name,
|
||
config,
|
||
ipv4t,
|
||
ipv6t,
|
||
)
|
||
self.post_init()
|
||
|
||
def post_init(self):
|
||
"""Метод для переопределения пост инициализации"""
|
||
...
|
||
|
||
def __str__(self):
|
||
return f"{self.__class__.__name__}: {self.name}"
|
||
|
||
def filter_ipv4(self, value: str) -> str:
|
||
"""Функция для проверки валидности IPv4 адреса, возвращает "" если адрес неправильный или пустой"""
|
||
return value if value and valid_ipv4(value) else ""
|
||
|
||
def filter_ipv6(self, value: str) -> str:
|
||
"""Функция для проверки валидности IPv6 адреса, возвращает "" если адрес неправильный или пустой"""
|
||
return value if value and valid_ipv6(value) else ""
|
||
|
||
async def fetch_all(self) -> IPAddreses:
|
||
"""Метод для получения всех IP адресов сразу"""
|
||
|
||
results = await asyncio.gather(
|
||
self.fetch_v4(), self.fetch_v6(), return_exceptions=True
|
||
)
|
||
|
||
return IPAddreses(
|
||
self.name, *("" if isinstance(i, Exception) else i for i in results)
|
||
)
|
||
|
||
async def fetch_v4(self) -> str:
|
||
"""Метод внешнего интерфейса для получения IPv4"""
|
||
result = await asyncio.gather(self.fetch_v4_impl(), return_exceptions=True)
|
||
ipv4 = ""
|
||
if not isinstance(result[0], Exception):
|
||
ipv4 = result[0]
|
||
return self.filter_ipv4(ipv4)
|
||
|
||
async def fetch_v6(self) -> str:
|
||
"""Метод внешнего интерфейса для получения IPv6"""
|
||
result = await asyncio.gather(self.fetch_v6_impl(), return_exceptions=True)
|
||
ipv6 = ""
|
||
if not isinstance(result[0], Exception):
|
||
ipv6 = result[0]
|
||
return self.filter_ipv6(ipv6)
|
||
|
||
def __init_subclass__(cls) -> None:
|
||
BaseSourceProvider._childs[cls.__name__] = cls
|
||
return super().__init_subclass__()
|
||
|
||
@classmethod
|
||
def validate_source_config(cls, name: str, config: dict):
|
||
"""Метод валидации конфигурации для провайдера"""
|
||
if "provider" not in config:
|
||
return False
|
||
prov_name = config["provider"]
|
||
if prov_name not in cls._childs:
|
||
return False
|
||
return True
|
||
|
||
@classmethod
|
||
def register_provider(
|
||
cls,
|
||
name: str,
|
||
config: dict,
|
||
ipv4t: AsyncHTTPTransport,
|
||
ipv6t: AsyncHTTPTransport,
|
||
):
|
||
"""Метод регистрации провайдера по конфигурации"""
|
||
|
||
if not cls.validate_source_config(name, config):
|
||
return
|
||
provider = cls._childs[config["provider"]]
|
||
cls.registred[name] = provider(name, config, ipv4t, ipv6t)
|
||
|
||
@abstractmethod
|
||
async def fetch_v4_impl(self) -> Optional[str]:
|
||
"""Необходимый метод для реализации получения IPv4"""
|
||
...
|
||
|
||
@abstractmethod
|
||
async def fetch_v6_impl(self) -> Optional[str]:
|
||
"""Необходимый метод для реализации получения IPv6"""
|
||
...
|
||
|
||
|
||
class BaseOutputProvider(ABC):
|
||
"""Базовый класс для провайдеров вывода"""
|
||
|
||
_childs = {}
|
||
registred = {}
|
||
|
||
def __init__(
|
||
self,
|
||
name: str,
|
||
config: dict,
|
||
ipv4t: AsyncHTTPTransport,
|
||
ipv6t: AsyncHTTPTransport,
|
||
):
|
||
self.name, self.config = name, config
|
||
self.ipv4t, self.ipv6t = ipv4t, ipv6t
|
||
self.post_init()
|
||
|
||
def post_init(self):
|
||
"""Метод для переопределения пост инициализации"""
|
||
...
|
||
|
||
def __init_subclass__(cls) -> None:
|
||
BaseOutputProvider._childs[cls.__name__] = cls
|
||
return super().__init_subclass__()
|
||
|
||
def __str__(self):
|
||
return f"{self.__class__.__name__}: {self.name}"
|
||
|
||
def best_transport(self, addr_v4: str, addr_v6: str) -> AsyncHTTPTransport:
|
||
"""Метод выбирает лучший транспорт для отправки адресов (либо ipv4 либо ipv6)"""
|
||
if addr_v6:
|
||
return self.ipv6t
|
||
return self.ipv4t
|
||
|
||
@classmethod
|
||
def validate_source_config(cls, name: str, config: dict):
|
||
"""Метод валидации конфигурации для провайдера"""
|
||
|
||
if "provider" not in config:
|
||
return False
|
||
prov_name = config["provider"]
|
||
if prov_name not in cls._childs:
|
||
return False
|
||
return True
|
||
|
||
@classmethod
|
||
def register_provider(
|
||
cls,
|
||
name: str,
|
||
config: dict,
|
||
ipv4t: AsyncHTTPTransport,
|
||
ipv6t: AsyncHTTPTransport,
|
||
):
|
||
"""Метод регистрации провайдера по конфигурации"""
|
||
if not cls.validate_source_config(name, config):
|
||
return
|
||
provider = cls._childs[config["provider"]]
|
||
cls.registred[name] = provider(name, config, ipv4t, ipv6t)
|
||
|
||
async def set_addrs(self, source_provider: str, addr_v4: str, addr_v6: str):
|
||
"""Метод внешнего интерфейса для отправки адресов"""
|
||
return await self.set_addrs_imp(source_provider, addr_v4, addr_v6)
|
||
|
||
@abstractmethod
|
||
async def set_addrs_imp(self, source_provider: str, addr_v4: str, addr_v6: str):
|
||
"""Необходимый метод для реализации отправки/вывода IP аддресов"""
|
||
...
|
||
|
||
|
||
class BaseFilterProvider(ABC):
|
||
"""Базовый класс для провайдеров фильтров"""
|
||
|
||
_childs = {}
|
||
registred = {}
|
||
|
||
def __init__(
|
||
self,
|
||
name: str,
|
||
config: dict,
|
||
ipv4t: AsyncHTTPTransport,
|
||
ipv6t: AsyncHTTPTransport,
|
||
):
|
||
self.name, self.config = name, config
|
||
self.ipv4t, self.ipv6t = ipv4t, ipv6t
|
||
self.post_init()
|
||
|
||
def post_init(self):
|
||
"""Метод для переопределения пост инициализации"""
|
||
...
|
||
|
||
def __init_subclass__(cls) -> None:
|
||
BaseFilterProvider._childs[cls.__name__] = cls
|
||
return super().__init_subclass__()
|
||
|
||
def __str__(self):
|
||
return f"{self.__class__.__name__}: {self.name}"
|
||
|
||
@classmethod
|
||
def validate_source_config(cls, name: str, config: dict) -> bool:
|
||
"""Метод валидации конфигурации для провайдера"""
|
||
|
||
if "provider" not in config:
|
||
return False
|
||
prov_name = config["provider"]
|
||
if prov_name not in cls._childs:
|
||
return False
|
||
return True
|
||
|
||
@classmethod
|
||
def register_provider(
|
||
cls,
|
||
name: str,
|
||
config: dict,
|
||
ipv4t: AsyncHTTPTransport,
|
||
ipv6t: AsyncHTTPTransport,
|
||
):
|
||
"""Метод регистрации провайдера по конфигурации"""
|
||
|
||
if not cls.validate_source_config(name, config):
|
||
return
|
||
provider = cls._childs[config["provider"]]
|
||
cls.registred[name] = provider(name, config, ipv4t, ipv6t)
|
||
|
||
async def check(self, source_provider: str, addr_v4: str, addr_v6: str) -> bool:
|
||
"""Метод внешнего интерфейса для проверки адресов"""
|
||
return await self.check_imp(source_provider, addr_v4, addr_v6)
|
||
|
||
@abstractmethod
|
||
async def check_imp(self, source_provider: str, addr_v4: str, addr_v6: str) -> bool:
|
||
"""Необходимый метод реализации проверки"""
|
||
...
|