From f6d99b224e081fffad5a8ba049e3278941051b23 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Thu, 12 Sep 2024 16:24:55 +0300 Subject: [PATCH] checkpoint --- Cargo.lock | 1 + Cargo.toml | 1 + src/main.rs | 165 +++++++++++++++++++++++++++++++++++++++++++++------- 3 files changed, 145 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1e17800..0922e8f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -195,6 +195,7 @@ dependencies = [ "anyhow", "bollard", "futures-util", + "serde", "tokio", "tokio-macros", ] diff --git a/Cargo.toml b/Cargo.toml index 08821e0..25e53ca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,5 +7,6 @@ edition = "2021" anyhow = "1.0.88" bollard = { version = "0.17.1", features = ["ssl", "json_data_content", "tokio-stream"] } futures-util = "0.3.30" +serde = "1.0.210" tokio = {version = "1.40.0", features = ["full"]} tokio-macros = "2.4.0" diff --git a/src/main.rs b/src/main.rs index 4ca525c..222cb29 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,41 +1,162 @@ -use bollard::{API_DEFAULT_VERSION, Docker}; -use futures_util::future::TryFutureExt; use bollard::container::ListContainersOptions; +use bollard::Docker; + +use std::cmp::min; +use tokio::sync::mpsc; use std::collections::HashMap; use std::default::Default; -#[tokio::main] -async fn main() -> Result<(), Box> { - let connection = Docker::connect_with_defaults().expect("connection"); - +use tokio::time::{timeout, Instant}; + +use std::sync::Arc; +use std::time::Duration; + +fn get_query_options() -> ListContainersOptions<&'static str> { let mut filters = HashMap::new(); - filters.insert("status", vec!["running"]); + //filters.insert("status", vec!["running"]); filters.insert("label", vec!["auto-restart.unhealthy"]); filters.insert("health", vec!["unhealthy"]); - let options = Some(ListContainersOptions{ - all: true, + ListContainersOptions { filters, ..Default::default() - }); + } +} - let result = connection.list_containers(options).await; - - //let result = connection.ping().map_ok(|_| Ok::<_, ()>(println!("Connected!"))).await; +async fn query_containers( + connection: &Docker, + query_options: ListContainersOptions, +) -> anyhow::Result> +where + T: std::cmp::Eq + std::hash::Hash + serde::ser::Serialize, + std::string::String: From, +{ + Ok(connection + .list_containers(Some(query_options)) + .await? + .into_iter() + .map(|container| container.id.unwrap()) + .collect()) +} - if let Err(x) = result { - eprintln!("error: {:#?}", x); - } else if let Ok(found) = result { - println!("found: {}", found.len()); - for container in found { - let container_id = container.id.unwrap(); - println!("{:?}", container.names.unwrap_or(vec![container_id.clone()])); +async fn restart_container(connection: &Docker, container_name: &str) -> anyhow::Result<()> { + connection.restart_container(container_name, None).await?; + Ok(()) +} - //let r = connection.restart_container(container_id.as_str(), None).await; - //println!("{:?}", r); +type Containers = Vec; + +async fn query_task( + connection: Arc, + interval: Duration, + tx: mpsc::Sender, + mut shutdown_rx: mpsc::Receiver<()>, +) { + let query_options = get_query_options(); + let mut query_time = Duration::new(0, 0); + println!("start recv"); + while (timeout(interval - query_time, shutdown_rx.recv()).await).is_err() { + let start = Instant::now(); + + let containers = query_containers(&connection, query_options.clone()) + .await + .unwrap_or_default(); + + let end = Instant::now(); + query_time = min(end - start, interval - Duration::from_millis(1)); + + let res = tx.send(containers).await; + if res.is_err() { + break; } } +} + +async fn filter_task( + unhealthy_timeout: Duration, + mut in_rx: mpsc::Receiver, + out_tx: mpsc::Sender, +) { + let mut unhealthy_time: Option> = Some(HashMap::new()); + while let Some(containers) = in_rx.recv().await { + let now = Instant::now(); + + println!("filter -> found unhealthy: {}", containers.len()); + + let prev_times = unhealthy_time.take().unwrap(); + let mut new_times: HashMap = prev_times + .into_iter() + .filter(|(k, _)| containers.contains(k)) + .collect(); + + for container_id in containers { + new_times.entry(container_id).or_insert_with(|| now); + } + + let containers: Vec = new_times + .iter() + .filter(|(_, &time)| (now - time) > unhealthy_timeout) + .map(|(id, _)| id.clone()) + .collect(); + + let _ = unhealthy_time.replace(new_times); + + println!("filter -> filtered unhealthy: {}", containers.len()); + + if containers.is_empty() { + continue; + } + + let res = out_tx.send(containers).await; + if res.is_err() { + break; + } + } +} + +async fn log_task(mut rx: mpsc::Receiver) { + println!("log start"); + while let Some(containers) = rx.recv().await { + println!("log -> found: {}", containers.len()); + for container_id in containers { + println!("log -> container: {}", container_id); + } + println!("") + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let connection = Arc::new(Docker::connect_with_defaults()?); + + let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1); + + let (query_tx, filter_rx) = mpsc::channel::(1); + let (filter_tx, log_rx) = mpsc::channel::(1); + + let interval = Duration::from_secs(10); + let unhealthy_timeout = Duration::from_secs(35); + + tokio::try_join!( + tokio::spawn(query_task( + connection.clone(), + interval, + query_tx, + shutdown_rx + )), + tokio::spawn(filter_task(unhealthy_timeout, filter_rx, filter_tx)), + tokio::spawn(log_task(log_rx)), + tokio::spawn(async { + println!("shutdown -> sleep"); + tokio::time::sleep(Duration::from_secs(80)).await; + println!("shutdown -> drop"); + drop(shutdown_tx); + }) + )?; + + //let restart = restart_container(&connection, container_id.as_str()).await; + //println!("restart: {}", restart.is_ok()); Ok(()) }