use bollard::container::ListContainersOptions; use bollard::Docker; use std::cmp::min; use tokio::sync::mpsc; use std::collections::HashMap; use std::default::Default; use tokio::time::{timeout, Instant}; use std::sync::Arc; use std::time::Duration; fn get_query_options() -> ListContainersOptions<&'static str> { let mut filters = HashMap::new(); //filters.insert("status", vec!["running"]); filters.insert("label", vec!["auto-restart.unhealthy"]); filters.insert("health", vec!["unhealthy"]); ListContainersOptions { filters, ..Default::default() } } async fn query_containers( connection: &Docker, query_options: ListContainersOptions, ) -> anyhow::Result> where T: std::cmp::Eq + std::hash::Hash + serde::ser::Serialize, std::string::String: From, { Ok(connection .list_containers(Some(query_options)) .await? .into_iter() .map(|container| container.id.unwrap()) .collect()) } async fn restart_container(connection: &Docker, container_name: &str) -> anyhow::Result<()> { connection.restart_container(container_name, None).await?; Ok(()) } type Containers = Vec; async fn query_task( connection: Arc, interval: Duration, tx: mpsc::Sender, mut shutdown_rx: mpsc::Receiver<()>, ) { let query_options = get_query_options(); let mut query_time = Duration::new(0, 0); println!("start recv"); while (timeout(interval - query_time, shutdown_rx.recv()).await).is_err() { let start = Instant::now(); let containers = query_containers(&connection, query_options.clone()) .await .unwrap_or_default(); let end = Instant::now(); query_time = min(end - start, interval - Duration::from_millis(1)); let res = tx.send(containers).await; if res.is_err() { break; } } } async fn filter_task( unhealthy_timeout: Duration, mut in_rx: mpsc::Receiver, out_tx: mpsc::Sender, ) { let mut unhealthy_time: Option> = Some(HashMap::new()); while let Some(containers) = in_rx.recv().await { let now = Instant::now(); println!("filter -> found unhealthy: {}", containers.len()); let prev_times = unhealthy_time.take().unwrap(); let mut new_times: HashMap = prev_times .into_iter() .filter(|(k, _)| containers.contains(k)) .collect(); for container_id in containers { new_times.entry(container_id).or_insert_with(|| now); } let containers: Vec = new_times .iter() .filter(|(_, &time)| (now - time) > unhealthy_timeout) .map(|(id, _)| id.clone()) .collect(); let _ = unhealthy_time.replace(new_times); println!("filter -> filtered unhealthy: {}", containers.len()); if containers.is_empty() { continue; } let res = out_tx.send(containers).await; if res.is_err() { break; } } } async fn restart_task(connection: Arc, mut rx: mpsc::Receiver) { println!("restart task start"); while let Some(containers) = rx.recv().await { println!("restart -> found: {}", containers.len()); for container_id in containers { print!("restart -> container: {}...", &container_id); let res = restart_container(&connection, container_id.as_str()).await; match res { Ok(_) => println!("ok\n"), Err(e) => println!("error: \n{e:?}\n") } } } } #[tokio::main] async fn main() -> anyhow::Result<()> { let connection = Arc::new(Docker::connect_with_defaults()?); let query_connection = connection.clone(); let restart_connection = connection.clone(); let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1); let (query_tx, filter_rx) = mpsc::channel::(1); let (filter_tx, restart_rx) = mpsc::channel::(1); let interval = Duration::from_secs(10); let unhealthy_timeout = Duration::from_secs(35); tokio::try_join!( tokio::spawn(query_task( query_connection, interval, query_tx, shutdown_rx )), tokio::spawn(filter_task(unhealthy_timeout, filter_rx, filter_tx)), tokio::spawn(restart_task(restart_connection, restart_rx)), tokio::spawn(async { println!("shutdown -> sleep"); tokio::time::sleep(Duration::from_secs(80)).await; println!("shutdown -> drop"); drop(shutdown_tx); }) )?; //let restart = restart_container(&connection, container_id.as_str()).await; //println!("restart: {}", restart.is_ok()); Ok(()) }