189 lines
6.2 KiB
Python
189 lines
6.2 KiB
Python
import asyncio
|
|
from abc import ABC, abstractmethod
|
|
from typing import NamedTuple, Optional
|
|
from netaddr import valid_ipv4, valid_ipv6
|
|
|
|
|
|
class IPAddreses(NamedTuple):
|
|
"""набор из названия источника и IP адресов, результат одного из источников"""
|
|
|
|
source_name: str
|
|
ipv4: str
|
|
ipv6: str
|
|
|
|
|
|
class BaseSourceProvider(ABC):
|
|
"""базовый класс для провайдеров источников"""
|
|
|
|
_childs = {}
|
|
registred = {}
|
|
|
|
def __init__(self, name, config, ipv4t, ipv6t):
|
|
self.name, self.config, self.ipv4t, self.ipv6t = (
|
|
name,
|
|
config,
|
|
ipv4t,
|
|
ipv6t,
|
|
)
|
|
self.post_init()
|
|
|
|
def post_init(self):
|
|
"""метод для переопределения пост инициализации"""
|
|
...
|
|
|
|
def __str__(self):
|
|
return f"{self.__class__.__name__}: {self.name}"
|
|
|
|
async def fetch_all(self) -> IPAddreses:
|
|
results = await asyncio.gather(
|
|
self.fetch_v4(), self.fetch_v6(), return_exceptions=True
|
|
)
|
|
|
|
return IPAddreses(
|
|
self.name, *("" if isinstance(i, Exception) else i for i in results)
|
|
)
|
|
|
|
def __init_subclass__(cls) -> None:
|
|
BaseSourceProvider._childs[cls.__name__] = cls
|
|
return super().__init_subclass__()
|
|
|
|
@classmethod
|
|
def validate_source_config(cls, name, config):
|
|
if "provider" not in config:
|
|
return False
|
|
prov_name = config["provider"]
|
|
if prov_name not in cls._childs:
|
|
return False
|
|
return True
|
|
|
|
@classmethod
|
|
def register_provider(cls, name, config, ipv4t, ipv6t):
|
|
if not cls.validate_source_config(name, config):
|
|
return
|
|
provider = cls._childs[config["provider"]]
|
|
cls.registred[name] = provider(name, config, ipv4t, ipv6t)
|
|
|
|
@abstractmethod
|
|
async def fetch_v4(self) -> str:
|
|
"""необходимый метод для реализации получения ipv4"""
|
|
...
|
|
|
|
@abstractmethod
|
|
async def fetch_v6(self) -> str:
|
|
"""необходимый метод для реализации получения ipv6"""
|
|
...
|
|
|
|
|
|
class BaseOutputProvider(ABC):
|
|
"""базовый класс для провайдеров вывода"""
|
|
|
|
_childs = {}
|
|
registred = {}
|
|
|
|
def __init__(self, name, config, ipv4t, ipv6t):
|
|
self.name, self.config = name, config
|
|
self.ipv4t, self.ipv6t = ipv4t, ipv6t
|
|
self.post_init()
|
|
|
|
def post_init(self):
|
|
"""метод для переопределения пост инициализации"""
|
|
...
|
|
|
|
def __init_subclass__(cls) -> None:
|
|
BaseOutputProvider._childs[cls.__name__] = cls
|
|
return super().__init_subclass__()
|
|
|
|
def __str__(self):
|
|
return f"{self.__class__.__name__}: {self.name}"
|
|
|
|
def best_transport(self, addr_v4, addr_v6):
|
|
if addr_v6:
|
|
return self.ipv6t
|
|
return self.ipv4t
|
|
|
|
@classmethod
|
|
def validate_source_config(cls, name, config):
|
|
if "provider" not in config:
|
|
return False
|
|
prov_name = config["provider"]
|
|
if prov_name not in cls._childs:
|
|
return False
|
|
return True
|
|
|
|
@classmethod
|
|
def register_provider(cls, name, config, ipv4t, ipv6t):
|
|
if not cls.validate_source_config(name, config):
|
|
return
|
|
provider = cls._childs[config["provider"]]
|
|
cls.registred[name] = provider(name, config, ipv4t, ipv6t)
|
|
|
|
async def set_addrs(self, source_provider, addr_v4, addr_v6):
|
|
return await self.set_addrs_imp(source_provider, addr_v4, addr_v6)
|
|
|
|
@abstractmethod
|
|
async def set_addrs_imp(self, source_provider, addr_v4, addr_v6):
|
|
"""необходимый метод для реализации отправки/вывода IP аддресов"""
|
|
...
|
|
|
|
|
|
class BaseFilterProvider(ABC):
|
|
"""базовый класс для провайдеров фильтров"""
|
|
|
|
_childs = {}
|
|
registred = {}
|
|
|
|
def __init__(self, name, config, ipv4t, ipv6t):
|
|
self.name, self.config = name, config
|
|
self.ipv4t, self.ipv6t = ipv4t, ipv6t
|
|
self.post_init()
|
|
|
|
def post_init(self):
|
|
"""метод для переопределения пост инициализации"""
|
|
...
|
|
|
|
def __init_subclass__(cls) -> None:
|
|
BaseFilterProvider._childs[cls.__name__] = cls
|
|
return super().__init_subclass__()
|
|
|
|
def __str__(self):
|
|
return f"{self.__class__.__name__}: {self.name}"
|
|
|
|
def best_client(self, addr_v4, addr_v6):
|
|
if addr_v6 is None and addr_v4 is not None:
|
|
return self.ipv4t
|
|
return self.ipv6t
|
|
|
|
@classmethod
|
|
def validate_source_config(cls, name, config):
|
|
if "provider" not in config:
|
|
return False
|
|
prov_name = config["provider"]
|
|
if prov_name not in cls._childs:
|
|
return False
|
|
return True
|
|
|
|
@classmethod
|
|
def register_provider(cls, name, config, ipv4t, ipv6t):
|
|
if not cls.validate_source_config(name, config):
|
|
return
|
|
provider = cls._childs[config["provider"]]
|
|
cls.registred[name] = provider(name, config, ipv4t, ipv6t)
|
|
|
|
async def check(self, source_provider, addr_v4, addr_v6):
|
|
return await self.check_imp(source_provider, addr_v4, addr_v6)
|
|
|
|
@abstractmethod
|
|
async def check_imp(self, source_provider, addr_v4, addr_v6):
|
|
"""необходимый метод реализации проверки"""
|
|
...
|
|
|
|
|
|
def filter_ipv4(value: str) -> Optional[str]:
|
|
"""функция для проверки валидности IPv4 адреса, возвращает None если адрес неправильный или пустой"""
|
|
return value and valid_ipv4(value) and value or None
|
|
|
|
|
|
def filter_ipv6(value: str) -> Optional[str]:
|
|
"""функция для проверки валидности IPv6 адреса, возвращает None если адрес неправильный или пустой"""
|
|
return value and valid_ipv6(value) and value or None
|