Merge pull request 'add docs' (#1) from docs into master

This commit is contained in:
Dmitry Belyaev 2024-02-25 11:06:48 +00:00
commit 20ef74f32b
22 changed files with 340 additions and 53 deletions

3
.gitignore vendored
View File

@ -4,3 +4,6 @@ __pycache__
*.pyo *.pyo
*.pyc *.pyc
.env* .env*
site/
docs/api/
docs/src/

1
docs/index.md Normal file
View File

@ -0,0 +1 @@
--8<-- "README.md"

17
mkapi_conf.py Normal file
View File

@ -0,0 +1,17 @@
"""Config functions."""
from __future__ import annotations
import sys
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from mkdocs.config.defaults import MkDocsConfig
from mkapi.plugins import MkAPIPlugin
def before_on_config(config: MkDocsConfig, plugin: MkAPIPlugin) -> None: # noqa: ARG001
"""Called before `on_config` event of MkAPI plugin."""
if "." not in sys.path:
sys.path.insert(0, ".")

24
mkdocs.yml Normal file
View File

@ -0,0 +1,24 @@
site_name: pddnsc
site_description: Документация pddnsc
site_author: b4tman
repo_url: https://gitea.b4tman.ru/b4tman/pddnsc
repo_name: pddnsc
copyright: (c) 2024 Дмитрий Беляев
theme:
name: material
language: ru
locale: ru
highlightjs: true
markdown_extensions:
- pymdownx.snippets
plugins:
- search:
lang: ru
- mkapi:
config: mkapi_conf.py
nav:
- Home: index.md
- Reference: $api/pddnsc.***

View File

@ -0,0 +1 @@
""" Возможно клиент DDNS """

View File

@ -2,19 +2,30 @@ import asyncio
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import NamedTuple, Optional from typing import NamedTuple, Optional
from netaddr import valid_ipv4, valid_ipv6 from netaddr import valid_ipv4, valid_ipv6
from httpx import AsyncHTTPTransport
class IPAddreses(NamedTuple): class IPAddreses(NamedTuple):
"""набор из названия источника и IP адресов, результат одного из источников"""
source_name: str source_name: str
ipv4: str ipv4: str
ipv6: str ipv6: str
class BaseSourceProvider(ABC): class BaseSourceProvider(ABC):
"""базовый класс для провайдеров источников"""
_childs = {} _childs = {}
registred = {} registred = {}
def __init__(self, name, config, ipv4t, ipv6t): def __init__(
self,
name: str,
config: dict,
ipv4t: AsyncHTTPTransport,
ipv6t: AsyncHTTPTransport,
):
self.name, self.config, self.ipv4t, self.ipv6t = ( self.name, self.config, self.ipv4t, self.ipv6t = (
name, name,
config, config,
@ -23,12 +34,15 @@ class BaseSourceProvider(ABC):
) )
self.post_init() self.post_init()
def post_init(self): ... def post_init(self):
"""метод для переопределения пост инициализации"""
...
def __str__(self): def __str__(self):
return f"{self.__class__.__name__}: {self.name}" return f"{self.__class__.__name__}: {self.name}"
async def fetch_all(self) -> IPAddreses: async def fetch_all(self) -> IPAddreses:
"""метод для получения всех ip адресов сразу"""
results = await asyncio.gather( results = await asyncio.gather(
self.fetch_v4(), self.fetch_v6(), return_exceptions=True self.fetch_v4(), self.fetch_v6(), return_exceptions=True
) )
@ -42,7 +56,8 @@ class BaseSourceProvider(ABC):
return super().__init_subclass__() return super().__init_subclass__()
@classmethod @classmethod
def validate_source_config(cls, name, config): def validate_source_config(cls, name: str, config: dict):
"""метод валидации конфигурации для провайдера"""
if "provider" not in config: if "provider" not in config:
return False return False
prov_name = config["provider"] prov_name = config["provider"]
@ -51,29 +66,51 @@ class BaseSourceProvider(ABC):
return True return True
@classmethod @classmethod
def register_provider(cls, name, config, ipv4t, ipv6t): def register_provider(
cls,
name: str,
config: dict,
ipv4t: AsyncHTTPTransport,
ipv6t: AsyncHTTPTransport,
):
"""метод регистрации провайдера по конфигурации"""
if not cls.validate_source_config(name, config): if not cls.validate_source_config(name, config):
return return
provider = cls._childs[config["provider"]] provider = cls._childs[config["provider"]]
cls.registred[name] = provider(name, config, ipv4t, ipv6t) cls.registred[name] = provider(name, config, ipv4t, ipv6t)
@abstractmethod @abstractmethod
async def fetch_v4(self) -> str: ... async def fetch_v4(self) -> str:
"""необходимый метод для реализации получения ipv4"""
...
@abstractmethod @abstractmethod
async def fetch_v6(self) -> str: ... async def fetch_v6(self) -> str:
"""необходимый метод для реализации получения ipv6"""
...
class BaseOutputProvider(ABC): class BaseOutputProvider(ABC):
"""базовый класс для провайдеров вывода"""
_childs = {} _childs = {}
registred = {} registred = {}
def __init__(self, name, config, ipv4t, ipv6t): def __init__(
self,
name: str,
config: dict,
ipv4t: AsyncHTTPTransport,
ipv6t: AsyncHTTPTransport,
):
self.name, self.config = name, config self.name, self.config = name, config
self.ipv4t, self.ipv6t = ipv4t, ipv6t self.ipv4t, self.ipv6t = ipv4t, ipv6t
self.post_init() self.post_init()
def post_init(self): ... def post_init(self):
"""метод для переопределения пост инициализации"""
...
def __init_subclass__(cls) -> None: def __init_subclass__(cls) -> None:
BaseOutputProvider._childs[cls.__name__] = cls BaseOutputProvider._childs[cls.__name__] = cls
@ -82,13 +119,16 @@ class BaseOutputProvider(ABC):
def __str__(self): def __str__(self):
return f"{self.__class__.__name__}: {self.name}" return f"{self.__class__.__name__}: {self.name}"
def best_transport(self, addr_v4, addr_v6): def best_transport(self, addr_v4: str, addr_v6: str) -> AsyncHTTPTransport:
"""метод выбирает лучший транспорт для отправки адресов (либо ipv4 либо ipv6)"""
if addr_v6: if addr_v6:
return self.ipv6t return self.ipv6t
return self.ipv4t return self.ipv4t
@classmethod @classmethod
def validate_source_config(cls, name, config): def validate_source_config(cls, name: str, config: dict):
"""метод валидации конфигурации для провайдера"""
if "provider" not in config: if "provider" not in config:
return False return False
prov_name = config["provider"] prov_name = config["provider"]
@ -97,29 +137,49 @@ class BaseOutputProvider(ABC):
return True return True
@classmethod @classmethod
def register_provider(cls, name, config, ipv4t, ipv6t): def register_provider(
cls,
name: str,
config: dict,
ipv4t: AsyncHTTPTransport,
ipv6t: AsyncHTTPTransport,
):
"""метод регистрации провайдера по конфигурации"""
if not cls.validate_source_config(name, config): if not cls.validate_source_config(name, config):
return return
provider = cls._childs[config["provider"]] provider = cls._childs[config["provider"]]
cls.registred[name] = provider(name, config, ipv4t, ipv6t) cls.registred[name] = provider(name, config, ipv4t, ipv6t)
async def set_addrs(self, source_provider, addr_v4, addr_v6): async def set_addrs(self, source_provider: str, addr_v4: str, addr_v6: str):
"""метод внешнего интерфейса для отправки адресов"""
return await self.set_addrs_imp(source_provider, addr_v4, addr_v6) return await self.set_addrs_imp(source_provider, addr_v4, addr_v6)
@abstractmethod @abstractmethod
async def set_addrs_imp(self, source_provider, addr_v4, addr_v6): ... async def set_addrs_imp(self, source_provider: str, addr_v4: str, addr_v6: str):
"""необходимый метод для реализации отправки/вывода IP аддресов"""
...
class BaseFilterProvider(ABC): class BaseFilterProvider(ABC):
"""базовый класс для провайдеров фильтров"""
_childs = {} _childs = {}
registred = {} registred = {}
def __init__(self, name, config, ipv4t, ipv6t): def __init__(
self,
name: str,
config: dict,
ipv4t: AsyncHTTPTransport,
ipv6t: AsyncHTTPTransport,
):
self.name, self.config = name, config self.name, self.config = name, config
self.ipv4t, self.ipv6t = ipv4t, ipv6t self.ipv4t, self.ipv6t = ipv4t, ipv6t
self.post_init() self.post_init()
def post_init(self): ... def post_init(self):
"""метод для переопределения пост инициализации"""
...
def __init_subclass__(cls) -> None: def __init_subclass__(cls) -> None:
BaseFilterProvider._childs[cls.__name__] = cls BaseFilterProvider._childs[cls.__name__] = cls
@ -128,13 +188,10 @@ class BaseFilterProvider(ABC):
def __str__(self): def __str__(self):
return f"{self.__class__.__name__}: {self.name}" return f"{self.__class__.__name__}: {self.name}"
def best_client(self, addr_v4, addr_v6):
if addr_v6 is None and addr_v4 is not None:
return self.ipv4t
return self.ipv6t
@classmethod @classmethod
def validate_source_config(cls, name, config): def validate_source_config(cls, name: str, config: dict) -> bool:
"""метод валидации конфигурации для провайдера"""
if "provider" not in config: if "provider" not in config:
return False return False
prov_name = config["provider"] prov_name = config["provider"]
@ -143,22 +200,35 @@ class BaseFilterProvider(ABC):
return True return True
@classmethod @classmethod
def register_provider(cls, name, config, ipv4t, ipv6t): def register_provider(
cls,
name: str,
config: dict,
ipv4t: AsyncHTTPTransport,
ipv6t: AsyncHTTPTransport,
):
"""метод регистрации провайдера по конфигурации"""
if not cls.validate_source_config(name, config): if not cls.validate_source_config(name, config):
return return
provider = cls._childs[config["provider"]] provider = cls._childs[config["provider"]]
cls.registred[name] = provider(name, config, ipv4t, ipv6t) cls.registred[name] = provider(name, config, ipv4t, ipv6t)
async def check(self, source_provider, addr_v4, addr_v6): async def check(self, source_provider: str, addr_v4: str, addr_v6: str) -> bool:
"""метод внешнего интерфейса для проверки адресов"""
return await self.check_imp(source_provider, addr_v4, addr_v6) return await self.check_imp(source_provider, addr_v4, addr_v6)
@abstractmethod @abstractmethod
async def check_imp(self, source_provider, addr_v4, addr_v6): ... async def check_imp(self, source_provider: str, addr_v4: str, addr_v6: str) -> bool:
"""необходимый метод реализации проверки"""
...
def filter_ipv4(value: str) -> Optional[str]: def filter_ipv4(value: str) -> Optional[str]:
"""функция для проверки валидности IPv4 адреса, возвращает None если адрес неправильный или пустой"""
return value and valid_ipv4(value) and value or None return value and valid_ipv4(value) and value or None
def filter_ipv6(value: str) -> Optional[str]: def filter_ipv6(value: str) -> Optional[str]:
"""функция для проверки валидности IPv6 адреса, возвращает None если адрес неправильный или пустой"""
return value and valid_ipv6(value) and value or None return value and valid_ipv6(value) and value or None

View File

@ -1,3 +1,5 @@
""" модуль запуска """
import httpx import httpx
import asyncio import asyncio
import toml import toml
@ -6,7 +8,16 @@ from .plugins import use_plugins
from typing import Optional from typing import Optional
def is_valid_addreses(addrs: IPAddreses, config) -> bool: def is_valid_addreses(addrs: IPAddreses, config: dict) -> bool:
"""Проверка валидности IP адресов
Args:
addrs (IPAddreses): IP адреса - результат одного из источников
config (dict): общая конфигурация
Returns:
bool: валиден или нет
"""
result = addrs.ipv4 or addrs.ipv6 result = addrs.ipv4 or addrs.ipv6
if config.get("require_ipv4"): if config.get("require_ipv4"):
result = result and addrs.ipv4 result = result and addrs.ipv4
@ -15,7 +26,15 @@ def is_valid_addreses(addrs: IPAddreses, config) -> bool:
return result return result
async def get_ip_addresses(config) -> Optional[IPAddreses]: async def get_ip_addresses(config: dict) -> Optional[IPAddreses]:
"""Получение всех IP адресов из всех источников
Args:
config (dict): общая конфигурация
Returns:
Optional[IPAddreses]: результат получения, либо None
"""
providers = BaseSourceProvider.registred.values() providers = BaseSourceProvider.registred.values()
ip_addresses = None ip_addresses = None
is_done = False is_done = False
@ -39,7 +58,16 @@ async def get_ip_addresses(config) -> Optional[IPAddreses]:
return ip_addresses return ip_addresses
async def check_ip_addresses(ip_addresses): async def check_ip_addresses(ip_addresses: IPAddreses) -> bool:
"""Проверка результата получения IP адресов с помощью фильтров
Args:
ip_addresses (IPAddreses): IP адреса
Returns:
bool: корректны ли адреса (изменились ли они),
надо ли продолжать обработку и отправлять их на сервер
"""
providers = BaseFilterProvider.registred.values() providers = BaseFilterProvider.registred.values()
result = True result = True
failed = "" failed = ""
@ -67,7 +95,12 @@ async def check_ip_addresses(ip_addresses):
return result return result
async def send_ip_addreses(ip_addresses): async def send_ip_addreses(ip_addresses: IPAddreses):
"""Отправка адресов на все плагины вывода
Args:
ip_addresses (IPAddreses): IP адреса
"""
providers = BaseOutputProvider.registred.values() providers = BaseOutputProvider.registred.values()
await asyncio.gather( await asyncio.gather(
*( *(
@ -77,7 +110,8 @@ async def send_ip_addreses(ip_addresses):
) )
def print_debug_info(config): def print_debug_info(config: dict):
"""Вывод всех зарегистрированных плагинов и другой отладочной информации"""
debug = config.get("debug", False) debug = config.get("debug", False)
if debug: if debug:
print("DEBUG info:") print("DEBUG info:")
@ -101,7 +135,16 @@ def print_debug_info(config):
) )
async def app(config, ipv4t, ipv6t): async def app(
config: dict, ipv4t: httpx.AsyncHTTPTransport, ipv6t: httpx.AsyncHTTPTransport
):
"""Запуск приложения
Args:
config (dict): общая конфигурация
ipv4t (httpx.AsyncHTTPTransport): транспорт IPv4
ipv6t (httpx.AsyncHTTPTransport): транспорт IPv6
"""
use_plugins(config, ipv4t, ipv6t) use_plugins(config, ipv4t, ipv6t)
print_debug_info(config) print_debug_info(config)
@ -118,6 +161,9 @@ async def app(config, ipv4t, ipv6t):
async def main(): async def main():
"""Точка входа программы
загрузка конфигурации и создание транспортов IPv4 и IPv6
"""
config = toml.load("settings/config.toml") config = toml.load("settings/config.toml")
async with httpx.AsyncHTTPTransport( async with httpx.AsyncHTTPTransport(
local_address="0.0.0.0", proxy=config.get("proxy_v4") local_address="0.0.0.0", proxy=config.get("proxy_v4")

View File

@ -1,3 +1,5 @@
""" Фильтры для проверки адресов перед отправкой """
from pddnsc.loaders import load_plugins from pddnsc.loaders import load_plugins
load_plugins(__file__) load_plugins(__file__)

View File

@ -8,7 +8,14 @@ from pddnsc.base import BaseFilterProvider
class StateHashFilter(BaseFilterProvider): class StateHashFilter(BaseFilterProvider):
async def check_imp(self, source_provider, addr_v4, addr_v6): """Проверка на то что хотябы один IP адрес изменился по хешу сохраненному в файле
Конфигурация:
- filepath: имя файла
"""
async def check_imp(self, source_provider: str, addr_v4: str, addr_v6: str) -> bool:
if not isfile(self.config["filepath"]): if not isfile(self.config["filepath"]):
return True return True
@ -23,13 +30,24 @@ class StateHashFilter(BaseFilterProvider):
class StateFileFilter(BaseFilterProvider): class StateFileFilter(BaseFilterProvider):
async def check_imp(self, source_provider, addr_v4, addr_v6): """Проверка на то что хотябы один IP адрес изменился по сравнению с данными в json файле
Конфигурация:
- filepath: имя файла
- check_ipv4: проверка ipv4 адреса
- check_ipv6: проверка ipv6 адреса
если нет ни одного параметра то проверка выполняется для всех адресов
"""
async def check_imp(self, source_provider: str, addr_v4: str, addr_v6: str) -> bool:
if not isfile(self.config["filepath"]): if not isfile(self.config["filepath"]):
return True return True
new_state = { new_state = {
"ipv4": addr_v4 or "", "ipv4": addr_v4,
"ipv6": addr_v6 or "", "ipv6": addr_v6,
} }
async with aiofiles.open( async with aiofiles.open(

View File

@ -1,9 +1,16 @@
""" функции загрузки файлов плагинов """
import os import os
import traceback import traceback
from importlib import util from importlib import util
def load_module(path): def load_module(path: str):
"""загрузка python модуля
Args:
path (str): имя файла
"""
name = os.path.split(path)[-1] name = os.path.split(path)[-1]
spec = util.spec_from_file_location(name, path) spec = util.spec_from_file_location(name, path)
module = util.module_from_spec(spec) module = util.module_from_spec(spec)
@ -11,7 +18,12 @@ def load_module(path):
return module return module
def load_plugins(init_filepath): def load_plugins(init_filepath: str):
"""Загрузка плагинов из пакета
Args:
init_filepath (str): имя `__init__.py` файла пакета
"""
dirpath = os.path.dirname(os.path.abspath(init_filepath)) dirpath = os.path.dirname(os.path.abspath(init_filepath))
for fname in os.listdir(dirpath): for fname in os.listdir(dirpath):
if ( if (

View File

@ -1,3 +1,5 @@
""" Модули вывода """
from pddnsc.loaders import load_plugins from pddnsc.loaders import load_plugins
load_plugins(__file__) load_plugins(__file__)

View File

@ -4,7 +4,9 @@ from pddnsc.base import BaseOutputProvider
class JustPrint(BaseOutputProvider): class JustPrint(BaseOutputProvider):
async def set_addrs_imp(self, source_provider, addr_v4, addr_v6): """Вывод IP адресов в консоль"""
async def set_addrs_imp(self, source_provider: str, addr_v4: str, addr_v6: str):
print(f">> {self.name}") print(f">> {self.name}")
print(f"addresses from: {source_provider}") print(f"addresses from: {source_provider}")
print(f"IPv4: {addr_v4}") print(f"IPv4: {addr_v4}")

View File

@ -6,7 +6,14 @@ from pddnsc.base import BaseOutputProvider
class StateFile(BaseOutputProvider): class StateFile(BaseOutputProvider):
async def set_addrs_imp(self, source_provider, addr_v4, addr_v6): """Схранение всех IP адресов в json файл
Конфигурация:
- filepath: имя файла
"""
async def set_addrs_imp(self, source_provider: str, addr_v4: str, addr_v6: str):
state = { state = {
"ipv4": addr_v4 or "", "ipv4": addr_v4 or "",
"ipv6": addr_v6 or "", "ipv6": addr_v6 or "",
@ -19,7 +26,14 @@ class StateFile(BaseOutputProvider):
class StateHashFile(BaseOutputProvider): class StateHashFile(BaseOutputProvider):
async def set_addrs_imp(self, source_provider, addr_v4, addr_v6): """Сохранение хеша от всех IP адресов в файл
Конфигурация:
- filepath: имя файла
"""
async def set_addrs_imp(self, source_provider: str, addr_v4: str, addr_v6: str):
state_str = (addr_v4 or "") + (addr_v6 or "") state_str = (addr_v4 or "") + (addr_v6 or "")
sha = hashlib.sha256(state_str.encode(encoding="utf-8")) sha = hashlib.sha256(state_str.encode(encoding="utf-8"))
async with aiofiles.open( async with aiofiles.open(

View File

@ -7,13 +7,28 @@ from pddnsc.base import BaseOutputProvider
class VscaleDomains(BaseOutputProvider): class VscaleDomains(BaseOutputProvider):
"""Домены на vds.selectel.ru (ранее vscale.io)
Конфигурация:
- api_token_env: имя переменной окружения, в которой находится токен доступа, по умолчанию `VSCALE_API_TOKEN`
- api_base: базовый адрес API, по умолчанию `https://api.vscale.io/v1/`
- domain: имя основного домена, например `example.com`
- target: имя изменяемой записи, например `www` (чтобы изменить `www.example.com`)
- ttl: время жизни записи, по умолчанию 3600
- save_ipv4: сохранять ли ipv4 адрес (A запись)
- save_ipv6: сохранять ли ipv6 адрес (AAAA запись)
(если нет ни одного из [save_ipv4, save_ipv6] то сохраняются оба адреса)
"""
def post_init(self): def post_init(self):
token = self.get_api_token() token = self.get_api_token()
if not token: if not token:
raise KeyError("no api token, use env VSCALE_API_TOKEN") raise KeyError("no api token, use env VSCALE_API_TOKEN")
self.__headers = {"X-Token": token} self.__headers = {"X-Token": token}
self.api_base = self.config.get("api_base", "https://api.vscale.io/v1/") self.api_base = self.config.get("api_base", "https://api.vscale.io/v1/")
self.ttl = self.config.get("ttl", 300) self.ttl = self.config.get("ttl", 3600)
self.domain = self.config["domain"] self.domain = self.config["domain"]
target = self.config["target"] target = self.config["target"]
self.target = f"{target}.{self.domain}" self.target = f"{target}.{self.domain}"
@ -24,10 +39,12 @@ class VscaleDomains(BaseOutputProvider):
self.save_ipv4 = self.save_ipv6 = True self.save_ipv4 = self.save_ipv6 = True
def get_api_token(self) -> str: def get_api_token(self) -> str:
"""получение API токена"""
token_env = self.config.get("api_token_env", "VSCALE_API_TOKEN") token_env = self.config.get("api_token_env", "VSCALE_API_TOKEN")
return os.environ[token_env] return os.environ[token_env]
async def find_domain_id(self, client) -> Optional[int]: async def find_domain_id(self, client: httpx.AsyncClient) -> Optional[int]:
"""поиск ID домена"""
response = await client.get("/domains/") response = await client.get("/domains/")
if response.is_success: if response.is_success:
data = response.json() data = response.json()
@ -40,7 +57,10 @@ class VscaleDomains(BaseOutputProvider):
else: else:
raise ValueError(f"failed to find domain id, code: {response.status_code}") raise ValueError(f"failed to find domain id, code: {response.status_code}")
async def find_record(self, client, domain_id, record_type) -> Optional[int]: async def find_record(
self, client: httpx.AsyncClient, domain_id: int, record_type: str
) -> Optional[int]:
"""поиск ID записи"""
response = await client.get( response = await client.get(
f"/domains/{domain_id}/records/", f"/domains/{domain_id}/records/",
) )
@ -55,14 +75,25 @@ class VscaleDomains(BaseOutputProvider):
f"error list records {domain_id=}: ", response.status_code f"error list records {domain_id=}: ", response.status_code
) )
async def get_record_value(self, client, domain_id, record_id) -> str: async def get_record_value(
self, client: httpx.AsyncClient, domain_id: int, record_id: int
) -> str:
"""получение действующего значения записи"""
response = await client.get(f"/domains/{domain_id}/records/{record_id}") response = await client.get(f"/domains/{domain_id}/records/{record_id}")
if response.is_success: if response.is_success:
data = response.json() data = response.json()
if isinstance(data, dict): if isinstance(data, dict):
return data["content"] return data["content"]
async def change_record(self, client, domain_id, record_id, record_type, value): async def change_record(
self,
client: httpx.AsyncClient,
domain_id: int,
record_id: int,
record_type: str,
value: str,
):
"""изменение записи"""
data = { data = {
"content": value, "content": value,
"name": self.target, "name": self.target,
@ -80,7 +111,10 @@ class VscaleDomains(BaseOutputProvider):
f"failed to change record: {self.target=},{domain_id=}, {record_id=}, {record_type=}, {value=}" f"failed to change record: {self.target=},{domain_id=}, {record_id=}, {record_type=}, {value=}"
) )
async def create_record(self, client, domain_id, record_type, value): async def create_record(
self, client: httpx.AsyncClient, domain_id: int, record_type: str, value: str
):
"""создание новой записи"""
data = { data = {
"content": value, "content": value,
"name": self.target, "name": self.target,
@ -96,7 +130,7 @@ class VscaleDomains(BaseOutputProvider):
f"failed to create record: {self.target=},{domain_id=}, {record_type=}, {value=}, {response.status_code=}" f"failed to create record: {self.target=},{domain_id=}, {record_type=}, {value=}, {response.status_code=}"
) )
async def set_addrs_imp(self, source_provider, addr_v4, addr_v6): async def set_addrs_imp(self, source_provider: str, addr_v4: str, addr_v6: str):
save_addrs = [] save_addrs = []
if addr_v4 and self.save_ipv4: if addr_v4 and self.save_ipv4:
save_addrs.append(("A", addr_v4)) save_addrs.append(("A", addr_v4))

View File

@ -1,10 +1,14 @@
""" модуль взаимодействия и регистрации плагинов """
from httpx import AsyncHTTPTransport
from .base import BaseSourceProvider, BaseFilterProvider, BaseOutputProvider from .base import BaseSourceProvider, BaseFilterProvider, BaseOutputProvider
from . import sources from . import sources
from . import outputs from . import outputs
from . import filters from . import filters
def use_plugins(config, ipv4t, ipv6t): def use_plugins(config: dict, ipv4t: AsyncHTTPTransport, ipv6t: AsyncHTTPTransport):
"""Регистрация всех плагинов указаных в конфигурации"""
for source_name in config["sources"]: for source_name in config["sources"]:
BaseSourceProvider.register_provider( BaseSourceProvider.register_provider(
source_name, config["sources"][source_name], ipv4t, ipv6t source_name, config["sources"][source_name], ipv4t, ipv6t

View File

@ -1,3 +1,5 @@
""" Модули источников IP адресов """
from pddnsc.loaders import load_plugins from pddnsc.loaders import load_plugins
load_plugins(__file__) load_plugins(__file__)

View File

@ -1,19 +1,25 @@
""" модуль имитации получения IP аддресов """
import asyncio import asyncio
from pddnsc.base import BaseSourceProvider from pddnsc.base import BaseSourceProvider
class DummySource(BaseSourceProvider): class DummySource(BaseSourceProvider):
"""имитация получения пустых адресов"""
async def fetch_v4(self) -> str: async def fetch_v4(self) -> str:
result = await asyncio.sleep(self.config.get("delay", 1), result=None) result = await asyncio.sleep(self.config.get("delay", 1), result="")
return result return result
async def fetch_v6(self) -> str: async def fetch_v6(self) -> str:
result = await asyncio.sleep(self.config.get("delay", 1), result=None) result = await asyncio.sleep(self.config.get("delay", 1), result="")
return result return result
class FakeSource(BaseSourceProvider): class FakeSource(BaseSourceProvider):
"""имитация получения заданных в конфигурации адресов"""
async def fetch_v4(self) -> str: async def fetch_v4(self) -> str:
result = await asyncio.sleep( result = await asyncio.sleep(
self.config.get("delay", 1), result=self.config.get("ipv4", "127.0.0.1") self.config.get("delay", 1), result=self.config.get("ipv4", "127.0.0.1")

View File

@ -4,6 +4,15 @@ from pddnsc.base import BaseSourceProvider, filter_ipv4, filter_ipv6
class GenericHttpSource(BaseSourceProvider): class GenericHttpSource(BaseSourceProvider):
"""Базовый провайдер получения IP адресов по http/https ссылкам в виде текста.
Конфигурация:
- url_v4: *URL* для получения адреса *IPv4*
- url_v6: *URL* для получения адреса *IPv6*
- headers (`dict`): словарь дополнительных заголовков (*необязательно*)
"""
def post_init(self): def post_init(self):
self.url_v4 = self.config.get("url_v4") self.url_v4 = self.config.get("url_v4")
self.url_v6 = self.config.get("url_v6") self.url_v6 = self.config.get("url_v6")
@ -37,6 +46,17 @@ class GenericHttpSource(BaseSourceProvider):
class GenericHttpJsonSource(BaseSourceProvider): class GenericHttpJsonSource(BaseSourceProvider):
"""Базовый провайдер получения IP адресов по http/https ссылкам в виде json.
Конфигурация:
- url_v4: *URL* для получения адреса *IPv4*
- url_v6: *URL* для получения адреса *IPv6*
- key_v4: ключ *json* (`int` для списка, `str` для словаря) для *IPv4*
- key_v6: ключ *json* (`int` для списка, `str` для словаря) для *IPv6*
- headers (`dict`): словарь дополнительных заголовков (*необязательно*)
"""
def post_init(self): def post_init(self):
self.url_v4 = self.config.get("url_v4") self.url_v4 = self.config.get("url_v4")
self.url_v6 = self.config.get("url_v6") self.url_v6 = self.config.get("url_v6")

View File

@ -1,8 +1,9 @@
from pddnsc.sources.http import GenericHttpSource from pddnsc.sources.http import GenericHttpSource
# https://www.ipify.org/
class IPIFYSource(GenericHttpSource): class IPIFYSource(GenericHttpSource):
"""https://www.ipify.org/"""
def post_init(self): def post_init(self):
super().post_init() super().post_init()
self.url_v4 = "https://api4.ipify.org" self.url_v4 = "https://api4.ipify.org"

View File

@ -1,8 +1,9 @@
from pddnsc.sources.http import GenericHttpSource from pddnsc.sources.http import GenericHttpSource
# https://ip.sb/api
class IPSB(GenericHttpSource): class IPSB(GenericHttpSource):
"""https://ip.sb/api"""
def post_init(self): def post_init(self):
super().post_init() super().post_init()
self.url_v4 = "https://api-ipv4.ip.sb/ip" self.url_v4 = "https://api-ipv4.ip.sb/ip"

View File

@ -1,9 +1,12 @@
from pddnsc.sources.http import GenericHttpSource from pddnsc.sources.http import GenericHttpSource
# https://wtfismyip.com/
# https://gitlab.com/wtfismyip/wtfismyip
class WTFIsMyIP(GenericHttpSource): class WTFIsMyIP(GenericHttpSource):
"""
https://wtfismyip.com/
https://gitlab.com/wtfismyip/wtfismyip
"""
def post_init(self): def post_init(self):
super().post_init() super().post_init()
self.url_v4 = "https://text.ipv4.myip.wtf" self.url_v4 = "https://text.ipv4.myip.wtf"

4
requirements.docs.txt Normal file
View File

@ -0,0 +1,4 @@
mkdocs>=1.5,<2
pymdown-extensions>=10.7,<11
mkapi>=2.1,<3
mkdocs-material>=9.5.10,<10