+ scheduler_task
All checks were successful
Docker Image CI / test (push) Successful in 2m55s
Docker Image CI / push (push) Successful in 2m56s

This commit is contained in:
Dmitry Belyaev 2024-09-14 15:47:33 +03:00
parent 28748b547e
commit a348550630
Signed by: b4tman
GPG Key ID: 41A00BF15EA7E5F3

View File

@ -1,21 +1,14 @@
use anyhow::Context; use anyhow::Context;
use bollard::container::ListContainersOptions; use bollard::container::ListContainersOptions;
use bollard::Docker; use bollard::Docker;
use clap::Parser;
use std::cmp::min; use parse_duration::parse as parse_duration;
use tokio::sync::mpsc;
use std::collections::HashMap; use std::collections::HashMap;
use std::default::Default; use std::default::Default;
use tokio::time::{timeout, Instant};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::sync::mpsc;
use clap::Parser; use tokio::time::{timeout, Instant};
use parse_duration::parse as parse_duration;
mod logger; mod logger;
use logger::{create_logger, LogLevel}; use logger::{create_logger, LogLevel};
@ -76,28 +69,34 @@ async fn restart_container(connection: &Docker, container_name: &str) -> anyhow:
type Containers = Vec<String>; type Containers = Vec<String>;
async fn scheduler_task(
interval: Duration,
tx: mpsc::Sender<()>,
mut shutdown_rx: mpsc::Receiver<()>,
) {
log::debug!("scheduler_task -> start: {interval:?}");
while (timeout(interval, shutdown_rx.recv()).await).is_err() {
log::trace!("scheduler_task -> tick");
if tx.send(()).await.is_err() {
break;
}
}
}
async fn query_task( async fn query_task(
connection: Arc<Docker>, connection: Arc<Docker>,
interval: Duration,
tx: mpsc::Sender<Containers>, tx: mpsc::Sender<Containers>,
mut shutdown_rx: mpsc::Receiver<()>, mut scheduler_rx: mpsc::Receiver<()>,
args: Cli, args: Cli,
) { ) {
let query_options = get_query_options(&args); let query_options = get_query_options(&args);
let max_query_time = interval - Duration::from_millis(1);
let mut query_time = Duration::new(0, 0);
log::debug!("query_task -> start recv"); log::debug!("query_task -> start recv");
while (timeout(interval - query_time, shutdown_rx.recv()).await).is_err() { while scheduler_rx.recv().await.is_some() {
let start = Instant::now();
let containers = query_containers(&connection, query_options.clone()) let containers = query_containers(&connection, query_options.clone())
.await .await
.unwrap_or_default(); .unwrap_or_default();
let end = Instant::now();
query_time = min(end - start, max_query_time);
let res = tx.send(containers).await; let res = tx.send(containers).await;
if res.is_err() { if res.is_err() {
break; break;
@ -191,6 +190,7 @@ async fn main() -> anyhow::Result<()> {
let restart_connection = connection.clone(); let restart_connection = connection.clone();
let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1); let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
let (scheduler_tx, scheduler_rx) = mpsc::channel::<()>(1);
let (query_tx, filter_rx) = mpsc::channel::<Containers>(1); let (query_tx, filter_rx) = mpsc::channel::<Containers>(1);
let (filter_tx, restart_rx) = mpsc::channel::<Containers>(1); let (filter_tx, restart_rx) = mpsc::channel::<Containers>(1);
@ -201,13 +201,8 @@ async fn main() -> anyhow::Result<()> {
shutdown_control(Some(shutdown_tx)); shutdown_control(Some(shutdown_tx));
tokio::try_join!( tokio::try_join!(
tokio::spawn(query_task( tokio::spawn(scheduler_task(interval, scheduler_tx, shutdown_rx)),
query_connection, tokio::spawn(query_task(query_connection, query_tx, scheduler_rx, cli)),
interval,
query_tx,
shutdown_rx,
cli
)),
tokio::spawn(filter_task(unhealthy_timeout, filter_rx, filter_tx)), tokio::spawn(filter_task(unhealthy_timeout, filter_rx, filter_tx)),
tokio::spawn(restart_task(restart_connection, restart_rx)) tokio::spawn(restart_task(restart_connection, restart_rx))
)?; )?;