Compare commits
10 Commits
1f8a81e731
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
8eb54a2515
|
|||
|
8f7f7b88ac
|
|||
|
a348550630
|
|||
|
28748b547e
|
|||
|
aa417f6ed1
|
|||
|
c4c2553c53
|
|||
|
12e7d9ff24
|
|||
|
2687ae5457
|
|||
|
2a51098daf
|
|||
|
03e4e670a9
|
1
.dockerignore
Normal file
1
.dockerignore
Normal file
@@ -0,0 +1 @@
|
||||
/target
|
||||
78
.gitea/workflows/docker.yml
Normal file
78
.gitea/workflows/docker.yml
Normal file
@@ -0,0 +1,78 @@
|
||||
name: Docker Image CI
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- master
|
||||
tags:
|
||||
- v*
|
||||
pull_request:
|
||||
branches:
|
||||
- master
|
||||
workflow_dispatch:
|
||||
|
||||
permissions:
|
||||
packages: write
|
||||
contents: read
|
||||
|
||||
jobs:
|
||||
test:
|
||||
runs-on: cth-ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@v3
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
- name: Build image
|
||||
uses: docker/build-push-action@v4
|
||||
with:
|
||||
context: .
|
||||
push: false
|
||||
tags: gitea.b4tman.ru/${{ gitea.repository }}:test
|
||||
push:
|
||||
needs: test
|
||||
runs-on: cth-ubuntu-latest
|
||||
if: github.event_name != 'pull_request'
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Docker meta
|
||||
id: meta
|
||||
uses: docker/metadata-action@v5
|
||||
with:
|
||||
images: |
|
||||
gitea.b4tman.ru/${{ gitea.repository }}
|
||||
flavor: |
|
||||
latest=${{ github.ref == 'refs/heads/master' }}
|
||||
tags: |
|
||||
type=ref,event=branch
|
||||
type=ref,event=pr
|
||||
type=semver,pattern={{version}}
|
||||
type=semver,pattern={{major}}.{{minor}}
|
||||
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@v3
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
- name: Login to gitea
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
registry: gitea.b4tman.ru
|
||||
username: ${{ gitea.repository_owner }}
|
||||
password: ${{ secrets.PKGS_TOKEN }}
|
||||
|
||||
- name: Build and push image
|
||||
uses: docker/build-push-action@v4
|
||||
with:
|
||||
context: .
|
||||
push: true
|
||||
#platforms: linux/amd64,linux/arm64
|
||||
platforms: linux/amd64
|
||||
tags: ${{ steps.meta.outputs.tags }}
|
||||
labels: ${{ steps.meta.outputs.labels }}
|
||||
89
Cargo.lock
generated
89
Cargo.lock
generated
@@ -207,6 +207,12 @@ version = "1.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
||||
|
||||
[[package]]
|
||||
name = "cfg_aliases"
|
||||
version = "0.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
|
||||
|
||||
[[package]]
|
||||
name = "chrono"
|
||||
version = "0.4.38"
|
||||
@@ -282,6 +288,16 @@ version = "0.8.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
|
||||
|
||||
[[package]]
|
||||
name = "ctrlc"
|
||||
version = "3.4.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "90eeab0aa92f3f9b4e87f258c72b139c207d251f9cbc1080a0086b86a8870dd3"
|
||||
dependencies = [
|
||||
"nix",
|
||||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "deranged"
|
||||
version = "0.3.11"
|
||||
@@ -300,6 +316,7 @@ dependencies = [
|
||||
"bollard",
|
||||
"clap",
|
||||
"clap_derive",
|
||||
"ctrlc",
|
||||
"flexi_logger",
|
||||
"futures-util",
|
||||
"log",
|
||||
@@ -307,7 +324,6 @@ dependencies = [
|
||||
"parse_duration",
|
||||
"serde",
|
||||
"tokio",
|
||||
"tokio-macros",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -680,16 +696,6 @@ version = "0.2.158"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439"
|
||||
|
||||
[[package]]
|
||||
name = "lock_api"
|
||||
version = "0.4.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"scopeguard",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "log"
|
||||
version = "0.4.22"
|
||||
@@ -732,6 +738,18 @@ dependencies = [
|
||||
"windows-sys 0.52.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nix"
|
||||
version = "0.29.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"cfg-if",
|
||||
"cfg_aliases",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nu-ansi-term"
|
||||
version = "0.50.1"
|
||||
@@ -844,29 +862,6 @@ version = "0.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot"
|
||||
version = "0.12.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27"
|
||||
dependencies = [
|
||||
"lock_api",
|
||||
"parking_lot_core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot_core"
|
||||
version = "0.9.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"redox_syscall",
|
||||
"smallvec",
|
||||
"windows-targets",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "parse_duration"
|
||||
version = "2.1.1"
|
||||
@@ -940,15 +935,6 @@ dependencies = [
|
||||
"proc-macro2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "redox_syscall"
|
||||
version = "0.5.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0884ad60e090bf1345b93da0a5de8923c93884cd03f40dfcfddd3b4bee661853"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "regex"
|
||||
version = "1.10.6"
|
||||
@@ -1068,12 +1054,6 @@ dependencies = [
|
||||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "scopeguard"
|
||||
version = "1.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
|
||||
|
||||
[[package]]
|
||||
name = "security-framework"
|
||||
version = "2.11.1"
|
||||
@@ -1175,15 +1155,6 @@ version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
|
||||
|
||||
[[package]]
|
||||
name = "signal-hook-registry"
|
||||
version = "1.4.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "slab"
|
||||
version = "0.4.9"
|
||||
@@ -1314,9 +1285,7 @@ dependencies = [
|
||||
"bytes",
|
||||
"libc",
|
||||
"mio",
|
||||
"parking_lot",
|
||||
"pin-project-lite",
|
||||
"signal-hook-registry",
|
||||
"socket2",
|
||||
"tokio-macros",
|
||||
"windows-sys 0.52.0",
|
||||
|
||||
10
Cargo.toml
10
Cargo.toml
@@ -11,11 +11,17 @@ anyhow = "1.0.88"
|
||||
bollard = { version = "0.17.1", features = ["ssl", "json_data_content", "tokio-stream"] }
|
||||
clap = { version = "4.5.17", features = ["derive"] }
|
||||
clap_derive = "4.5.13"
|
||||
ctrlc = "3.4.5"
|
||||
flexi_logger = "0.29.0"
|
||||
futures-util = "0.3.30"
|
||||
log = "0.4.22"
|
||||
log-panics = "2.1.0"
|
||||
parse_duration = "2.1.1"
|
||||
serde = "1.0.210"
|
||||
tokio = {version = "1.40.0", features = ["full"]}
|
||||
tokio-macros = "2.4.0"
|
||||
tokio = {version = "1.40.0", features = ["rt", "macros", "time", "sync"]}
|
||||
|
||||
[profile.release]
|
||||
opt-level = 3
|
||||
debug = false
|
||||
lto = true
|
||||
strip = true
|
||||
|
||||
@@ -7,11 +7,13 @@ RUN cargo chef prepare --recipe-path recipe.json
|
||||
|
||||
FROM chef AS builder
|
||||
COPY --from=planner /app/recipe.json recipe.json
|
||||
RUN cargo chef cook --recipe-path recipe.json
|
||||
RUN cargo chef cook --release --recipe-path recipe.json
|
||||
COPY . .
|
||||
RUN cargo build
|
||||
RUN cargo build --release
|
||||
|
||||
FROM debian:12-slim AS runtime
|
||||
WORKDIR /app
|
||||
COPY --from=builder /app/target/debug/doctor-restart /usr/local/bin
|
||||
COPY --from=builder /app/target/release/doctor-restart /usr/local/bin
|
||||
STOPSIGNAL SIGINT
|
||||
ENTRYPOINT ["/usr/local/bin/doctor-restart"]
|
||||
CMD ["-l", "auto-restart.unhealthy", "-i", "10s", "-u", "35s"]
|
||||
|
||||
49
src/docker.rs
Normal file
49
src/docker.rs
Normal file
@@ -0,0 +1,49 @@
|
||||
use anyhow::{Context, Result};
|
||||
use bollard::container::ListContainersOptions;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
pub type Docker = bollard::Docker;
|
||||
pub type Containers = Vec<String>;
|
||||
|
||||
pub fn get_query_options(label: String) -> ListContainersOptions<String> {
|
||||
let mut filters = HashMap::new();
|
||||
//filters.insert("status", vec!["running"]);
|
||||
filters.insert("label".into(), vec![label]);
|
||||
filters.insert("health".into(), vec!["unhealthy".into()]);
|
||||
|
||||
ListContainersOptions {
|
||||
filters,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn query_containers<T>(
|
||||
connection: &Docker,
|
||||
query_options: ListContainersOptions<T>,
|
||||
) -> anyhow::Result<Vec<String>>
|
||||
where
|
||||
T: std::cmp::Eq + std::hash::Hash + serde::ser::Serialize,
|
||||
std::string::String: From<T>,
|
||||
{
|
||||
Ok(connection
|
||||
.list_containers(Some(query_options))
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|container| container.id.unwrap())
|
||||
.collect())
|
||||
}
|
||||
|
||||
pub async fn restart_container(connection: &Docker, container_name: &str) -> anyhow::Result<()> {
|
||||
connection.restart_container(container_name, None).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_docker_connection() -> Result<Arc<Docker>> {
|
||||
let connection = Arc::new(Docker::connect_with_defaults()?);
|
||||
let _ = connection
|
||||
.as_ref()
|
||||
.ping()
|
||||
.await
|
||||
.context("ping on docker connection")?;
|
||||
Ok(connection)
|
||||
}
|
||||
39
src/logger.rs
Normal file
39
src/logger.rs
Normal file
@@ -0,0 +1,39 @@
|
||||
use anyhow::Context;
|
||||
use clap::ValueEnum;
|
||||
use flexi_logger::{AdaptiveFormat, Logger, LoggerHandle};
|
||||
|
||||
#[derive(Debug, Clone, ValueEnum)]
|
||||
pub enum LogLevel {
|
||||
Error,
|
||||
Warn,
|
||||
Info,
|
||||
Debug,
|
||||
Trace,
|
||||
}
|
||||
|
||||
impl AsRef<str> for LogLevel {
|
||||
fn as_ref(&self) -> &'static str {
|
||||
match &self {
|
||||
LogLevel::Error => "error",
|
||||
LogLevel::Warn => "warn",
|
||||
LogLevel::Info => "info",
|
||||
LogLevel::Debug => "debug",
|
||||
LogLevel::Trace => "trace",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create_logger<T>(log_level: T) -> anyhow::Result<LoggerHandle>
|
||||
where
|
||||
T: AsRef<str>,
|
||||
{
|
||||
let logger = Logger::try_with_str(log_level)
|
||||
.context("default logging level invalid")?
|
||||
.format(flexi_logger::detailed_format)
|
||||
.adaptive_format_for_stdout(AdaptiveFormat::Detailed)
|
||||
.log_to_stdout()
|
||||
.start()
|
||||
.context("can't start logger");
|
||||
log_panics::init();
|
||||
logger
|
||||
}
|
||||
158
src/main.rs
158
src/main.rs
@@ -1,23 +1,23 @@
|
||||
use anyhow::Context;
|
||||
use bollard::container::ListContainersOptions;
|
||||
use bollard::Docker;
|
||||
|
||||
use std::cmp::min;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::default::Default;
|
||||
|
||||
use tokio::time::{timeout, Instant};
|
||||
|
||||
use flexi_logger::{AdaptiveFormat, Logger, LoggerHandle};
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use clap::Parser;
|
||||
|
||||
use parse_duration::parse as parse_duration;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use tokio::{
|
||||
sync::mpsc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
mod logger;
|
||||
use logger::{create_logger, LogLevel};
|
||||
|
||||
mod docker;
|
||||
use docker::{
|
||||
get_docker_connection, get_query_options, query_containers, restart_container, Containers,
|
||||
Docker,
|
||||
};
|
||||
|
||||
mod scheduler;
|
||||
use scheduler::{Scheduler, SchedulerStopHandle, SchedulerWatcher};
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[clap(author, version, about, long_about = None)]
|
||||
@@ -34,75 +34,26 @@ struct Cli {
|
||||
/// unhealthy status timeout
|
||||
#[arg(short, long, default_value = "35s", value_parser = parse_duration)]
|
||||
pub unhealthy_timeout: Duration,
|
||||
|
||||
/// log level
|
||||
#[arg(long, value_enum, default_value_t=LogLevel::Info)]
|
||||
pub log_level: LogLevel,
|
||||
}
|
||||
|
||||
fn get_query_options(args: &Cli) -> ListContainersOptions<String> {
|
||||
let mut filters = HashMap::new();
|
||||
//filters.insert("status", vec!["running"]);
|
||||
filters.insert("label".into(), vec![args.label.clone()]);
|
||||
filters.insert("health".into(), vec!["unhealthy".into()]);
|
||||
|
||||
ListContainersOptions {
|
||||
filters,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
fn create_logger() -> anyhow::Result<LoggerHandle> {
|
||||
let logger = Logger::try_with_str("info")
|
||||
.context("default logging level invalid")?
|
||||
.format(flexi_logger::detailed_format)
|
||||
.adaptive_format_for_stdout(AdaptiveFormat::Detailed)
|
||||
.log_to_stdout()
|
||||
.start()
|
||||
.context("can't start logger");
|
||||
log_panics::init();
|
||||
logger
|
||||
}
|
||||
|
||||
async fn query_containers<T>(
|
||||
connection: &Docker,
|
||||
query_options: ListContainersOptions<T>,
|
||||
) -> anyhow::Result<Vec<String>>
|
||||
where
|
||||
T: std::cmp::Eq + std::hash::Hash + serde::ser::Serialize,
|
||||
std::string::String: From<T>,
|
||||
{
|
||||
Ok(connection
|
||||
.list_containers(Some(query_options))
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|container| container.id.unwrap())
|
||||
.collect())
|
||||
}
|
||||
|
||||
async fn restart_container(connection: &Docker, container_name: &str) -> anyhow::Result<()> {
|
||||
connection.restart_container(container_name, None).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
type Containers = Vec<String>;
|
||||
|
||||
async fn query_task(
|
||||
connection: Arc<Docker>,
|
||||
interval: Duration,
|
||||
tx: mpsc::Sender<Containers>,
|
||||
mut shutdown_rx: mpsc::Receiver<()>,
|
||||
args: Cli,
|
||||
mut scheduler: SchedulerWatcher,
|
||||
label: String,
|
||||
) {
|
||||
let query_options = get_query_options(&args);
|
||||
let mut query_time = Duration::new(0, 0);
|
||||
log::debug!("query_task -> start recv");
|
||||
while (timeout(interval - query_time, shutdown_rx.recv()).await).is_err() {
|
||||
let start = Instant::now();
|
||||
let query_options = get_query_options(label);
|
||||
|
||||
log::debug!("query_task -> start recv");
|
||||
while scheduler.wait().await.is_some() {
|
||||
let containers = query_containers(&connection, query_options.clone())
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
|
||||
let end = Instant::now();
|
||||
query_time = min(end - start, interval - Duration::from_millis(1));
|
||||
|
||||
let res = tx.send(containers).await;
|
||||
if res.is_err() {
|
||||
break;
|
||||
@@ -119,7 +70,7 @@ async fn filter_task(
|
||||
while let Some(containers) = in_rx.recv().await {
|
||||
let now = Instant::now();
|
||||
|
||||
log::info!("filter -> found unhealthy: {}", containers.len());
|
||||
log::debug!("filter -> found unhealthy: {}", containers.len());
|
||||
|
||||
let prev_times = unhealthy_time.take().unwrap();
|
||||
let mut new_times: HashMap<String, Instant> = prev_times
|
||||
@@ -139,7 +90,7 @@ async fn filter_task(
|
||||
|
||||
let _ = unhealthy_time.replace(new_times);
|
||||
|
||||
log::info!("filter -> filtered unhealthy: {}", containers.len());
|
||||
log::debug!("filter -> filtered unhealthy: {}", containers.len());
|
||||
|
||||
if containers.is_empty() {
|
||||
continue;
|
||||
@@ -160,52 +111,47 @@ async fn restart_task(connection: Arc<Docker>, mut rx: mpsc::Receiver<Containers
|
||||
log::warn!("restart -> container: {}...", &container_id);
|
||||
let res = restart_container(&connection, container_id.as_str()).await;
|
||||
match res {
|
||||
Ok(_) => log::info!("ok\n"),
|
||||
Err(e) => log::error!("error: \n{e:?}\n"),
|
||||
Ok(_) => log::info!("ok"),
|
||||
Err(e) => log::error!("error: \n{e:?}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
fn shutdown_control(shutdown: Option<SchedulerStopHandle>) {
|
||||
let mut shutdown = shutdown;
|
||||
let res = ctrlc::set_handler(move || {
|
||||
log::info!("recieved Ctrl-C");
|
||||
shutdown.take();
|
||||
});
|
||||
|
||||
if res.is_ok() {
|
||||
log::info!("Press Ctrl-C to stop");
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main(flavor = "current_thread")]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let logger = create_logger()?;
|
||||
let cli = Cli::parse();
|
||||
let logger = create_logger(&cli.log_level)?;
|
||||
|
||||
let connection = Arc::new(Docker::connect_with_defaults()?);
|
||||
let _ = connection
|
||||
.as_ref()
|
||||
.ping()
|
||||
.await
|
||||
.context("ping on docker connection")?;
|
||||
|
||||
let connection = get_docker_connection().await?;
|
||||
let query_connection = connection.clone();
|
||||
let restart_connection = connection.clone();
|
||||
|
||||
let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
|
||||
let mut scheduler = Scheduler::new(cli.interval);
|
||||
let waiter = scheduler.get_waiter().context("get_waiter")?;
|
||||
|
||||
let (query_tx, filter_rx) = mpsc::channel::<Containers>(1);
|
||||
let (filter_tx, restart_rx) = mpsc::channel::<Containers>(1);
|
||||
let (unhealthy_timeout, label) = (cli.unhealthy_timeout, cli.label);
|
||||
|
||||
let interval = cli.interval;
|
||||
let unhealthy_timeout = cli.unhealthy_timeout;
|
||||
|
||||
shutdown_control(scheduler.get_stop_handle());
|
||||
tokio::try_join!(
|
||||
tokio::spawn(query_task(
|
||||
query_connection,
|
||||
interval,
|
||||
query_tx,
|
||||
shutdown_rx,
|
||||
cli
|
||||
)),
|
||||
tokio::spawn(scheduler.run()),
|
||||
tokio::spawn(query_task(query_connection, query_tx, waiter, label)),
|
||||
tokio::spawn(filter_task(unhealthy_timeout, filter_rx, filter_tx)),
|
||||
tokio::spawn(restart_task(restart_connection, restart_rx)),
|
||||
tokio::spawn(async {
|
||||
log::debug!("shutdown -> sleep");
|
||||
tokio::time::sleep(Duration::from_secs(80)).await;
|
||||
log::warn!("shutdown -> drop");
|
||||
drop(shutdown_tx);
|
||||
})
|
||||
tokio::spawn(restart_task(restart_connection, restart_rx))
|
||||
)?;
|
||||
|
||||
drop(logger);
|
||||
|
||||
71
src/scheduler.rs
Normal file
71
src/scheduler.rs
Normal file
@@ -0,0 +1,71 @@
|
||||
use tokio::{
|
||||
sync::mpsc,
|
||||
time::{timeout, Duration},
|
||||
};
|
||||
|
||||
pub struct SchedulerStopHandle {
|
||||
_sender: Option<mpsc::Sender<()>>,
|
||||
}
|
||||
|
||||
impl SchedulerStopHandle {
|
||||
fn new(sender: mpsc::Sender<()>) -> Self {
|
||||
Self {
|
||||
_sender: Some(sender),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SchedulerWatcher {
|
||||
reciever: mpsc::Receiver<()>,
|
||||
}
|
||||
|
||||
impl SchedulerWatcher {
|
||||
fn new(reciever: mpsc::Receiver<()>) -> Self {
|
||||
Self { reciever }
|
||||
}
|
||||
pub async fn wait(&mut self) -> Option<()> {
|
||||
self.reciever.recv().await
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Scheduler {
|
||||
interval: Duration,
|
||||
stop_handle: Option<SchedulerStopHandle>,
|
||||
stop_rx: mpsc::Receiver<()>,
|
||||
tick_tx: mpsc::Sender<()>,
|
||||
watcher: Option<SchedulerWatcher>,
|
||||
}
|
||||
|
||||
impl Scheduler {
|
||||
pub fn new(interval: Duration) -> Self {
|
||||
log::trace!("Scheduler::new {interval:?}");
|
||||
let (stop_tx, stop_rx) = mpsc::channel::<()>(1);
|
||||
let (tick_tx, tick_rx) = mpsc::channel::<()>(1);
|
||||
|
||||
Self {
|
||||
interval,
|
||||
stop_handle: Some(SchedulerStopHandle::new(stop_tx)),
|
||||
stop_rx,
|
||||
tick_tx,
|
||||
watcher: Some(SchedulerWatcher::new(tick_rx)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_stop_handle(&mut self) -> Option<SchedulerStopHandle> {
|
||||
self.stop_handle.take()
|
||||
}
|
||||
|
||||
pub fn get_waiter(&mut self) -> Option<SchedulerWatcher> {
|
||||
self.watcher.take()
|
||||
}
|
||||
|
||||
pub async fn run(mut self) {
|
||||
log::trace!("Scheduler::run");
|
||||
while (timeout(self.interval, self.stop_rx.recv()).await).is_err() {
|
||||
log::trace!("Scheduler -> tick");
|
||||
if self.tick_tx.send(()).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user