diff --git a/src/main.rs b/src/main.rs index a8ed29d..69bc6b7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,10 @@ +use anyhow::Context; use clap::Parser; use parse_duration::parse as parse_duration; use std::{collections::HashMap, sync::Arc}; use tokio::{ sync::mpsc, - time::{timeout, Duration, Instant}, + time::{Duration, Instant}, }; mod logger; @@ -15,6 +16,9 @@ use docker::{ Docker, }; +mod scheduler; +use scheduler::{Scheduler, SchedulerStopHandle, SchedulerWatcher}; + #[derive(Parser, Debug)] #[clap(author, version, about, long_about = None)] #[clap(propagate_version = true)] @@ -36,30 +40,16 @@ struct Cli { pub log_level: LogLevel, } -async fn scheduler_task( - interval: Duration, - tx: mpsc::Sender<()>, - mut shutdown_rx: mpsc::Receiver<()>, -) { - log::debug!("scheduler_task -> start: {interval:?}"); - while (timeout(interval, shutdown_rx.recv()).await).is_err() { - log::trace!("scheduler_task -> tick"); - if tx.send(()).await.is_err() { - break; - } - } -} - async fn query_task( connection: Arc, tx: mpsc::Sender, - mut scheduler_rx: mpsc::Receiver<()>, + mut scheduler: SchedulerWatcher, label: String, ) { let query_options = get_query_options(label); log::debug!("query_task -> start recv"); - while scheduler_rx.recv().await.is_some() { + while scheduler.wait().await.is_some() { let containers = query_containers(&connection, query_options.clone()) .await .unwrap_or_default(); @@ -128,7 +118,7 @@ async fn restart_task(connection: Arc, mut rx: mpsc::Receiver>) { +fn shutdown_control(shutdown: Option) { let mut shutdown = shutdown; let res = ctrlc::set_handler(move || { log::info!("recieved Ctrl-C"); @@ -143,26 +133,23 @@ fn shutdown_control(shutdown: Option>) { #[tokio::main(flavor = "current_thread")] async fn main() -> anyhow::Result<()> { let cli = Cli::parse(); - let logger = create_logger(&cli.log_level)?; let connection = get_docker_connection().await?; let query_connection = connection.clone(); let restart_connection = connection.clone(); - let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1); - let (scheduler_tx, scheduler_rx) = mpsc::channel::<()>(1); + let mut scheduler = Scheduler::new(cli.interval); + let waiter = scheduler.get_waiter().context("get_waiter")?; let (query_tx, filter_rx) = mpsc::channel::(1); let (filter_tx, restart_rx) = mpsc::channel::(1); + let (unhealthy_timeout, label) = (cli.unhealthy_timeout, cli.label); - let (interval, unhealthy_timeout, label) = (cli.interval, cli.unhealthy_timeout, cli.label); - - shutdown_control(Some(shutdown_tx)); - + shutdown_control(scheduler.get_stop_handle()); tokio::try_join!( - tokio::spawn(scheduler_task(interval, scheduler_tx, shutdown_rx)), - tokio::spawn(query_task(query_connection, query_tx, scheduler_rx, label)), + tokio::spawn(scheduler.run()), + tokio::spawn(query_task(query_connection, query_tx, waiter, label)), tokio::spawn(filter_task(unhealthy_timeout, filter_rx, filter_tx)), tokio::spawn(restart_task(restart_connection, restart_rx)) )?; diff --git a/src/scheduler.rs b/src/scheduler.rs new file mode 100644 index 0000000..fe1760f --- /dev/null +++ b/src/scheduler.rs @@ -0,0 +1,71 @@ +use tokio::{ + sync::mpsc, + time::{timeout, Duration}, +}; + +pub struct SchedulerStopHandle { + _sender: Option>, +} + +impl SchedulerStopHandle { + fn new(sender: mpsc::Sender<()>) -> Self { + Self { + _sender: Some(sender), + } + } +} + +pub struct SchedulerWatcher { + reciever: mpsc::Receiver<()>, +} + +impl SchedulerWatcher { + fn new(reciever: mpsc::Receiver<()>) -> Self { + Self { reciever } + } + pub async fn wait(&mut self) -> Option<()> { + self.reciever.recv().await + } +} + +pub struct Scheduler { + interval: Duration, + stop_handle: Option, + stop_rx: mpsc::Receiver<()>, + tick_tx: mpsc::Sender<()>, + watcher: Option, +} + +impl Scheduler { + pub fn new(interval: Duration) -> Self { + log::trace!("Scheduler::new {interval:?}"); + let (stop_tx, stop_rx) = mpsc::channel::<()>(1); + let (tick_tx, tick_rx) = mpsc::channel::<()>(1); + + Self { + interval, + stop_handle: Some(SchedulerStopHandle::new(stop_tx)), + stop_rx, + tick_tx, + watcher: Some(SchedulerWatcher::new(tick_rx)), + } + } + + pub fn get_stop_handle(&mut self) -> Option { + self.stop_handle.take() + } + + pub fn get_waiter(&mut self) -> Option { + self.watcher.take() + } + + pub async fn run(mut self) { + log::trace!("Scheduler::run"); + while (timeout(self.interval, self.stop_rx.recv()).await).is_err() { + log::trace!("Scheduler -> tick"); + if self.tick_tx.send(()).await.is_err() { + break; + } + } + } +}