Compare commits

..

46 Commits

Author SHA1 Message Date
5e3805b567 fix StateHashFile encoding
All checks were successful
Docker Image CI / test (push) Successful in 44s
Docker Image CI / push (push) Successful in 53s
2024-03-03 14:32:12 +03:00
5861836852 fix docs for (sources/filters).files
All checks were successful
Docker Image CI / test (push) Successful in 30s
Docker Image CI / push (push) Successful in 39s
2024-03-02 18:33:09 +03:00
30d0270901 refactor files filters/outputs
All checks were successful
Docker Image CI / test (push) Successful in 31s
Docker Image CI / push (push) Successful in 38s
2024-03-02 18:18:28 +03:00
4d13416222 separate core from cli #6
All checks were successful
Docker Image CI / test (push) Successful in 31s
Docker Image CI / push (push) Successful in 39s
2024-03-02 17:20:44 +03:00
2bf42162ef refactor plugins.py
All checks were successful
Docker Image CI / test (push) Successful in 37s
Docker Image CI / push (push) Successful in 39s
2024-03-02 16:59:23 +03:00
440c33b8e5 upd README
All checks were successful
Docker Image CI / test (push) Successful in 36s
Docker Image CI / push (push) Successful in 40s
2024-02-29 15:56:53 +03:00
Dmitry Belyaev
5b37e91db3 Merge pull request 'add docker build ci' (#7) from ci-docker into master
All checks were successful
Docker Image CI / test (push) Successful in 31s
Docker Image CI / push (push) Successful in 40s
Reviewed-on: #7
for #3
2024-02-27 09:37:21 +00:00
d8563866e5 add docker build ci
All checks were successful
Docker Image CI / test (pull_request) Successful in 31s
Docker Image CI / push (pull_request) Has been skipped
2024-02-27 12:35:21 +03:00
78cf337edf upd README + docs 2024-02-25 23:00:34 +03:00
51f5ed26db + addrs from different sources
like:
ipv4 from src1
ipv6 from src2
2024-02-25 17:46:52 +03:00
3e248b0adf add GenericHttpRegexSource provider 2024-02-25 17:32:00 +03:00
3131f2d0a0 sources refactor 2024-02-25 15:41:29 +03:00
Dmitry Belyaev
20ef74f32b Merge pull request 'add docs' (#1) from docs into master 2024-02-25 11:06:48 +00:00
92cba7d6b9 upd docs 2024-02-25 14:03:29 +03:00
44be3416c0 add docs 2024-02-22 18:24:36 +03:00
ca89a74c4b Dockerfile: add log datetime 2024-02-21 21:13:28 +03:00
cbc94394bb upd config 2024-02-21 16:07:10 +03:00
214002c56e add examples to config 2024-02-21 16:04:05 +03:00
9428b92e6a add IP.SB source 2024-02-21 15:10:32 +03:00
90158ca5ad add require_ipv4 + require_ipv6 config params 2024-02-21 12:35:42 +03:00
62df87f027 upd Dockerfile 2024-02-21 11:43:24 +03:00
6e39ce20a6 pre-commit: add check-toml 2024-02-21 11:10:23 +03:00
992197649c add pre-commit 2024-02-21 11:07:23 +03:00
ef4aaf61b8 refactor vscale 2024-02-21 10:40:29 +03:00
c0d1d4b38d use generic provider for ipfy + wtfismyip 2024-02-21 10:02:08 +03:00
38923fec9e fix cli 2024-02-21 10:00:54 +03:00
0844e4935f add generic http source providers 2024-02-21 09:42:38 +03:00
31564c22d5 add ip addrs validation 2024-02-21 09:41:51 +03:00
0509ba9d63 fix vscale response check 2024-02-21 09:12:08 +03:00
3e9142cd54 add wtfismyip source 2024-02-20 23:49:57 +03:00
8a1ba06c88 refactor ipfy source 2024-02-20 23:49:31 +03:00
6949baf12e Dockerfile: use empty settings + state dirs 2024-02-20 21:18:52 +03:00
4439a327ad add Dockerfile 2024-02-20 20:44:27 +03:00
5804bb3d38 fix README 2024-02-20 16:12:13 +03:00
d585f9aab2 fix README 2024-02-20 16:11:32 +03:00
aa88d0e6ab upd README 2024-02-20 16:09:39 +03:00
cb57ed07de refactor vscale output provider
reuse client with headers, base_url
enable http2
2024-02-20 15:27:30 +03:00
952d043fc9 add VscaleDomains output provider 2024-02-20 14:32:43 +03:00
4fc2116dcb refactor base + ignore .env 2024-02-20 14:26:41 +03:00
312875ed2b add README.md 2024-02-20 12:09:43 +03:00
3f0481218b add LICENSE 2024-02-20 12:08:01 +03:00
058b9a9cf8 cli refactor 2024-02-20 12:05:11 +03:00
3d2f046dcf refactor state hash output+filter 2024-02-20 11:11:00 +03:00
b7097540a2 add proxy conf 2024-02-20 10:58:04 +03:00
bbfb4f92af fmt + StateFileFilter 2024-02-20 10:40:23 +03:00
773a4e0107 add filters 2024-02-20 10:15:48 +03:00
33 changed files with 1355 additions and 171 deletions

8
.dockerignore Normal file
View File

@@ -0,0 +1,8 @@
.git/
.venv/
state/*
__pycache__
*.pyo
*.pyc
.env*

View File

@@ -0,0 +1,73 @@
name: Docker Image CI
on:
push:
branches:
- master
tags:
- v*
pull_request:
branches:
- "master"
jobs:
test:
runs-on: cth-ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Build image
uses: docker/build-push-action@v4
with:
context: .
push: false
tags: gitea.b4tman.ru/b4tman/pddnsc:test
push:
needs: test
runs-on: cth-ubuntu-latest
if: github.event_name != 'pull_request'
steps:
- uses: actions/checkout@v4
- name: Docker meta
id: meta
uses: docker/metadata-action@v5
with:
images: |
gitea.b4tman.ru/b4tman/pddnsc
flavor: |
latest=${{ github.ref == 'refs/heads/master' }}
tags: |
type=ref,event=branch
type=ref,event=pr
type=semver,pattern={{version}}
type=semver,pattern={{major}}.{{minor}}
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Login to gitea
uses: docker/login-action@v3
with:
registry: gitea.b4tman.ru
username: b4tman
password: ${{ secrets.PKGS_TOKEN }}
- name: Build and push image
uses: docker/build-push-action@v4
with:
context: .
push: true
#platforms: linux/amd64,linux/arm64
platforms: linux/amd64
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}

8
.gitignore vendored
View File

@@ -1,3 +1,9 @@
.venv/
state/*
__pycache__
__pycache__
*.pyo
*.pyc
.env*
site/
docs/api/
docs/src/

14
.pre-commit-config.yaml Normal file
View File

@@ -0,0 +1,14 @@
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.5.0
hooks:
- id: check-yaml
- id: check-toml
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/psf/black
rev: 24.2.0
hooks:
- id: black

25
Dockerfile Normal file
View File

@@ -0,0 +1,25 @@
FROM python:3.12-alpine
ENV TZ Europe/Moscow
ENV SCHEDULE 1 * * * *
RUN set -x && \
apk add --no-cache --virtual .tz alpine-conf tzdata && \
/sbin/setup-timezone -z $TZ && \
apk del .tz
RUN mkdir /app
WORKDIR /app
COPY pddnsc ./pddnsc
RUN mkdir settings state
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
VOLUME /app/settings
VOLUME /app/state
CMD sh -exc 'echo "\
$SCHEDULE date && cd /app && python -m pddnsc.cli \
" > /etc/crontabs/root && exec crond -l 1 -f'

21
LICENSE Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2024 Dmitry Belyaev
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

80
README.md Normal file
View File

@@ -0,0 +1,80 @@
# pddnsc
Возможно `DDNS` клиент
## Запуск
~~~sh
python -m pddnsc.cli
~~~
либо в [docker](https://www.docker.com)/[podman](https://podman.io) (для запуска по расписанию в `cron`):
~~~bash
docker run -v .state:/app/state:rw \
-v .settings:/app/settings:ro \
-e SCHEDULE=@hourly \
gitea.b4tman.ru/b4tman/pddnsc
~~~
## Конфигурация
Конфигурация находится в файле `settings/config.toml`.
Пример конфигурации:
~~~toml
[sources]
# сервис https://www.ipify.org
[sources.ipfy]
provider = "IPIFYSource" # имя python класса
[filters]
# этот фильтр означает, что если ip адреса не изменились (по хешу)
# то не нужно никуда отсылать ничего
[filters.state-hash]
provider = "StateHashFilter"
filepath = "state/hash.txt"
[outputs]
# сохранение хеша всех ip адресов в файл,
# чтобы потом проверить изменились ли они
[outputs.hash-file]
provider = "StateHashFile"
filepath = "state/hash.txt"
# сервис доменов на vscale (vds.selectel.ru)
# токен в переменной окружения VSCALE_API_TOKEN
[outputs.vscale]
provider = "VscaleDomains"
domain = "example.com"
target = "www" # изменяем www.example.com
ttl = 3600
ipv4 = true # менять A запись
ipv6 = false # НЕ менять AAAA запись
~~~
Конфигурация состоит из секций:
- `sources` - источники ip адресов (выбирается результат от первого по времени сработавшего источника, который вернул хотябы один ipv4 или ipv6 адрес)
- `filters` - фильтры, если хоть один вернет ложь то программа ничего никуда не запишет и не отправит, например проверка, что ip адрес не изменился
- `outputs` - модули вывода, например вывод в консоль, запись в файл или создание dns записей на сервере
Все модули источников/фильтров/вывода работают конкурентно через [asyncio](https://docs.python.org/3/library/asyncio.html#module-asyncio) и [httpx](https://www.python-httpx.org).
### Подробная документация
Её необходимо собрать с помошью [mkdocs](https://www.mkdocs.org). Для этого нужно установить зависимости:
~~~bash
pip install -r requirements.txt
pip install -r requirements.docs.txt
~~~
После этого либо собрать документацию в каталог `site/`, либо запустить тестовый сервер.
~~~bash
# сборка
mkdocs build
# тестовый сервер
mkdocs serve
~~~

1
docs/index.md Normal file
View File

@@ -0,0 +1 @@
--8<-- "README.md"

17
mkapi_conf.py Normal file
View File

@@ -0,0 +1,17 @@
"""Config functions."""
from __future__ import annotations
import sys
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from mkdocs.config.defaults import MkDocsConfig
from mkapi.plugins import MkAPIPlugin
def before_on_config(config: MkDocsConfig, plugin: MkAPIPlugin) -> None: # noqa: ARG001
"""Called before `on_config` event of MkAPI plugin."""
if "." not in sys.path:
sys.path.insert(0, ".")

24
mkdocs.yml Normal file
View File

@@ -0,0 +1,24 @@
site_name: pddnsc
site_description: Документация pddnsc
site_author: b4tman
repo_url: https://gitea.b4tman.ru/b4tman/pddnsc
repo_name: pddnsc
copyright: (c) 2024 Дмитрий Беляев
theme:
name: material
language: ru
locale: ru
highlightjs: true
markdown_extensions:
- pymdownx.snippets
plugins:
- search:
lang: ru
- mkapi:
config: mkapi_conf.py
nav:
- Главная: index.md
- Справочник: $api/pddnsc.***

View File

@@ -0,0 +1 @@
""" Возможно клиент DDNS """

View File

@@ -1,36 +1,88 @@
import httpx
import asyncio
from abc import ABC, abstractmethod
from typing import NamedTuple, Optional
from netaddr import valid_ipv4, valid_ipv6
from httpx import AsyncHTTPTransport
class IPAddreses(NamedTuple):
"""Набор из названия источника и IP адресов, результат одного из источников"""
source_name: str
ipv4: str
ipv6: str
class BaseSourceProvider(ABC):
"""Базовый класс для провайдеров источников"""
_childs = {}
registred = {}
def __init__(self, name, config, ipv4t, ipv6t):
def __init__(
self,
name: str,
config: dict,
ipv4t: AsyncHTTPTransport,
ipv6t: AsyncHTTPTransport,
):
self.name, self.config, self.ipv4t, self.ipv6t = (
name,
config,
ipv4t,
ipv6t,
)
self.post_init()
def post_init(self):
"""Метод для переопределения пост инициализации"""
...
def __str__(self):
return f"{self.__class__.__name__}: {self.name}"
async def fetch_all(self) -> tuple[str, str, str]:
def filter_ipv4(self, value: str) -> str:
"""Функция для проверки валидности IPv4 адреса, возвращает "" если адрес неправильный или пустой"""
return value if value and valid_ipv4(value) else ""
def filter_ipv6(self, value: str) -> str:
"""Функция для проверки валидности IPv6 адреса, возвращает "" если адрес неправильный или пустой"""
return value if value and valid_ipv6(value) else ""
async def fetch_all(self) -> IPAddreses:
"""Метод для получения всех IP адресов сразу"""
results = await asyncio.gather(
self.fetch_v4(), self.fetch_v6(), return_exceptions=True
)
return (self.name,) + tuple(
None if isinstance(i, Exception) else i for i in results
return IPAddreses(
self.name, *("" if isinstance(i, Exception) else i for i in results)
)
async def fetch_v4(self) -> str:
"""Метод внешнего интерфейса для получения IPv4"""
result = await asyncio.gather(self.fetch_v4_impl(), return_exceptions=True)
ipv4 = ""
if not isinstance(result[0], Exception):
ipv4 = result[0]
return self.filter_ipv4(ipv4)
async def fetch_v6(self) -> str:
"""Метод внешнего интерфейса для получения IPv6"""
result = await asyncio.gather(self.fetch_v6_impl(), return_exceptions=True)
ipv6 = ""
if not isinstance(result[0], Exception):
ipv6 = result[0]
return self.filter_ipv6(ipv6)
def __init_subclass__(cls) -> None:
BaseSourceProvider._childs[cls.__name__] = cls
return super().__init_subclass__()
@classmethod
def validate_source_config(cls, name, config):
def validate_source_config(cls, name: str, config: dict):
"""Метод валидации конфигурации для провайдера"""
if "provider" not in config:
return False
prov_name = config["provider"]
@@ -39,28 +91,51 @@ class BaseSourceProvider(ABC):
return True
@classmethod
def register_provider(cls, name, config, ipv4t, ipv6t):
def register_provider(
cls,
name: str,
config: dict,
ipv4t: AsyncHTTPTransport,
ipv6t: AsyncHTTPTransport,
):
"""Метод регистрации провайдера по конфигурации"""
if not cls.validate_source_config(name, config):
return
provider = cls._childs[config["provider"]]
cls.registred[name] = provider(name, config, ipv4t, ipv6t)
@abstractmethod
async def fetch_v4(self) -> str:
async def fetch_v4_impl(self) -> Optional[str]:
"""Необходимый метод для реализации получения IPv4"""
...
@abstractmethod
async def fetch_v6(self) -> str:
async def fetch_v6_impl(self) -> Optional[str]:
"""Необходимый метод для реализации получения IPv6"""
...
class BaseOutputProvider(ABC):
"""Базовый класс для провайдеров вывода"""
_childs = {}
registred = {}
def __init__(self, name, config, ipv4t, ipv6t):
def __init__(
self,
name: str,
config: dict,
ipv4t: AsyncHTTPTransport,
ipv6t: AsyncHTTPTransport,
):
self.name, self.config = name, config
self.ipv4t, self.ipv6t = ipv4t, ipv6t
self.post_init()
def post_init(self):
"""Метод для переопределения пост инициализации"""
...
def __init_subclass__(cls) -> None:
BaseOutputProvider._childs[cls.__name__] = cls
@@ -69,13 +144,16 @@ class BaseOutputProvider(ABC):
def __str__(self):
return f"{self.__class__.__name__}: {self.name}"
def best_client(self, addr_v4, addr_v6):
if addr_v6 is None and addr_v4 is not None:
return self.ipv4t
return self.ipv6t
def best_transport(self, addr_v4: str, addr_v6: str) -> AsyncHTTPTransport:
"""Метод выбирает лучший транспорт для отправки адресов (либо ipv4 либо ipv6)"""
if addr_v6:
return self.ipv6t
return self.ipv4t
@classmethod
def validate_source_config(cls, name, config):
def validate_source_config(cls, name: str, config: dict):
"""Метод валидации конфигурации для провайдера"""
if "provider" not in config:
return False
prov_name = config["provider"]
@@ -84,15 +162,88 @@ class BaseOutputProvider(ABC):
return True
@classmethod
def register_provider(cls, name, config, ipv4t, ipv6t):
def register_provider(
cls,
name: str,
config: dict,
ipv4t: AsyncHTTPTransport,
ipv6t: AsyncHTTPTransport,
):
"""Метод регистрации провайдера по конфигурации"""
if not cls.validate_source_config(name, config):
return
provider = cls._childs[config["provider"]]
cls.registred[name] = provider(name, config, ipv4t, ipv6t)
async def set_addrs(self, source_provider, addr_v4, addr_v6):
async def set_addrs(self, source_provider: str, addr_v4: str, addr_v6: str):
"""Метод внешнего интерфейса для отправки адресов"""
return await self.set_addrs_imp(source_provider, addr_v4, addr_v6)
@abstractmethod
async def set_addrs_imp(self, source_provider, addr_v4, addr_v6):
async def set_addrs_imp(self, source_provider: str, addr_v4: str, addr_v6: str):
"""Необходимый метод для реализации отправки/вывода IP аддресов"""
...
class BaseFilterProvider(ABC):
"""Базовый класс для провайдеров фильтров"""
_childs = {}
registred = {}
def __init__(
self,
name: str,
config: dict,
ipv4t: AsyncHTTPTransport,
ipv6t: AsyncHTTPTransport,
):
self.name, self.config = name, config
self.ipv4t, self.ipv6t = ipv4t, ipv6t
self.post_init()
def post_init(self):
"""Метод для переопределения пост инициализации"""
...
def __init_subclass__(cls) -> None:
BaseFilterProvider._childs[cls.__name__] = cls
return super().__init_subclass__()
def __str__(self):
return f"{self.__class__.__name__}: {self.name}"
@classmethod
def validate_source_config(cls, name: str, config: dict) -> bool:
"""Метод валидации конфигурации для провайдера"""
if "provider" not in config:
return False
prov_name = config["provider"]
if prov_name not in cls._childs:
return False
return True
@classmethod
def register_provider(
cls,
name: str,
config: dict,
ipv4t: AsyncHTTPTransport,
ipv6t: AsyncHTTPTransport,
):
"""Метод регистрации провайдера по конфигурации"""
if not cls.validate_source_config(name, config):
return
provider = cls._childs[config["provider"]]
cls.registred[name] = provider(name, config, ipv4t, ipv6t)
async def check(self, source_provider: str, addr_v4: str, addr_v6: str) -> bool:
"""Метод внешнего интерфейса для проверки адресов"""
return await self.check_imp(source_provider, addr_v4, addr_v6)
@abstractmethod
async def check_imp(self, source_provider: str, addr_v4: str, addr_v6: str) -> bool:
"""Необходимый метод реализации проверки"""
...

View File

@@ -1,77 +1,23 @@
""" модуль запуска """
import httpx
import asyncio
from abc import ABC, abstractmethod
import toml
from .base import BaseSourceProvider, BaseOutputProvider
from . import sources
from . import outputs
import asyncio
async def source_task(providers):
result = None
is_done = False
pending = [asyncio.create_task(p.fetch_all()) for p in providers]
while not is_done and pending:
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
for x in done:
result = x.result()
if len(result) == 3 and result[1] or result[2]:
is_done = True
break
if pending:
gather = asyncio.gather(*pending)
gather.cancel()
try:
await gather
except asyncio.CancelledError:
pass
return result
async def output_task(providers, result):
await asyncio.gather(
*(asyncio.create_task(p.set_addrs(*result)) for p in providers)
)
async def app(ipv4t, ipv6t):
config = toml.load("settings/config.toml")
for source_name in config["sources"]:
BaseSourceProvider.register_provider(
source_name, config["sources"][source_name], ipv4t, ipv6t
)
for output_name in config["outputs"]:
BaseOutputProvider.register_provider(
output_name, config["outputs"][output_name], ipv4t, ipv6t
)
debug = config.get("debug", False)
if debug:
print("DEBUG info:")
print(
f"source classes: {[*BaseSourceProvider._childs]}, {[*map(str, BaseSourceProvider._childs.values())]}"
)
print(
f"output classes: {[*BaseOutputProvider._childs]}, {[*map(str, BaseOutputProvider._childs.values())]}"
)
print(
f"source providers: {[*BaseSourceProvider.registred]}, {[*map(str, BaseSourceProvider.registred.values())]}"
)
print(
f"output providers: {[*BaseOutputProvider.registred]}, {[*map(str, BaseOutputProvider.registred.values())]}"
)
print(config)
result = await source_task(BaseSourceProvider.registred.values())
await output_task(BaseOutputProvider.registred.values(), result)
from pddnsc.core import app
async def main():
"""Точка входа программы
загрузка конфигурации и создание транспортов IPv4 и IPv6
"""
config = toml.load("settings/config.toml")
async with httpx.AsyncHTTPTransport(
local_address="0.0.0.0",
) as ipv4t, httpx.AsyncHTTPTransport(local_address="::") as ipv6t:
await app(ipv4t, ipv6t)
local_address="0.0.0.0", proxy=config.get("proxy_v4")
) as ipv4t, httpx.AsyncHTTPTransport(
local_address="::", proxy=config.get("proxy_v6")
) as ipv6t:
await app(config, ipv4t, ipv6t)
if __name__ == "__main__":

285
pddnsc/core.py Normal file
View File

@@ -0,0 +1,285 @@
import httpx
import asyncio
from .base import BaseFilterProvider, BaseSourceProvider, BaseOutputProvider, IPAddreses
from .plugins import use_plugins
from typing import Optional, NamedTuple
class NeededAddrs(NamedTuple):
ipv4: bool
ipv6: bool
@classmethod
def from_config(cls, config: dict) -> "NeededAddrs":
need_ipv4 = config.get("require_ipv4", False)
need_ipv6 = config.get("require_ipv6", False)
if "require_ipv4" not in config and "require_ipv6" not in config:
need_ipv4 = need_ipv6 = True
return cls(need_ipv4, need_ipv6)
def is_valid_addreses(addrs: IPAddreses, config: dict) -> bool:
"""Проверка валидности IP адресов
Args:
addrs (IPAddreses): IP адреса - результат одного из источников
config (dict): общая конфигурация
Returns:
bool: валиден или нет
"""
result = addrs.ipv4 or addrs.ipv6
if config.get("require_ipv4"):
result = result and addrs.ipv4
if config.get("require_ipv6"):
result = result and addrs.ipv6
return bool(result)
def is_ipv4_ok(ipv4: str, config: dict) -> bool:
"""Проверка IPv4 адреса, подходит или нет
Args:
ipv4 (str): IPv4 адрес
config (dict): общая конфигурация
Returns:
bool: подходит или нет
"""
return bool(ipv4) if NeededAddrs.from_config(config).ipv4 else True
def is_ipv6_ok(ipv6: str, config: dict) -> bool:
"""Проверка IPv6 адреса, подходит или нет
Args:
ipv6 (str): IPv6 адрес
config (dict): общая конфигурация
Returns:
bool: подходит или нет
"""
return bool(ipv6) if NeededAddrs.from_config(config).ipv6 else True
async def get_ip_addresses(config: dict) -> Optional[IPAddreses]:
"""Получение всех IP адресов из всех источников
Args:
config (dict): общая конфигурация
Returns:
Optional[IPAddreses]: результат получения, либо None
"""
unit_mode = config.get("unit_mode", False)
if unit_mode:
result = await get_ip_addresses_unit(config)
else:
result = await get_ip_addresses_any(config)
return result
async def get_ip_addresses_any(config: dict) -> Optional[IPAddreses]:
"""Получение всех IP адресов из всех источников (режим any)
Получает разные адреса от любых источников
Args:
config (dict): общая конфигурация
Returns:
Optional[IPAddreses]: результат получения, либо None
"""
need = NeededAddrs.from_config(config)
providers = BaseSourceProvider.registred.values()
ip_addresses, is_done = None, False
last_src, last_ipv4, last_ipv6 = "", "", ""
ok_ipv4, ok_ipv6 = False, False
drop = []
pending = []
if need.ipv4:
pending += [
asyncio.create_task(p.fetch_v4(), name=f"{p.name}.v4") for p in providers
]
if need.ipv6:
pending += [
asyncio.create_task(p.fetch_v6(), name=f"{p.name}.v6") for p in providers
]
while not is_done and pending:
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
for x in done:
ip_addr = x.result()
if x.get_name().endswith(".v6"):
last_ipv6 = last_ipv6 if ok_ipv6 else ip_addr
else:
last_ipv4 = last_ipv4 if ok_ipv4 else ip_addr
if config.get("debug"):
print("debug:", "get", x.get_name(), ip_addr)
last_src = x.get_name().rsplit(".", maxsplit=1)[0]
ok_ipv4 = is_ipv4_ok(last_ipv4, config)
ok_ipv6 = is_ipv6_ok(last_ipv6, config)
pending_v4 = [*filter(lambda i: i.get_name().endswith(".v4"), pending)]
pending_v6 = [*filter(lambda i: i.get_name().endswith(".v6"), pending)]
if ok_ipv4 and pending_v4:
drop += pending_v4
pending = pending_v6
for i in pending_v4:
i.cancel()
pending_v4 = []
if ok_ipv6 and pending_v6:
drop += pending_v6
pending = pending_v4
for i in pending_v6:
i.cancel()
pending_v6 = []
if not (ok_ipv4 and ok_ipv6):
continue
ip_addresses = IPAddreses(last_src, last_ipv4, last_ipv6)
if is_valid_addreses(ip_addresses, config):
is_done = True
break
ip_addresses = None
if not (ok_ipv4 and ok_ipv6):
ip_addresses = IPAddreses(last_src, last_ipv4, last_ipv6)
drop = drop + list(pending)
if drop:
gather = asyncio.gather(*drop)
gather.cancel()
try:
await gather
except asyncio.CancelledError:
pass
return ip_addresses
async def get_ip_addresses_unit(config: dict) -> Optional[IPAddreses]:
"""Получение всех IP адресов из всех источников (режим unit)
Получает адреса только от одного источника
Args:
config (dict): общая конфигурация
Returns:
Optional[IPAddreses]: результат получения, либо None
"""
providers = BaseSourceProvider.registred.values()
ip_addresses = None
is_done = False
pending = [asyncio.create_task(p.fetch_all(), name=p.name) for p in providers]
while not is_done and pending:
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
for x in done:
ip_addresses = x.result()
if is_valid_addreses(ip_addresses, config):
is_done = True
break
ip_addresses = None
if pending:
gather = asyncio.gather(*pending)
gather.cancel()
try:
await gather
except asyncio.CancelledError:
pass
return ip_addresses
async def check_ip_addresses(ip_addresses: IPAddreses) -> bool:
"""Проверка результата получения IP адресов с помощью фильтров
Args:
ip_addresses (IPAddreses): IP адреса
Returns:
bool: корректны ли адреса (изменились ли они),
надо ли продолжать обработку и отправлять их на сервер
"""
providers = BaseFilterProvider.registred.values()
result = True
failed = ""
pending = [
asyncio.create_task(p.check(*ip_addresses), name=p.name) for p in providers
]
while result and pending:
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
for x in done:
result = x.result()
if not result:
failed = x.get_name()
if pending:
gather = asyncio.gather(*pending)
gather.cancel()
try:
await gather
except asyncio.CancelledError:
pass
if not result:
print("failed filter:", failed)
return result
async def send_ip_addreses(ip_addresses: IPAddreses):
"""Отправка адресов на все плагины вывода
Args:
ip_addresses (IPAddreses): IP адреса
"""
providers = BaseOutputProvider.registred.values()
await asyncio.gather(
*(
asyncio.create_task(p.set_addrs(*ip_addresses), name=p.name)
for p in providers
)
)
def print_debug_info(config: dict):
"""Вывод всех зарегистрированных плагинов и другой отладочной информации"""
def format_registred(name, base):
result = f"{name}:\n"
result += f" classes: {[*base._childs]}\n"
result += f" .values: {[*map(str, base._childs.values())]}\n"
result += f" providers: {[*base.registred]}\n"
result += f" .values: {[*map(str, base.registred.values())]}\n"
return result
if config.get("debug", False):
print("DEBUG ->")
bases = BaseSourceProvider, BaseFilterProvider, BaseOutputProvider
for info in map(format_registred, "sources filters outputs".split(), bases):
print(info)
print("DEBUG <-")
async def app(
config: dict, ipv4t: httpx.AsyncHTTPTransport, ipv6t: httpx.AsyncHTTPTransport
):
"""Запуск приложения
Args:
config (dict): общая конфигурация
ipv4t (httpx.AsyncHTTPTransport): транспорт IPv4
ipv6t (httpx.AsyncHTTPTransport): транспорт IPv6
"""
use_plugins(config, ipv4t, ipv6t)
print_debug_info(config)
ip_addreses = await get_ip_addresses(config)
if ip_addreses is None:
print("no IP addresses")
return
if not await check_ip_addresses(ip_addreses):
print("stop by filters")
return
await send_ip_addreses(ip_addreses)
print("done")

View File

@@ -0,0 +1,5 @@
""" Фильтры для проверки адресов перед отправкой """
from pddnsc.loaders import load_plugins
load_plugins(__file__)

84
pddnsc/filters/files.py Normal file
View File

@@ -0,0 +1,84 @@
import hashlib
import json
import aiofiles
from os.path import isfile
from pddnsc.base import BaseFilterProvider
class GenericTextFileFilter(BaseFilterProvider):
"""Проверка на то что хотябы один IP адрес изменился по сравнению с текстом в файле
Конфигурация:
- filepath: имя файла
- encoding: кодировка, по умолчанию "utf-8"
- mode: режим открытия, по умолчанию "r"
- check_ipv4: проверять ли IPv4, по умолчанию нет если check_ipv6 есть в конфиге иначе да
- check_ipv6: проверять ли IPv6, по умолчанию нет если check_ipv4 есть в конфиге иначе да
"""
def post_init(self):
super().post_init()
self.filepath = self.config["filepath"]
self.encoding = self.config.get("encoding", "utf-8")
self.mode = self.config.get("mode", "r")
self.check_ipv4 = self.config.get("check_ipv4", False)
self.check_ipv6 = self.config.get("check_ipv6", False)
if "check_ipv4" not in self.config and "check_ipv4" not in self.config:
self.check_ipv4 = self.check_ipv6 = True
self.content = ""
async def read(self) -> str:
async with aiofiles.open(
self.filepath, mode=self.mode, encoding=self.encoding
) as f:
self.content = await f.read()
async def check_imp(self, source_provider: str, addr_v4: str, addr_v6: str) -> bool:
lst = []
if self.check_ipv4:
lst.append(addr_v4)
if self.check_ipv4:
lst.append(addr_v6)
new_content = "\n".join(lst)
return new_content != self.content
async def check(self, source_provider: str, addr_v4: str, addr_v6: str) -> bool:
if not isfile(self.filepath):
return True
await self.read()
return await self.check_imp(source_provider, addr_v4, addr_v6)
class StateHashFilter(GenericTextFileFilter):
"""Проверка на то что хотябы один IP адрес изменился по хешу сохраненному в файле"""
async def check_imp(self, source_provider: str, addr_v4: str, addr_v6: str) -> bool:
new_state_str = (self.check_ipv4 and addr_v4 or "") + (
self.check_ipv6 and addr_v6 or ""
)
new_sha = hashlib.sha256(new_state_str.encode(encoding=self.encoding))
return self.content != new_sha.hexdigest()
class StateFileFilter(GenericTextFileFilter):
"""Проверка на то что хотябы один IP адрес изменился по сравнению с данными в json файле"""
async def check_imp(self, source_provider: str, addr_v4: str, addr_v6: str) -> bool:
new_state = {}
if self.check_ipv4:
new_state["ipv4"] = addr_v4 or ""
if self.check_ipv6:
new_state["ipv6"] = addr_v6 or ""
old_state = json.loads(self.content)
result = True
if self.check_ipv4:
result = result and new_state["ipv4"] != old_state["ipv4"]
if self.check_ipv6:
result = result and new_state["ipv6"] != old_state["ipv6"]
return result

37
pddnsc/loaders.py Normal file
View File

@@ -0,0 +1,37 @@
""" функции загрузки файлов плагинов """
import os
import traceback
from importlib import util
def load_module(path: str):
"""загрузка python модуля
Args:
path (str): имя файла
"""
name = os.path.split(path)[-1]
spec = util.spec_from_file_location(name, path)
module = util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
def load_plugins(init_filepath: str):
"""Загрузка плагинов из пакета
Args:
init_filepath (str): имя `__init__.py` файла пакета
"""
dirpath = os.path.dirname(os.path.abspath(init_filepath))
for fname in os.listdir(dirpath):
if (
not fname.startswith(".")
and not fname.startswith("__")
and fname.endswith(".py")
):
try:
load_module(os.path.join(dirpath, fname))
except Exception:
traceback.print_exc()

View File

@@ -1,3 +1,5 @@
from pddnsc.plugins import load_plugins
""" Модули вывода """
load_plugins(__file__)
from pddnsc.loaders import load_plugins
load_plugins(__file__)

View File

@@ -2,8 +2,11 @@ import asyncio
from pddnsc.base import BaseOutputProvider
class JustPrint(BaseOutputProvider):
async def set_addrs_imp(self, source_provider, addr_v4, addr_v6):
"""Вывод IP адресов в консоль"""
async def set_addrs_imp(self, source_provider: str, addr_v4: str, addr_v6: str):
print(f">> {self.name}")
print(f"addresses from: {source_provider}")
print(f"IPv4: {addr_v4}")

View File

@@ -1,4 +1,3 @@
import asyncio
import aiofiles
import json
import hashlib
@@ -6,24 +5,70 @@ import hashlib
from pddnsc.base import BaseOutputProvider
class StateFile(BaseOutputProvider):
async def set_addrs_imp(self, source_provider, addr_v4, addr_v6):
state = {
"ipv4": addr_v4 or "",
"ipv6": addr_v6 or "",
}
state_str = json.dumps(state)
async with aiofiles.open(self.config['filepath'], mode='w', encoding='utf-8') as f:
await f.write(state_str)
class GenericTextFile(BaseOutputProvider):
"""Сохранение IP адресов в текстовый файл
Конфигурация:
- filepath: имя файла
- encoding: кодировка, по умолчанию "utf-8"
- mode: режим открытия, по умолчанию "w"
- save_ipv4: сохранять ли IPv4, по умолчанию нет если save_ipv6 есть в конфиге иначе да
- save_ipv6: сохранять ли IPv6, по умолчанию нет если save_ipv4 есть в конфиге иначе да
"""
def post_init(self):
super().post_init()
self.filepath = self.config["filepath"]
self.encoding = self.config.get("encoding", "utf-8")
self.mode = self.config.get("mode", "w")
self.save_ipv4 = self.config.get("save_ipv4", False)
self.save_ipv6 = self.config.get("save_ipv6", False)
if "save_ipv4" not in self.config and "save_ipv4" not in self.config:
self.save_ipv4 = self.save_ipv6 = True
self.content = ""
async def read(self):
async with aiofiles.open(self.filepath, mode="r", encoding=self.encoding) as f:
self.content = await f.read()
def set_content(self, ipv4: str, ipv6: str):
lst = []
if self.save_ipv4:
lst.append(ipv4)
if self.save_ipv6:
lst.append(ipv6)
self.content = "\n".join(lst)
async def write(self):
async with aiofiles.open(
self.filepath, mode=self.mode, encoding=self.encoding
) as f:
await f.write(self.content)
async def set_addrs_imp(self, source_provider: str, addr_v4: str, addr_v6: str):
await self.set_content(addr_v4, addr_v6)
await self.write()
class StateHashFile(BaseOutputProvider):
async def set_addrs_imp(self, source_provider, addr_v4, addr_v6):
state = {
"ipv4": addr_v4 or "",
"ipv6": addr_v6 or "",
}
state_str = json.dumps(state)
sha = hashlib.sha256(state_str.encode(encoding='utf-8'))
async with aiofiles.open(self.config['filepath'], mode='w', encoding='utf-8') as f:
await f.write(sha.hexdigest())
class StateFile(GenericTextFile):
"""Сохранение всех IP адресов в json файл"""
async def set_content(self, addr_v4: str, addr_v6: str):
state = {}
if self.save_ipv4:
state["ipv4"] = addr_v4 or ""
if self.save_ipv6:
state["ipv6"] = addr_v6 or ""
self.content = json.dumps(state)
class StateHashFile(GenericTextFile):
"""Сохранение хеша от всех IP адресов в файл"""
async def set_content(self, addr_v4: str, addr_v6: str):
state_str = (self.save_ipv4 and addr_v4 or "") + (
self.save_ipv6 and addr_v6 or ""
)
sha = hashlib.sha256(state_str.encode(encoding=self.encoding))
self.content = sha.hexdigest()

164
pddnsc/outputs/vscale.py Normal file
View File

@@ -0,0 +1,164 @@
import os
import httpx
from typing import Optional
from pddnsc.base import BaseOutputProvider
class VscaleDomains(BaseOutputProvider):
"""Домены на vds.selectel.ru (ранее vscale.io)
Конфигурация:
- api_token_env: имя переменной окружения, в которой находится токен доступа, по умолчанию `VSCALE_API_TOKEN`
- api_base: базовый адрес API, по умолчанию `https://api.vscale.io/v1/`
- domain: имя основного домена, например `example.com`
- target: имя изменяемой записи, например `www` (чтобы изменить `www.example.com`)
- ttl: время жизни записи, по умолчанию 3600
- save_ipv4: сохранять ли ipv4 адрес (A запись)
- save_ipv6: сохранять ли ipv6 адрес (AAAA запись)
(если нет ни одного из [save_ipv4, save_ipv6] то сохраняются оба адреса)
"""
def post_init(self):
token = self.get_api_token()
if not token:
raise KeyError("no api token, use env VSCALE_API_TOKEN")
self.__headers = {"X-Token": token}
self.api_base = self.config.get("api_base", "https://api.vscale.io/v1/")
self.ttl = self.config.get("ttl", 3600)
self.domain = self.config["domain"]
target = self.config["target"]
self.target = f"{target}.{self.domain}"
self.save_ipv4 = self.config.get("ipv4", False)
self.save_ipv6 = self.config.get("ipv6", False)
if "ipv4" not in self.config and "ipv6" not in self.config:
self.save_ipv4 = self.save_ipv6 = True
def get_api_token(self) -> str:
"""получение API токена"""
token_env = self.config.get("api_token_env", "VSCALE_API_TOKEN")
return os.environ[token_env]
async def find_domain_id(self, client: httpx.AsyncClient) -> Optional[int]:
"""поиск ID домена"""
response = await client.get("/domains/")
if response.is_success:
data = response.json()
if isinstance(data, list):
for entry in data:
if entry["name"] == self.domain:
return entry["id"]
else:
raise TypeError("failed to find domain id, unexpected response type")
else:
raise ValueError(f"failed to find domain id, code: {response.status_code}")
async def find_record(
self, client: httpx.AsyncClient, domain_id: int, record_type: str
) -> Optional[int]:
"""поиск ID записи"""
response = await client.get(
f"/domains/{domain_id}/records/",
)
if response.is_success:
data = response.json()
if isinstance(data, list):
for entry in data:
if entry["name"] == self.target and entry["type"] == record_type:
return entry["id"]
else:
raise RuntimeError(
f"error list records {domain_id=}: ", response.status_code
)
async def get_record_value(
self, client: httpx.AsyncClient, domain_id: int, record_id: int
) -> str:
"""получение действующего значения записи"""
response = await client.get(f"/domains/{domain_id}/records/{record_id}")
if response.is_success:
data = response.json()
if isinstance(data, dict):
return data["content"]
async def change_record(
self,
client: httpx.AsyncClient,
domain_id: int,
record_id: int,
record_type: str,
value: str,
):
"""изменение записи"""
data = {
"content": value,
"name": self.target,
"ttl": self.ttl,
"type": record_type,
"id": record_id,
}
response = await client.put(
f"/domains/{domain_id}/records/{record_id}",
json=data,
)
if not response.is_success:
raise RuntimeError(
f"failed to change record: {self.target=},{domain_id=}, {record_id=}, {record_type=}, {value=}"
)
async def create_record(
self, client: httpx.AsyncClient, domain_id: int, record_type: str, value: str
):
"""создание новой записи"""
data = {
"content": value,
"name": self.target,
"ttl": self.ttl,
"type": record_type,
}
response = await client.post(
f"/domains/{domain_id}/records/",
json=data,
)
if not response.is_success:
raise RuntimeError(
f"failed to create record: {self.target=},{domain_id=}, {record_type=}, {value=}, {response.status_code=}"
)
async def set_addrs_imp(self, source_provider: str, addr_v4: str, addr_v6: str):
save_addrs = []
if addr_v4 and self.save_ipv4:
save_addrs.append(("A", addr_v4))
if addr_v6 and self.save_ipv6:
save_addrs.append(("AAAA", addr_v6))
transport = self.best_transport(addr_v4, addr_v6)
async with httpx.AsyncClient(
transport=transport,
base_url=self.api_base,
headers=self.__headers,
http2=True,
) as client:
domain_id = await self.find_domain_id(client)
for record_type, value in save_addrs:
record_id = await self.find_record(client, domain_id, record_type)
if record_id:
old_value = await self.get_record_value(
client, domain_id, record_id
)
if old_value != value:
await self.change_record(
client, domain_id, record_id, record_type, value
)
else:
print(
f"vscale: skip record change ({record_type=}), value equal"
)
else:
await self.create_record(client, domain_id, record_type, value)

View File

@@ -1,22 +1,30 @@
import os
import traceback
from importlib import util
""" модуль взаимодействия и регистрации плагинов """
from httpx import AsyncHTTPTransport
from .base import BaseSourceProvider, BaseFilterProvider, BaseOutputProvider
from . import sources
from . import filters
from . import outputs
def load_module(path):
name = os.path.split(path)[-1]
spec = util.spec_from_file_location(name, path)
module = util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
def unused():
"""Чтобы убрать предупреждение о неиспользуемых импортах"""
return sources, filters, outputs
def load_plugins(init_filepath):
dirpath = os.path.dirname(os.path.abspath(init_filepath))
for fname in os.listdir(dirpath):
if not fname.startswith('.') and \
not fname.startswith('__') and fname.endswith('.py'):
try:
load_module(os.path.join(dirpath, fname))
except Exception:
traceback.print_exc()
def use_plugins(config: dict, ipv4t: AsyncHTTPTransport, ipv6t: AsyncHTTPTransport):
"""Регистрация всех плагинов указаных в конфигурации"""
for source_name in config.get("sources", []):
BaseSourceProvider.register_provider(
source_name, config["sources"][source_name], ipv4t, ipv6t
)
for filter_name in config.get("filters", []):
BaseFilterProvider.register_provider(
filter_name, config["filters"][filter_name], ipv4t, ipv6t
)
for output_name in config.get("outputs", []):
BaseOutputProvider.register_provider(
output_name, config["outputs"][output_name], ipv4t, ipv6t
)

View File

@@ -1,3 +1,5 @@
from pddnsc.plugins import load_plugins
""" Модули источников IP адресов """
from pddnsc.loaders import load_plugins
load_plugins(__file__)

View File

@@ -1,31 +1,39 @@
import httpx
""" модуль имитации получения IP аддресов """
import asyncio
from pddnsc.base import BaseSourceProvider
class DummySource(BaseSourceProvider):
async def fetch_v4(self) -> str:
async with httpx.AsyncClient(transport=self.ipv4t) as client:
result = await asyncio.sleep(10, result=None)
result = await asyncio.sleep(10, result=None)
"""Имитация получения пустых адресов"""
async def fetch_v4_impl(self) -> str:
result = await asyncio.sleep(self.config.get("delay", 1), result="")
return result
async def fetch_v6(self) -> str:
async with httpx.AsyncClient(transport=self.ipv6t) as client:
result = await asyncio.sleep(10, result=None)
result = await asyncio.sleep(10, result=None)
async def fetch_v6_impl(self) -> str:
result = await asyncio.sleep(self.config.get("delay", 1), result="")
return result
class FakeSource(BaseSourceProvider):
async def fetch_v4(self) -> str:
async with httpx.AsyncClient(transport=self.ipv4t) as client:
result = await asyncio.sleep(
3.3, result=self.config.get("ipv4", "127.0.0.1")
)
"""Имитация получения заданных в конфигурации адресов
Конфигурация:
- ipv4: строка IPv4, по умолчанию "127.0.0.1"
- ipv6: строка IPv6, по умолчанию "::1"
"""
async def fetch_v4_impl(self) -> str:
result = await asyncio.sleep(
self.config.get("delay", 1), result=self.config.get("ipv4", "127.0.0.1")
)
return result
async def fetch_v6(self) -> str:
async with httpx.AsyncClient(transport=self.ipv6t) as client:
result = await asyncio.sleep(4.4, result=self.config.get("ipv6", "::1"))
async def fetch_v6_impl(self) -> str:
result = await asyncio.sleep(
self.config.get("delay", 1), result=self.config.get("ipv6", "::1")
)
return result

96
pddnsc/sources/http.py Normal file
View File

@@ -0,0 +1,96 @@
from typing import Optional
import httpx
from pddnsc.base import BaseSourceProvider
class GenericHttpSource(BaseSourceProvider):
"""Базовый провайдер получения IP адресов по http/https ссылкам в виде текста.
Конфигурация:
- url_v4: *URL* для получения адреса *IPv4*
- url_v6: *URL* для получения адреса *IPv6*
- headers (`dict`): словарь дополнительных заголовков (*необязательно*)
"""
def post_init(self):
self.url_v4 = self.config.get("url_v4")
self.url_v6 = self.config.get("url_v6")
self.headers = self.config.get("headers", {})
async def fetch_v4_impl(self) -> Optional[str]:
if not self.url_v4:
return None
result = ""
async with httpx.AsyncClient(
transport=self.ipv4t, headers=self.headers
) as client:
response = await client.get(self.url_v4)
if response.is_success:
result = response.text.strip()
return result
async def fetch_v6_impl(self) -> Optional[str]:
if not self.url_v6:
return None
result = ""
async with httpx.AsyncClient(
transport=self.ipv6t, headers=self.headers
) as client:
response = await client.get(self.url_v6)
if response.is_success:
result = response.text.strip()
return result
class GenericHttpJsonSource(BaseSourceProvider):
"""Базовый провайдер получения IP адресов по http/https ссылкам в виде json.
Конфигурация:
- url_v4: *URL* для получения адреса *IPv4*
- url_v6: *URL* для получения адреса *IPv6*
- key_v4: ключ *json* (`int` для списка, `str` для словаря) для *IPv4*
- key_v6: ключ *json* (`int` для списка, `str` для словаря) для *IPv6*
- headers (`dict`): словарь дополнительных заголовков (*необязательно*)
"""
def post_init(self):
self.url_v4 = self.config.get("url_v4")
self.url_v6 = self.config.get("url_v6")
self.key_v4 = self.config.get("key_v4")
self.key_v6 = self.config.get("key_v6")
if self.config.get("use_accept", True):
headers = {"Accept": self.config.get("acept_type", "application/json")}
else:
headers = {}
self.headers = headers.update(self.config.get("headers", {}))
async def fetch_v4_impl(self) -> Optional[str]:
if not self.url_v4 or self.key_v4 is None:
return None
result = ""
async with httpx.AsyncClient(
transport=self.ipv4t, headers=self.headers
) as client:
response = await client.get(self.url_v4)
if response.is_success:
result = response.json()[self.key_v4].strip()
return result
async def fetch_v6_impl(self) -> Optional[str]:
if not self.url_v6 or self.key_v6 is None:
return None
result = ""
async with httpx.AsyncClient(
transport=self.ipv6t, headers=self.headers
) as client:
response = await client.get(self.url_v6)
if response.is_success:
result = response.json()[self.key_v6].strip()
return result

View File

@@ -0,0 +1,33 @@
import re
from pddnsc.sources.http import GenericHttpSource
class GenericHttpRegexSource(GenericHttpSource):
"""Базовый провайдер получения IP адресов по http/https ссылкам в виде текста c помощью регулярных выражений
Конфигурация:
- url_v4: *URL* для получения адреса *IPv4*
- url_v6: *URL* для получения адреса *IPv6*
- regex_v4: регулярное выражение для *IPv4*, значение по умолчанию см. в коде
- regex_v6: регулярное выражение для *IPv6*, значение по умолчанию см. в коде
- headers (`dict`): словарь дополнительных заголовков (*необязательно*)
"""
def post_init(self):
super().post_init()
rx_v4 = r"\b(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b"
rx_v6 = r"\b(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))\b"
self.regex_v4 = re.compile(self.config.get("regex_v4", rx_v4))
self.regex_v6 = re.compile(self.config.get("regex_v6", rx_v6))
def filter_ipv4(self, ipv4):
match = self.regex_v4.search(ipv4 or "")
ipv4 = match and match[0]
return super().filter_ipv4(ipv4)
def filter_ipv6(self, ipv6):
match = self.regex_v6.search(ipv6 or "")
ipv6 = match and match[0]
return super().filter_ipv6(ipv6)

View File

@@ -1,20 +1,10 @@
import httpx
from pddnsc.sources.http import GenericHttpSource
from pddnsc.base import BaseSourceProvider
class IPIFYSource(BaseSourceProvider):
async def fetch_v4(self) -> str:
async with httpx.AsyncClient(transport=self.ipv4t) as client:
response = await client.get("https://api4.ipify.org/?format=json")
if response.status_code == httpx.codes.OK:
data = response.json()
result = None if not isinstance(data, dict) else data.get("ip")
return result
class IPIFYSource(GenericHttpSource):
"""https://www.ipify.org/"""
async def fetch_v6(self) -> str:
async with httpx.AsyncClient(transport=self.ipv6t) as client:
response = await client.get("https://api6.ipify.org/?format=json")
if response.status_code == httpx.codes.OK:
data = response.json()
result = None if not isinstance(data, dict) else data.get("ip")
return result
def post_init(self):
super().post_init()
self.url_v4 = "https://api4.ipify.org"
self.url_v6 = "https://api6.ipify.org"

10
pddnsc/sources/ipsb.py Normal file
View File

@@ -0,0 +1,10 @@
from pddnsc.sources.http import GenericHttpSource
class IPSB(GenericHttpSource):
"""https://ip.sb/api"""
def post_init(self):
super().post_init()
self.url_v4 = "https://api-ipv4.ip.sb/ip"
self.url_v6 = "https://api-ipv6.ip.sb/ip"

View File

@@ -0,0 +1,13 @@
from pddnsc.sources.http import GenericHttpSource
class WTFIsMyIP(GenericHttpSource):
"""
https://wtfismyip.com/
https://gitlab.com/wtfismyip/wtfismyip
"""
def post_init(self):
super().post_init()
self.url_v4 = "https://text.ipv4.myip.wtf"
self.url_v6 = "https://text.ipv6.myip.wtf"

2
requirements.dev.txt Normal file
View File

@@ -0,0 +1,2 @@
black>=24,<25
pre-commit>=3.6.2,<4

4
requirements.docs.txt Normal file
View File

@@ -0,0 +1,4 @@
mkdocs>=1.5,<2
pymdown-extensions>=10.7,<11
mkapi>=2.1,<3
mkdocs-material>=9.5.10,<10

View File

@@ -2,3 +2,4 @@ httpx[http2]>=0.26,<1.0
asyncio>=3.4.3,<4
aiofiles>=23,<24
toml>=0.10,<1
netaddr>=1,<2

View File

@@ -1,18 +1,43 @@
debug = true
debug = false
require_ipv4 = true
[sources]
[sources.test1-src]
[sources.ipfy]
provider = "IPIFYSource"
[sources.test2-src]
provider = "FakeSource"
ipv6 = "fe80::1"
[sources.wtf]
provider = "WTFIsMyIP"
[sources.ipsb]
provider = "IPSB"
[sources.checkip_dyndns]
provider = "GenericHttpRegexSource"
url_v4 = "http://checkip.dyndns.org/"
#regex_v4 = "\\b\\d{1,3}(\\.\\d{1,3}){3}\\b"
[sources.ifconfig]
provider = "GenericHttpSource"
url_v4 = "https://ifconfig.me/ip"
url_v6 = "https://ifconfig.me/ip"
[sources.ipconfig]
provider = "GenericHttpJsonSource"
url_v4 = "https://ipconfig.io/json"
url_v6 = "https://ipconfig.io/json"
key_v4 = "ip"
key_v6 = "ip"
[filters]
[filters.state-file]
provider = "StateFileFilter"
filepath = "state/state.json"
check_ipv4 = true
#[filters.state-hash]
# provider = "StateHashFilter"
# filepath = "state/hash.txt"
[outputs]
[outputs.print]
provider = "JustPrint"
[outputs.file]
[outputs.state-file]
provider = "StateFile"
filepath = "state/state.json"
[outputs.hash-file]
provider = "StateHashFile"
filepath = "state/hash.txt"
#[outputs.hash-file]
# provider = "StateHashFile"
# filepath = "state/hash.txt"