From a34855063075ea41f74fc0be88a4f0272ab7bf57 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Sat, 14 Sep 2024 15:47:33 +0300 Subject: [PATCH] + scheduler_task --- src/main.rs | 51 +++++++++++++++++++++++---------------------------- 1 file changed, 23 insertions(+), 28 deletions(-) diff --git a/src/main.rs b/src/main.rs index 6073fe5..cc1eddf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,21 +1,14 @@ use anyhow::Context; use bollard::container::ListContainersOptions; use bollard::Docker; - -use std::cmp::min; -use tokio::sync::mpsc; - +use clap::Parser; +use parse_duration::parse as parse_duration; use std::collections::HashMap; use std::default::Default; - -use tokio::time::{timeout, Instant}; - use std::sync::Arc; use std::time::Duration; - -use clap::Parser; - -use parse_duration::parse as parse_duration; +use tokio::sync::mpsc; +use tokio::time::{timeout, Instant}; mod logger; use logger::{create_logger, LogLevel}; @@ -76,28 +69,34 @@ async fn restart_container(connection: &Docker, container_name: &str) -> anyhow: type Containers = Vec; +async fn scheduler_task( + interval: Duration, + tx: mpsc::Sender<()>, + mut shutdown_rx: mpsc::Receiver<()>, +) { + log::debug!("scheduler_task -> start: {interval:?}"); + while (timeout(interval, shutdown_rx.recv()).await).is_err() { + log::trace!("scheduler_task -> tick"); + if tx.send(()).await.is_err() { + break; + } + } +} + async fn query_task( connection: Arc, - interval: Duration, tx: mpsc::Sender, - mut shutdown_rx: mpsc::Receiver<()>, + mut scheduler_rx: mpsc::Receiver<()>, args: Cli, ) { let query_options = get_query_options(&args); - let max_query_time = interval - Duration::from_millis(1); - let mut query_time = Duration::new(0, 0); log::debug!("query_task -> start recv"); - while (timeout(interval - query_time, shutdown_rx.recv()).await).is_err() { - let start = Instant::now(); - + while scheduler_rx.recv().await.is_some() { let containers = query_containers(&connection, query_options.clone()) .await .unwrap_or_default(); - let end = Instant::now(); - query_time = min(end - start, max_query_time); - let res = tx.send(containers).await; if res.is_err() { break; @@ -191,6 +190,7 @@ async fn main() -> anyhow::Result<()> { let restart_connection = connection.clone(); let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1); + let (scheduler_tx, scheduler_rx) = mpsc::channel::<()>(1); let (query_tx, filter_rx) = mpsc::channel::(1); let (filter_tx, restart_rx) = mpsc::channel::(1); @@ -201,13 +201,8 @@ async fn main() -> anyhow::Result<()> { shutdown_control(Some(shutdown_tx)); tokio::try_join!( - tokio::spawn(query_task( - query_connection, - interval, - query_tx, - shutdown_rx, - cli - )), + tokio::spawn(scheduler_task(interval, scheduler_tx, shutdown_rx)), + tokio::spawn(query_task(query_connection, query_tx, scheduler_rx, cli)), tokio::spawn(filter_task(unhealthy_timeout, filter_rx, filter_tx)), tokio::spawn(restart_task(restart_connection, restart_rx)) )?;