Compare commits

..

15 Commits

Author SHA1 Message Date
5e3805b567 fix StateHashFile encoding
All checks were successful
Docker Image CI / test (push) Successful in 44s
Docker Image CI / push (push) Successful in 53s
2024-03-03 14:32:12 +03:00
5861836852 fix docs for (sources/filters).files
All checks were successful
Docker Image CI / test (push) Successful in 30s
Docker Image CI / push (push) Successful in 39s
2024-03-02 18:33:09 +03:00
30d0270901 refactor files filters/outputs
All checks were successful
Docker Image CI / test (push) Successful in 31s
Docker Image CI / push (push) Successful in 38s
2024-03-02 18:18:28 +03:00
4d13416222 separate core from cli #6
All checks were successful
Docker Image CI / test (push) Successful in 31s
Docker Image CI / push (push) Successful in 39s
2024-03-02 17:20:44 +03:00
2bf42162ef refactor plugins.py
All checks were successful
Docker Image CI / test (push) Successful in 37s
Docker Image CI / push (push) Successful in 39s
2024-03-02 16:59:23 +03:00
440c33b8e5 upd README
All checks were successful
Docker Image CI / test (push) Successful in 36s
Docker Image CI / push (push) Successful in 40s
2024-02-29 15:56:53 +03:00
Dmitry Belyaev
5b37e91db3 Merge pull request 'add docker build ci' (#7) from ci-docker into master
All checks were successful
Docker Image CI / test (push) Successful in 31s
Docker Image CI / push (push) Successful in 40s
Reviewed-on: #7
for #3
2024-02-27 09:37:21 +00:00
d8563866e5 add docker build ci
All checks were successful
Docker Image CI / test (pull_request) Successful in 31s
Docker Image CI / push (pull_request) Has been skipped
2024-02-27 12:35:21 +03:00
78cf337edf upd README + docs 2024-02-25 23:00:34 +03:00
51f5ed26db + addrs from different sources
like:
ipv4 from src1
ipv6 from src2
2024-02-25 17:46:52 +03:00
3e248b0adf add GenericHttpRegexSource provider 2024-02-25 17:32:00 +03:00
3131f2d0a0 sources refactor 2024-02-25 15:41:29 +03:00
Dmitry Belyaev
20ef74f32b Merge pull request 'add docs' (#1) from docs into master 2024-02-25 11:06:48 +00:00
92cba7d6b9 upd docs 2024-02-25 14:03:29 +03:00
44be3416c0 add docs 2024-02-22 18:24:36 +03:00
27 changed files with 858 additions and 232 deletions

View File

@@ -0,0 +1,73 @@
name: Docker Image CI
on:
push:
branches:
- master
tags:
- v*
pull_request:
branches:
- "master"
jobs:
test:
runs-on: cth-ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Build image
uses: docker/build-push-action@v4
with:
context: .
push: false
tags: gitea.b4tman.ru/b4tman/pddnsc:test
push:
needs: test
runs-on: cth-ubuntu-latest
if: github.event_name != 'pull_request'
steps:
- uses: actions/checkout@v4
- name: Docker meta
id: meta
uses: docker/metadata-action@v5
with:
images: |
gitea.b4tman.ru/b4tman/pddnsc
flavor: |
latest=${{ github.ref == 'refs/heads/master' }}
tags: |
type=ref,event=branch
type=ref,event=pr
type=semver,pattern={{version}}
type=semver,pattern={{major}}.{{minor}}
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Login to gitea
uses: docker/login-action@v3
with:
registry: gitea.b4tman.ru
username: b4tman
password: ${{ secrets.PKGS_TOKEN }}
- name: Build and push image
uses: docker/build-push-action@v4
with:
context: .
push: true
#platforms: linux/amd64,linux/arm64
platforms: linux/amd64
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}

3
.gitignore vendored
View File

@@ -4,3 +4,6 @@ __pycache__
*.pyo
*.pyc
.env*
site/
docs/api/
docs/src/

View File

@@ -2,13 +2,22 @@
Возможно `DDNS` клиент
## запуск
## Запуск
~~~sh
python -m pddnsc.cli
~~~
## конфигурация
либо в [docker](https://www.docker.com)/[podman](https://podman.io) (для запуска по расписанию в `cron`):
~~~bash
docker run -v .state:/app/state:rw \
-v .settings:/app/settings:ro \
-e SCHEDULE=@hourly \
gitea.b4tman.ru/b4tman/pddnsc
~~~
## Конфигурация
Конфигурация находится в файле `settings/config.toml`.
Пример конфигурации:
@@ -27,7 +36,7 @@ python -m pddnsc.cli
filepath = "state/hash.txt"
[outputs]
# сохранение хеша всех ip адресов в файл,
# сохранение хеша всех ip адресов в файл,
# чтобы потом проверить изменились ли они
[outputs.hash-file]
provider = "StateHashFile"
@@ -36,7 +45,7 @@ python -m pddnsc.cli
# сервис доменов на vscale (vds.selectel.ru)
# токен в переменной окружения VSCALE_API_TOKEN
[outputs.vscale]
provider = "VscaleDomains"
provider = "VscaleDomains"
domain = "example.com"
target = "www" # изменяем www.example.com
ttl = 3600
@@ -50,8 +59,22 @@ python -m pddnsc.cli
- `filters` - фильтры, если хоть один вернет ложь то программа ничего никуда не запишет и не отправит, например проверка, что ip адрес не изменился
- `outputs` - модули вывода, например вывод в консоль, запись в файл или создание dns записей на сервере
Все модули источников/фильтров/вывода работают конкурентно через `asyncio`.
Все модули источников/фильтров/вывода работают конкурентно через [asyncio](https://docs.python.org/3/library/asyncio.html#module-asyncio) и [httpx](https://www.python-httpx.org).
### TODO
### Подробная документация
- Добавить названия python классов и их параметры.
Её необходимо собрать с помошью [mkdocs](https://www.mkdocs.org). Для этого нужно установить зависимости:
~~~bash
pip install -r requirements.txt
pip install -r requirements.docs.txt
~~~
После этого либо собрать документацию в каталог `site/`, либо запустить тестовый сервер.
~~~bash
# сборка
mkdocs build
# тестовый сервер
mkdocs serve
~~~

1
docs/index.md Normal file
View File

@@ -0,0 +1 @@
--8<-- "README.md"

17
mkapi_conf.py Normal file
View File

@@ -0,0 +1,17 @@
"""Config functions."""
from __future__ import annotations
import sys
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from mkdocs.config.defaults import MkDocsConfig
from mkapi.plugins import MkAPIPlugin
def before_on_config(config: MkDocsConfig, plugin: MkAPIPlugin) -> None: # noqa: ARG001
"""Called before `on_config` event of MkAPI plugin."""
if "." not in sys.path:
sys.path.insert(0, ".")

24
mkdocs.yml Normal file
View File

@@ -0,0 +1,24 @@
site_name: pddnsc
site_description: Документация pddnsc
site_author: b4tman
repo_url: https://gitea.b4tman.ru/b4tman/pddnsc
repo_name: pddnsc
copyright: (c) 2024 Дмитрий Беляев
theme:
name: material
language: ru
locale: ru
highlightjs: true
markdown_extensions:
- pymdownx.snippets
plugins:
- search:
lang: ru
- mkapi:
config: mkapi_conf.py
nav:
- Главная: index.md
- Справочник: $api/pddnsc.***

View File

@@ -0,0 +1 @@
""" Возможно клиент DDNS """

View File

@@ -2,19 +2,30 @@ import asyncio
from abc import ABC, abstractmethod
from typing import NamedTuple, Optional
from netaddr import valid_ipv4, valid_ipv6
from httpx import AsyncHTTPTransport
class IPAddreses(NamedTuple):
"""Набор из названия источника и IP адресов, результат одного из источников"""
source_name: str
ipv4: str
ipv6: str
class BaseSourceProvider(ABC):
"""Базовый класс для провайдеров источников"""
_childs = {}
registred = {}
def __init__(self, name, config, ipv4t, ipv6t):
def __init__(
self,
name: str,
config: dict,
ipv4t: AsyncHTTPTransport,
ipv6t: AsyncHTTPTransport,
):
self.name, self.config, self.ipv4t, self.ipv6t = (
name,
config,
@@ -23,12 +34,24 @@ class BaseSourceProvider(ABC):
)
self.post_init()
def post_init(self): ...
def post_init(self):
"""Метод для переопределения пост инициализации"""
...
def __str__(self):
return f"{self.__class__.__name__}: {self.name}"
def filter_ipv4(self, value: str) -> str:
"""Функция для проверки валидности IPv4 адреса, возвращает "" если адрес неправильный или пустой"""
return value if value and valid_ipv4(value) else ""
def filter_ipv6(self, value: str) -> str:
"""Функция для проверки валидности IPv6 адреса, возвращает "" если адрес неправильный или пустой"""
return value if value and valid_ipv6(value) else ""
async def fetch_all(self) -> IPAddreses:
"""Метод для получения всех IP адресов сразу"""
results = await asyncio.gather(
self.fetch_v4(), self.fetch_v6(), return_exceptions=True
)
@@ -37,12 +60,29 @@ class BaseSourceProvider(ABC):
self.name, *("" if isinstance(i, Exception) else i for i in results)
)
async def fetch_v4(self) -> str:
"""Метод внешнего интерфейса для получения IPv4"""
result = await asyncio.gather(self.fetch_v4_impl(), return_exceptions=True)
ipv4 = ""
if not isinstance(result[0], Exception):
ipv4 = result[0]
return self.filter_ipv4(ipv4)
async def fetch_v6(self) -> str:
"""Метод внешнего интерфейса для получения IPv6"""
result = await asyncio.gather(self.fetch_v6_impl(), return_exceptions=True)
ipv6 = ""
if not isinstance(result[0], Exception):
ipv6 = result[0]
return self.filter_ipv6(ipv6)
def __init_subclass__(cls) -> None:
BaseSourceProvider._childs[cls.__name__] = cls
return super().__init_subclass__()
@classmethod
def validate_source_config(cls, name, config):
def validate_source_config(cls, name: str, config: dict):
"""Метод валидации конфигурации для провайдера"""
if "provider" not in config:
return False
prov_name = config["provider"]
@@ -51,29 +91,51 @@ class BaseSourceProvider(ABC):
return True
@classmethod
def register_provider(cls, name, config, ipv4t, ipv6t):
def register_provider(
cls,
name: str,
config: dict,
ipv4t: AsyncHTTPTransport,
ipv6t: AsyncHTTPTransport,
):
"""Метод регистрации провайдера по конфигурации"""
if not cls.validate_source_config(name, config):
return
provider = cls._childs[config["provider"]]
cls.registred[name] = provider(name, config, ipv4t, ipv6t)
@abstractmethod
async def fetch_v4(self) -> str: ...
async def fetch_v4_impl(self) -> Optional[str]:
"""Необходимый метод для реализации получения IPv4"""
...
@abstractmethod
async def fetch_v6(self) -> str: ...
async def fetch_v6_impl(self) -> Optional[str]:
"""Необходимый метод для реализации получения IPv6"""
...
class BaseOutputProvider(ABC):
"""Базовый класс для провайдеров вывода"""
_childs = {}
registred = {}
def __init__(self, name, config, ipv4t, ipv6t):
def __init__(
self,
name: str,
config: dict,
ipv4t: AsyncHTTPTransport,
ipv6t: AsyncHTTPTransport,
):
self.name, self.config = name, config
self.ipv4t, self.ipv6t = ipv4t, ipv6t
self.post_init()
def post_init(self): ...
def post_init(self):
"""Метод для переопределения пост инициализации"""
...
def __init_subclass__(cls) -> None:
BaseOutputProvider._childs[cls.__name__] = cls
@@ -82,13 +144,16 @@ class BaseOutputProvider(ABC):
def __str__(self):
return f"{self.__class__.__name__}: {self.name}"
def best_transport(self, addr_v4, addr_v6):
def best_transport(self, addr_v4: str, addr_v6: str) -> AsyncHTTPTransport:
"""Метод выбирает лучший транспорт для отправки адресов (либо ipv4 либо ipv6)"""
if addr_v6:
return self.ipv6t
return self.ipv4t
@classmethod
def validate_source_config(cls, name, config):
def validate_source_config(cls, name: str, config: dict):
"""Метод валидации конфигурации для провайдера"""
if "provider" not in config:
return False
prov_name = config["provider"]
@@ -97,29 +162,49 @@ class BaseOutputProvider(ABC):
return True
@classmethod
def register_provider(cls, name, config, ipv4t, ipv6t):
def register_provider(
cls,
name: str,
config: dict,
ipv4t: AsyncHTTPTransport,
ipv6t: AsyncHTTPTransport,
):
"""Метод регистрации провайдера по конфигурации"""
if not cls.validate_source_config(name, config):
return
provider = cls._childs[config["provider"]]
cls.registred[name] = provider(name, config, ipv4t, ipv6t)
async def set_addrs(self, source_provider, addr_v4, addr_v6):
async def set_addrs(self, source_provider: str, addr_v4: str, addr_v6: str):
"""Метод внешнего интерфейса для отправки адресов"""
return await self.set_addrs_imp(source_provider, addr_v4, addr_v6)
@abstractmethod
async def set_addrs_imp(self, source_provider, addr_v4, addr_v6): ...
async def set_addrs_imp(self, source_provider: str, addr_v4: str, addr_v6: str):
"""Необходимый метод для реализации отправки/вывода IP аддресов"""
...
class BaseFilterProvider(ABC):
"""Базовый класс для провайдеров фильтров"""
_childs = {}
registred = {}
def __init__(self, name, config, ipv4t, ipv6t):
def __init__(
self,
name: str,
config: dict,
ipv4t: AsyncHTTPTransport,
ipv6t: AsyncHTTPTransport,
):
self.name, self.config = name, config
self.ipv4t, self.ipv6t = ipv4t, ipv6t
self.post_init()
def post_init(self): ...
def post_init(self):
"""Метод для переопределения пост инициализации"""
...
def __init_subclass__(cls) -> None:
BaseFilterProvider._childs[cls.__name__] = cls
@@ -128,13 +213,10 @@ class BaseFilterProvider(ABC):
def __str__(self):
return f"{self.__class__.__name__}: {self.name}"
def best_client(self, addr_v4, addr_v6):
if addr_v6 is None and addr_v4 is not None:
return self.ipv4t
return self.ipv6t
@classmethod
def validate_source_config(cls, name, config):
def validate_source_config(cls, name: str, config: dict) -> bool:
"""Метод валидации конфигурации для провайдера"""
if "provider" not in config:
return False
prov_name = config["provider"]
@@ -143,22 +225,25 @@ class BaseFilterProvider(ABC):
return True
@classmethod
def register_provider(cls, name, config, ipv4t, ipv6t):
def register_provider(
cls,
name: str,
config: dict,
ipv4t: AsyncHTTPTransport,
ipv6t: AsyncHTTPTransport,
):
"""Метод регистрации провайдера по конфигурации"""
if not cls.validate_source_config(name, config):
return
provider = cls._childs[config["provider"]]
cls.registred[name] = provider(name, config, ipv4t, ipv6t)
async def check(self, source_provider, addr_v4, addr_v6):
async def check(self, source_provider: str, addr_v4: str, addr_v6: str) -> bool:
"""Метод внешнего интерфейса для проверки адресов"""
return await self.check_imp(source_provider, addr_v4, addr_v6)
@abstractmethod
async def check_imp(self, source_provider, addr_v4, addr_v6): ...
def filter_ipv4(value: str) -> Optional[str]:
return value and valid_ipv4(value) and value or None
def filter_ipv6(value: str) -> Optional[str]:
return value and valid_ipv6(value) and value or None
async def check_imp(self, source_provider: str, addr_v4: str, addr_v6: str) -> bool:
"""Необходимый метод реализации проверки"""
...

View File

@@ -1,123 +1,16 @@
""" модуль запуска """
import httpx
import asyncio
import toml
from .base import BaseFilterProvider, BaseSourceProvider, BaseOutputProvider, IPAddreses
from .plugins import use_plugins
from typing import Optional
import asyncio
def is_valid_addreses(addrs: IPAddreses, config) -> bool:
result = addrs.ipv4 or addrs.ipv6
if config.get("require_ipv4"):
result = result and addrs.ipv4
if config.get("require_ipv6"):
result = result and addrs.ipv6
return result
async def get_ip_addresses(config) -> Optional[IPAddreses]:
providers = BaseSourceProvider.registred.values()
ip_addresses = None
is_done = False
pending = [asyncio.create_task(p.fetch_all(), name=p.name) for p in providers]
while not is_done and pending:
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
for x in done:
ip_addresses = x.result()
if is_valid_addreses(ip_addresses, config):
is_done = True
break
ip_addresses = None
if pending:
gather = asyncio.gather(*pending)
gather.cancel()
try:
await gather
except asyncio.CancelledError:
pass
return ip_addresses
async def check_ip_addresses(ip_addresses):
providers = BaseFilterProvider.registred.values()
result = True
failed = ""
pending = [
asyncio.create_task(p.check(*ip_addresses), name=p.name) for p in providers
]
while result and pending:
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
for x in done:
result = x.result()
if not result:
failed = x.get_name()
if pending:
gather = asyncio.gather(*pending)
gather.cancel()
try:
await gather
except asyncio.CancelledError:
pass
if not result:
print("failed filter:", failed)
return result
async def send_ip_addreses(ip_addresses):
providers = BaseOutputProvider.registred.values()
await asyncio.gather(
*(
asyncio.create_task(p.set_addrs(*ip_addresses), name=p.name)
for p in providers
)
)
def print_debug_info(config):
debug = config.get("debug", False)
if debug:
print("DEBUG info:")
print(
f"source classes: {[*BaseSourceProvider._childs]}, {[*map(str, BaseSourceProvider._childs.values())]}"
)
print(
f"filter classes: {[*BaseFilterProvider._childs]}, {[*map(str, BaseFilterProvider._childs.values())]}"
)
print(
f"output classes: {[*BaseOutputProvider._childs]}, {[*map(str, BaseOutputProvider._childs.values())]}"
)
print(
f"source providers: {[*BaseSourceProvider.registred]}, {[*map(str, BaseSourceProvider.registred.values())]}"
)
print(
f"filter providers: {[*BaseFilterProvider.registred]}, {[*map(str, BaseFilterProvider.registred.values())]}"
)
print(
f"output providers: {[*BaseOutputProvider.registred]}, {[*map(str, BaseOutputProvider.registred.values())]}"
)
async def app(config, ipv4t, ipv6t):
use_plugins(config, ipv4t, ipv6t)
print_debug_info(config)
ip_addreses = await get_ip_addresses(config)
if ip_addreses is None:
print("no IP addresses")
return
if not await check_ip_addresses(ip_addreses):
print("stop by filters")
return
await send_ip_addreses(ip_addreses)
print("done")
from pddnsc.core import app
async def main():
"""Точка входа программы
загрузка конфигурации и создание транспортов IPv4 и IPv6
"""
config = toml.load("settings/config.toml")
async with httpx.AsyncHTTPTransport(
local_address="0.0.0.0", proxy=config.get("proxy_v4")

285
pddnsc/core.py Normal file
View File

@@ -0,0 +1,285 @@
import httpx
import asyncio
from .base import BaseFilterProvider, BaseSourceProvider, BaseOutputProvider, IPAddreses
from .plugins import use_plugins
from typing import Optional, NamedTuple
class NeededAddrs(NamedTuple):
ipv4: bool
ipv6: bool
@classmethod
def from_config(cls, config: dict) -> "NeededAddrs":
need_ipv4 = config.get("require_ipv4", False)
need_ipv6 = config.get("require_ipv6", False)
if "require_ipv4" not in config and "require_ipv6" not in config:
need_ipv4 = need_ipv6 = True
return cls(need_ipv4, need_ipv6)
def is_valid_addreses(addrs: IPAddreses, config: dict) -> bool:
"""Проверка валидности IP адресов
Args:
addrs (IPAddreses): IP адреса - результат одного из источников
config (dict): общая конфигурация
Returns:
bool: валиден или нет
"""
result = addrs.ipv4 or addrs.ipv6
if config.get("require_ipv4"):
result = result and addrs.ipv4
if config.get("require_ipv6"):
result = result and addrs.ipv6
return bool(result)
def is_ipv4_ok(ipv4: str, config: dict) -> bool:
"""Проверка IPv4 адреса, подходит или нет
Args:
ipv4 (str): IPv4 адрес
config (dict): общая конфигурация
Returns:
bool: подходит или нет
"""
return bool(ipv4) if NeededAddrs.from_config(config).ipv4 else True
def is_ipv6_ok(ipv6: str, config: dict) -> bool:
"""Проверка IPv6 адреса, подходит или нет
Args:
ipv6 (str): IPv6 адрес
config (dict): общая конфигурация
Returns:
bool: подходит или нет
"""
return bool(ipv6) if NeededAddrs.from_config(config).ipv6 else True
async def get_ip_addresses(config: dict) -> Optional[IPAddreses]:
"""Получение всех IP адресов из всех источников
Args:
config (dict): общая конфигурация
Returns:
Optional[IPAddreses]: результат получения, либо None
"""
unit_mode = config.get("unit_mode", False)
if unit_mode:
result = await get_ip_addresses_unit(config)
else:
result = await get_ip_addresses_any(config)
return result
async def get_ip_addresses_any(config: dict) -> Optional[IPAddreses]:
"""Получение всех IP адресов из всех источников (режим any)
Получает разные адреса от любых источников
Args:
config (dict): общая конфигурация
Returns:
Optional[IPAddreses]: результат получения, либо None
"""
need = NeededAddrs.from_config(config)
providers = BaseSourceProvider.registred.values()
ip_addresses, is_done = None, False
last_src, last_ipv4, last_ipv6 = "", "", ""
ok_ipv4, ok_ipv6 = False, False
drop = []
pending = []
if need.ipv4:
pending += [
asyncio.create_task(p.fetch_v4(), name=f"{p.name}.v4") for p in providers
]
if need.ipv6:
pending += [
asyncio.create_task(p.fetch_v6(), name=f"{p.name}.v6") for p in providers
]
while not is_done and pending:
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
for x in done:
ip_addr = x.result()
if x.get_name().endswith(".v6"):
last_ipv6 = last_ipv6 if ok_ipv6 else ip_addr
else:
last_ipv4 = last_ipv4 if ok_ipv4 else ip_addr
if config.get("debug"):
print("debug:", "get", x.get_name(), ip_addr)
last_src = x.get_name().rsplit(".", maxsplit=1)[0]
ok_ipv4 = is_ipv4_ok(last_ipv4, config)
ok_ipv6 = is_ipv6_ok(last_ipv6, config)
pending_v4 = [*filter(lambda i: i.get_name().endswith(".v4"), pending)]
pending_v6 = [*filter(lambda i: i.get_name().endswith(".v6"), pending)]
if ok_ipv4 and pending_v4:
drop += pending_v4
pending = pending_v6
for i in pending_v4:
i.cancel()
pending_v4 = []
if ok_ipv6 and pending_v6:
drop += pending_v6
pending = pending_v4
for i in pending_v6:
i.cancel()
pending_v6 = []
if not (ok_ipv4 and ok_ipv6):
continue
ip_addresses = IPAddreses(last_src, last_ipv4, last_ipv6)
if is_valid_addreses(ip_addresses, config):
is_done = True
break
ip_addresses = None
if not (ok_ipv4 and ok_ipv6):
ip_addresses = IPAddreses(last_src, last_ipv4, last_ipv6)
drop = drop + list(pending)
if drop:
gather = asyncio.gather(*drop)
gather.cancel()
try:
await gather
except asyncio.CancelledError:
pass
return ip_addresses
async def get_ip_addresses_unit(config: dict) -> Optional[IPAddreses]:
"""Получение всех IP адресов из всех источников (режим unit)
Получает адреса только от одного источника
Args:
config (dict): общая конфигурация
Returns:
Optional[IPAddreses]: результат получения, либо None
"""
providers = BaseSourceProvider.registred.values()
ip_addresses = None
is_done = False
pending = [asyncio.create_task(p.fetch_all(), name=p.name) for p in providers]
while not is_done and pending:
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
for x in done:
ip_addresses = x.result()
if is_valid_addreses(ip_addresses, config):
is_done = True
break
ip_addresses = None
if pending:
gather = asyncio.gather(*pending)
gather.cancel()
try:
await gather
except asyncio.CancelledError:
pass
return ip_addresses
async def check_ip_addresses(ip_addresses: IPAddreses) -> bool:
"""Проверка результата получения IP адресов с помощью фильтров
Args:
ip_addresses (IPAddreses): IP адреса
Returns:
bool: корректны ли адреса (изменились ли они),
надо ли продолжать обработку и отправлять их на сервер
"""
providers = BaseFilterProvider.registred.values()
result = True
failed = ""
pending = [
asyncio.create_task(p.check(*ip_addresses), name=p.name) for p in providers
]
while result and pending:
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
for x in done:
result = x.result()
if not result:
failed = x.get_name()
if pending:
gather = asyncio.gather(*pending)
gather.cancel()
try:
await gather
except asyncio.CancelledError:
pass
if not result:
print("failed filter:", failed)
return result
async def send_ip_addreses(ip_addresses: IPAddreses):
"""Отправка адресов на все плагины вывода
Args:
ip_addresses (IPAddreses): IP адреса
"""
providers = BaseOutputProvider.registred.values()
await asyncio.gather(
*(
asyncio.create_task(p.set_addrs(*ip_addresses), name=p.name)
for p in providers
)
)
def print_debug_info(config: dict):
"""Вывод всех зарегистрированных плагинов и другой отладочной информации"""
def format_registred(name, base):
result = f"{name}:\n"
result += f" classes: {[*base._childs]}\n"
result += f" .values: {[*map(str, base._childs.values())]}\n"
result += f" providers: {[*base.registred]}\n"
result += f" .values: {[*map(str, base.registred.values())]}\n"
return result
if config.get("debug", False):
print("DEBUG ->")
bases = BaseSourceProvider, BaseFilterProvider, BaseOutputProvider
for info in map(format_registred, "sources filters outputs".split(), bases):
print(info)
print("DEBUG <-")
async def app(
config: dict, ipv4t: httpx.AsyncHTTPTransport, ipv6t: httpx.AsyncHTTPTransport
):
"""Запуск приложения
Args:
config (dict): общая конфигурация
ipv4t (httpx.AsyncHTTPTransport): транспорт IPv4
ipv6t (httpx.AsyncHTTPTransport): транспорт IPv6
"""
use_plugins(config, ipv4t, ipv6t)
print_debug_info(config)
ip_addreses = await get_ip_addresses(config)
if ip_addreses is None:
print("no IP addresses")
return
if not await check_ip_addresses(ip_addreses):
print("stop by filters")
return
await send_ip_addreses(ip_addreses)
print("done")

View File

@@ -1,3 +1,5 @@
""" Фильтры для проверки адресов перед отправкой """
from pddnsc.loaders import load_plugins
load_plugins(__file__)

View File

@@ -1,4 +1,3 @@
import asyncio
import hashlib
import json
import aiofiles
@@ -7,45 +6,79 @@ from os.path import isfile
from pddnsc.base import BaseFilterProvider
class StateHashFilter(BaseFilterProvider):
async def check_imp(self, source_provider, addr_v4, addr_v6):
if not isfile(self.config["filepath"]):
return True
class GenericTextFileFilter(BaseFilterProvider):
"""Проверка на то что хотябы один IP адрес изменился по сравнению с текстом в файле
new_state_str = (addr_v4 or "") + (addr_v6 or "")
new_sha = hashlib.sha256(new_state_str.encode(encoding="utf-8"))
Конфигурация:
- filepath: имя файла
- encoding: кодировка, по умолчанию "utf-8"
- mode: режим открытия, по умолчанию "r"
- check_ipv4: проверять ли IPv4, по умолчанию нет если check_ipv6 есть в конфиге иначе да
- check_ipv6: проверять ли IPv6, по умолчанию нет если check_ipv4 есть в конфиге иначе да
"""
def post_init(self):
super().post_init()
self.filepath = self.config["filepath"]
self.encoding = self.config.get("encoding", "utf-8")
self.mode = self.config.get("mode", "r")
self.check_ipv4 = self.config.get("check_ipv4", False)
self.check_ipv6 = self.config.get("check_ipv6", False)
if "check_ipv4" not in self.config and "check_ipv4" not in self.config:
self.check_ipv4 = self.check_ipv6 = True
self.content = ""
async def read(self) -> str:
async with aiofiles.open(
self.config["filepath"], mode="r", encoding="utf-8"
self.filepath, mode=self.mode, encoding=self.encoding
) as f:
old_state_hash = await f.read()
self.content = await f.read()
return old_state_hash != new_sha.hexdigest()
async def check_imp(self, source_provider: str, addr_v4: str, addr_v6: str) -> bool:
lst = []
if self.check_ipv4:
lst.append(addr_v4)
if self.check_ipv4:
lst.append(addr_v6)
new_content = "\n".join(lst)
return new_content != self.content
class StateFileFilter(BaseFilterProvider):
async def check_imp(self, source_provider, addr_v4, addr_v6):
if not isfile(self.config["filepath"]):
async def check(self, source_provider: str, addr_v4: str, addr_v6: str) -> bool:
if not isfile(self.filepath):
return True
await self.read()
return await self.check_imp(source_provider, addr_v4, addr_v6)
new_state = {
"ipv4": addr_v4 or "",
"ipv6": addr_v6 or "",
}
async with aiofiles.open(
self.config["filepath"], mode="r", encoding="utf-8"
) as f:
old_state = json.loads(await f.read())
class StateHashFilter(GenericTextFileFilter):
"""Проверка на то что хотябы один IP адрес изменился по хешу сохраненному в файле"""
async def check_imp(self, source_provider: str, addr_v4: str, addr_v6: str) -> bool:
new_state_str = (self.check_ipv4 and addr_v4 or "") + (
self.check_ipv6 and addr_v6 or ""
)
new_sha = hashlib.sha256(new_state_str.encode(encoding=self.encoding))
return self.content != new_sha.hexdigest()
class StateFileFilter(GenericTextFileFilter):
"""Проверка на то что хотябы один IP адрес изменился по сравнению с данными в json файле"""
async def check_imp(self, source_provider: str, addr_v4: str, addr_v6: str) -> bool:
new_state = {}
if self.check_ipv4:
new_state["ipv4"] = addr_v4 or ""
if self.check_ipv6:
new_state["ipv6"] = addr_v6 or ""
old_state = json.loads(self.content)
result = True
if "check_ipv4" not in self.config and "check_ipv4" not in self.config:
return new_state != old_state
if self.config.get("check_ipv4", False):
if self.check_ipv4:
result = result and new_state["ipv4"] != old_state["ipv4"]
if self.config.get("check_ipv6", False):
if self.check_ipv6:
result = result and new_state["ipv6"] != old_state["ipv6"]
return result

View File

@@ -1,9 +1,16 @@
""" функции загрузки файлов плагинов """
import os
import traceback
from importlib import util
def load_module(path):
def load_module(path: str):
"""загрузка python модуля
Args:
path (str): имя файла
"""
name = os.path.split(path)[-1]
spec = util.spec_from_file_location(name, path)
module = util.module_from_spec(spec)
@@ -11,7 +18,12 @@ def load_module(path):
return module
def load_plugins(init_filepath):
def load_plugins(init_filepath: str):
"""Загрузка плагинов из пакета
Args:
init_filepath (str): имя `__init__.py` файла пакета
"""
dirpath = os.path.dirname(os.path.abspath(init_filepath))
for fname in os.listdir(dirpath):
if (

View File

@@ -1,3 +1,5 @@
""" Модули вывода """
from pddnsc.loaders import load_plugins
load_plugins(__file__)

View File

@@ -4,7 +4,9 @@ from pddnsc.base import BaseOutputProvider
class JustPrint(BaseOutputProvider):
async def set_addrs_imp(self, source_provider, addr_v4, addr_v6):
"""Вывод IP адресов в консоль"""
async def set_addrs_imp(self, source_provider: str, addr_v4: str, addr_v6: str):
print(f">> {self.name}")
print(f"addresses from: {source_provider}")
print(f"IPv4: {addr_v4}")

View File

@@ -5,24 +5,70 @@ import hashlib
from pddnsc.base import BaseOutputProvider
class StateFile(BaseOutputProvider):
async def set_addrs_imp(self, source_provider, addr_v4, addr_v6):
state = {
"ipv4": addr_v4 or "",
"ipv6": addr_v6 or "",
}
state_str = json.dumps(state)
class GenericTextFile(BaseOutputProvider):
"""Сохранение IP адресов в текстовый файл
Конфигурация:
- filepath: имя файла
- encoding: кодировка, по умолчанию "utf-8"
- mode: режим открытия, по умолчанию "w"
- save_ipv4: сохранять ли IPv4, по умолчанию нет если save_ipv6 есть в конфиге иначе да
- save_ipv6: сохранять ли IPv6, по умолчанию нет если save_ipv4 есть в конфиге иначе да
"""
def post_init(self):
super().post_init()
self.filepath = self.config["filepath"]
self.encoding = self.config.get("encoding", "utf-8")
self.mode = self.config.get("mode", "w")
self.save_ipv4 = self.config.get("save_ipv4", False)
self.save_ipv6 = self.config.get("save_ipv6", False)
if "save_ipv4" not in self.config and "save_ipv4" not in self.config:
self.save_ipv4 = self.save_ipv6 = True
self.content = ""
async def read(self):
async with aiofiles.open(self.filepath, mode="r", encoding=self.encoding) as f:
self.content = await f.read()
def set_content(self, ipv4: str, ipv6: str):
lst = []
if self.save_ipv4:
lst.append(ipv4)
if self.save_ipv6:
lst.append(ipv6)
self.content = "\n".join(lst)
async def write(self):
async with aiofiles.open(
self.config["filepath"], mode="w", encoding="utf-8"
self.filepath, mode=self.mode, encoding=self.encoding
) as f:
await f.write(state_str)
await f.write(self.content)
async def set_addrs_imp(self, source_provider: str, addr_v4: str, addr_v6: str):
await self.set_content(addr_v4, addr_v6)
await self.write()
class StateHashFile(BaseOutputProvider):
async def set_addrs_imp(self, source_provider, addr_v4, addr_v6):
state_str = (addr_v4 or "") + (addr_v6 or "")
sha = hashlib.sha256(state_str.encode(encoding="utf-8"))
async with aiofiles.open(
self.config["filepath"], mode="w", encoding="utf-8"
) as f:
await f.write(sha.hexdigest())
class StateFile(GenericTextFile):
"""Сохранение всех IP адресов в json файл"""
async def set_content(self, addr_v4: str, addr_v6: str):
state = {}
if self.save_ipv4:
state["ipv4"] = addr_v4 or ""
if self.save_ipv6:
state["ipv6"] = addr_v6 or ""
self.content = json.dumps(state)
class StateHashFile(GenericTextFile):
"""Сохранение хеша от всех IP адресов в файл"""
async def set_content(self, addr_v4: str, addr_v6: str):
state_str = (self.save_ipv4 and addr_v4 or "") + (
self.save_ipv6 and addr_v6 or ""
)
sha = hashlib.sha256(state_str.encode(encoding=self.encoding))
self.content = sha.hexdigest()

View File

@@ -7,13 +7,28 @@ from pddnsc.base import BaseOutputProvider
class VscaleDomains(BaseOutputProvider):
"""Домены на vds.selectel.ru (ранее vscale.io)
Конфигурация:
- api_token_env: имя переменной окружения, в которой находится токен доступа, по умолчанию `VSCALE_API_TOKEN`
- api_base: базовый адрес API, по умолчанию `https://api.vscale.io/v1/`
- domain: имя основного домена, например `example.com`
- target: имя изменяемой записи, например `www` (чтобы изменить `www.example.com`)
- ttl: время жизни записи, по умолчанию 3600
- save_ipv4: сохранять ли ipv4 адрес (A запись)
- save_ipv6: сохранять ли ipv6 адрес (AAAA запись)
(если нет ни одного из [save_ipv4, save_ipv6] то сохраняются оба адреса)
"""
def post_init(self):
token = self.get_api_token()
if not token:
raise KeyError("no api token, use env VSCALE_API_TOKEN")
self.__headers = {"X-Token": token}
self.api_base = self.config.get("api_base", "https://api.vscale.io/v1/")
self.ttl = self.config.get("ttl", 300)
self.ttl = self.config.get("ttl", 3600)
self.domain = self.config["domain"]
target = self.config["target"]
self.target = f"{target}.{self.domain}"
@@ -24,10 +39,12 @@ class VscaleDomains(BaseOutputProvider):
self.save_ipv4 = self.save_ipv6 = True
def get_api_token(self) -> str:
"""получение API токена"""
token_env = self.config.get("api_token_env", "VSCALE_API_TOKEN")
return os.environ[token_env]
async def find_domain_id(self, client) -> Optional[int]:
async def find_domain_id(self, client: httpx.AsyncClient) -> Optional[int]:
"""поиск ID домена"""
response = await client.get("/domains/")
if response.is_success:
data = response.json()
@@ -40,7 +57,10 @@ class VscaleDomains(BaseOutputProvider):
else:
raise ValueError(f"failed to find domain id, code: {response.status_code}")
async def find_record(self, client, domain_id, record_type) -> Optional[int]:
async def find_record(
self, client: httpx.AsyncClient, domain_id: int, record_type: str
) -> Optional[int]:
"""поиск ID записи"""
response = await client.get(
f"/domains/{domain_id}/records/",
)
@@ -55,14 +75,25 @@ class VscaleDomains(BaseOutputProvider):
f"error list records {domain_id=}: ", response.status_code
)
async def get_record_value(self, client, domain_id, record_id) -> str:
async def get_record_value(
self, client: httpx.AsyncClient, domain_id: int, record_id: int
) -> str:
"""получение действующего значения записи"""
response = await client.get(f"/domains/{domain_id}/records/{record_id}")
if response.is_success:
data = response.json()
if isinstance(data, dict):
return data["content"]
async def change_record(self, client, domain_id, record_id, record_type, value):
async def change_record(
self,
client: httpx.AsyncClient,
domain_id: int,
record_id: int,
record_type: str,
value: str,
):
"""изменение записи"""
data = {
"content": value,
"name": self.target,
@@ -80,7 +111,10 @@ class VscaleDomains(BaseOutputProvider):
f"failed to change record: {self.target=},{domain_id=}, {record_id=}, {record_type=}, {value=}"
)
async def create_record(self, client, domain_id, record_type, value):
async def create_record(
self, client: httpx.AsyncClient, domain_id: int, record_type: str, value: str
):
"""создание новой записи"""
data = {
"content": value,
"name": self.target,
@@ -96,7 +130,7 @@ class VscaleDomains(BaseOutputProvider):
f"failed to create record: {self.target=},{domain_id=}, {record_type=}, {value=}, {response.status_code=}"
)
async def set_addrs_imp(self, source_provider, addr_v4, addr_v6):
async def set_addrs_imp(self, source_provider: str, addr_v4: str, addr_v6: str):
save_addrs = []
if addr_v4 and self.save_ipv4:
save_addrs.append(("A", addr_v4))

View File

@@ -1,21 +1,30 @@
""" модуль взаимодействия и регистрации плагинов """
from httpx import AsyncHTTPTransport
from .base import BaseSourceProvider, BaseFilterProvider, BaseOutputProvider
from . import sources
from . import outputs
from . import filters
from . import outputs
def use_plugins(config, ipv4t, ipv6t):
for source_name in config["sources"]:
def unused():
"""Чтобы убрать предупреждение о неиспользуемых импортах"""
return sources, filters, outputs
def use_plugins(config: dict, ipv4t: AsyncHTTPTransport, ipv6t: AsyncHTTPTransport):
"""Регистрация всех плагинов указаных в конфигурации"""
for source_name in config.get("sources", []):
BaseSourceProvider.register_provider(
source_name, config["sources"][source_name], ipv4t, ipv6t
)
for filter_name in config["filters"]:
for filter_name in config.get("filters", []):
BaseFilterProvider.register_provider(
filter_name, config["filters"][filter_name], ipv4t, ipv6t
)
for output_name in config["outputs"]:
for output_name in config.get("outputs", []):
BaseOutputProvider.register_provider(
output_name, config["outputs"][output_name], ipv4t, ipv6t
)

View File

@@ -1,3 +1,5 @@
""" Модули источников IP адресов """
from pddnsc.loaders import load_plugins
load_plugins(__file__)

View File

@@ -1,26 +1,38 @@
""" модуль имитации получения IP аддресов """
import asyncio
from pddnsc.base import BaseSourceProvider
class DummySource(BaseSourceProvider):
async def fetch_v4(self) -> str:
result = await asyncio.sleep(self.config.get("delay", 1), result=None)
"""Имитация получения пустых адресов"""
async def fetch_v4_impl(self) -> str:
result = await asyncio.sleep(self.config.get("delay", 1), result="")
return result
async def fetch_v6(self) -> str:
result = await asyncio.sleep(self.config.get("delay", 1), result=None)
async def fetch_v6_impl(self) -> str:
result = await asyncio.sleep(self.config.get("delay", 1), result="")
return result
class FakeSource(BaseSourceProvider):
async def fetch_v4(self) -> str:
"""Имитация получения заданных в конфигурации адресов
Конфигурация:
- ipv4: строка IPv4, по умолчанию "127.0.0.1"
- ipv6: строка IPv6, по умолчанию "::1"
"""
async def fetch_v4_impl(self) -> str:
result = await asyncio.sleep(
self.config.get("delay", 1), result=self.config.get("ipv4", "127.0.0.1")
)
return result
async def fetch_v6(self) -> str:
async def fetch_v6_impl(self) -> str:
result = await asyncio.sleep(
self.config.get("delay", 1), result=self.config.get("ipv6", "::1")
)

View File

@@ -1,15 +1,25 @@
from typing import Optional
import httpx
from pddnsc.base import BaseSourceProvider, filter_ipv4, filter_ipv6
from pddnsc.base import BaseSourceProvider
class GenericHttpSource(BaseSourceProvider):
"""Базовый провайдер получения IP адресов по http/https ссылкам в виде текста.
Конфигурация:
- url_v4: *URL* для получения адреса *IPv4*
- url_v6: *URL* для получения адреса *IPv6*
- headers (`dict`): словарь дополнительных заголовков (*необязательно*)
"""
def post_init(self):
self.url_v4 = self.config.get("url_v4")
self.url_v6 = self.config.get("url_v6")
self.headers = self.config.get("headers", {})
async def fetch_v4(self) -> str:
async def fetch_v4_impl(self) -> Optional[str]:
if not self.url_v4:
return None
@@ -20,9 +30,9 @@ class GenericHttpSource(BaseSourceProvider):
response = await client.get(self.url_v4)
if response.is_success:
result = response.text.strip()
return filter_ipv4(result)
return result
async def fetch_v6(self) -> str:
async def fetch_v6_impl(self) -> Optional[str]:
if not self.url_v6:
return None
@@ -33,10 +43,21 @@ class GenericHttpSource(BaseSourceProvider):
response = await client.get(self.url_v6)
if response.is_success:
result = response.text.strip()
return filter_ipv6(result)
return result
class GenericHttpJsonSource(BaseSourceProvider):
"""Базовый провайдер получения IP адресов по http/https ссылкам в виде json.
Конфигурация:
- url_v4: *URL* для получения адреса *IPv4*
- url_v6: *URL* для получения адреса *IPv6*
- key_v4: ключ *json* (`int` для списка, `str` для словаря) для *IPv4*
- key_v6: ключ *json* (`int` для списка, `str` для словаря) для *IPv6*
- headers (`dict`): словарь дополнительных заголовков (*необязательно*)
"""
def post_init(self):
self.url_v4 = self.config.get("url_v4")
self.url_v6 = self.config.get("url_v6")
@@ -48,7 +69,7 @@ class GenericHttpJsonSource(BaseSourceProvider):
headers = {}
self.headers = headers.update(self.config.get("headers", {}))
async def fetch_v4(self) -> str:
async def fetch_v4_impl(self) -> Optional[str]:
if not self.url_v4 or self.key_v4 is None:
return None
@@ -59,9 +80,9 @@ class GenericHttpJsonSource(BaseSourceProvider):
response = await client.get(self.url_v4)
if response.is_success:
result = response.json()[self.key_v4].strip()
return filter_ipv4(result)
return result
async def fetch_v6(self) -> str:
async def fetch_v6_impl(self) -> Optional[str]:
if not self.url_v6 or self.key_v6 is None:
return None
@@ -72,4 +93,4 @@ class GenericHttpJsonSource(BaseSourceProvider):
response = await client.get(self.url_v6)
if response.is_success:
result = response.json()[self.key_v6].strip()
return filter_ipv6(result)
return result

View File

@@ -0,0 +1,33 @@
import re
from pddnsc.sources.http import GenericHttpSource
class GenericHttpRegexSource(GenericHttpSource):
"""Базовый провайдер получения IP адресов по http/https ссылкам в виде текста c помощью регулярных выражений
Конфигурация:
- url_v4: *URL* для получения адреса *IPv4*
- url_v6: *URL* для получения адреса *IPv6*
- regex_v4: регулярное выражение для *IPv4*, значение по умолчанию см. в коде
- regex_v6: регулярное выражение для *IPv6*, значение по умолчанию см. в коде
- headers (`dict`): словарь дополнительных заголовков (*необязательно*)
"""
def post_init(self):
super().post_init()
rx_v4 = r"\b(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b"
rx_v6 = r"\b(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))\b"
self.regex_v4 = re.compile(self.config.get("regex_v4", rx_v4))
self.regex_v6 = re.compile(self.config.get("regex_v6", rx_v6))
def filter_ipv4(self, ipv4):
match = self.regex_v4.search(ipv4 or "")
ipv4 = match and match[0]
return super().filter_ipv4(ipv4)
def filter_ipv6(self, ipv6):
match = self.regex_v6.search(ipv6 or "")
ipv6 = match and match[0]
return super().filter_ipv6(ipv6)

View File

@@ -1,8 +1,9 @@
from pddnsc.sources.http import GenericHttpSource
# https://www.ipify.org/
class IPIFYSource(GenericHttpSource):
"""https://www.ipify.org/"""
def post_init(self):
super().post_init()
self.url_v4 = "https://api4.ipify.org"

View File

@@ -1,8 +1,9 @@
from pddnsc.sources.http import GenericHttpSource
# https://ip.sb/api
class IPSB(GenericHttpSource):
"""https://ip.sb/api"""
def post_init(self):
super().post_init()
self.url_v4 = "https://api-ipv4.ip.sb/ip"

View File

@@ -1,9 +1,12 @@
from pddnsc.sources.http import GenericHttpSource
# https://wtfismyip.com/
# https://gitlab.com/wtfismyip/wtfismyip
class WTFIsMyIP(GenericHttpSource):
"""
https://wtfismyip.com/
https://gitlab.com/wtfismyip/wtfismyip
"""
def post_init(self):
super().post_init()
self.url_v4 = "https://text.ipv4.myip.wtf"

4
requirements.docs.txt Normal file
View File

@@ -0,0 +1,4 @@
mkdocs>=1.5,<2
pymdown-extensions>=10.7,<11
mkapi>=2.1,<3
mkdocs-material>=9.5.10,<10

View File

@@ -1,4 +1,4 @@
debug = true
debug = false
require_ipv4 = true
[sources]
@@ -8,6 +8,10 @@ require_ipv4 = true
provider = "WTFIsMyIP"
[sources.ipsb]
provider = "IPSB"
[sources.checkip_dyndns]
provider = "GenericHttpRegexSource"
url_v4 = "http://checkip.dyndns.org/"
#regex_v4 = "\\b\\d{1,3}(\\.\\d{1,3}){3}\\b"
[sources.ifconfig]
provider = "GenericHttpSource"
url_v4 = "https://ifconfig.me/ip"