doctor-restart/src/main.rs

191 lines
5.4 KiB
Rust
Raw Normal View History

2024-09-12 16:24:20 +00:00
use anyhow::Context;
2024-09-12 10:35:01 +00:00
use bollard::container::ListContainersOptions;
2024-09-12 13:24:55 +00:00
use bollard::Docker;
use std::cmp::min;
use tokio::sync::mpsc;
2024-09-12 10:35:01 +00:00
use std::collections::HashMap;
use std::default::Default;
2024-09-12 13:24:55 +00:00
use tokio::time::{timeout, Instant};
2024-09-12 16:24:20 +00:00
use flexi_logger::{AdaptiveFormat, Logger, LoggerHandle};
2024-09-12 13:24:55 +00:00
use std::sync::Arc;
use std::time::Duration;
fn get_query_options() -> ListContainersOptions<&'static str> {
2024-09-12 10:35:01 +00:00
let mut filters = HashMap::new();
2024-09-12 13:24:55 +00:00
//filters.insert("status", vec!["running"]);
2024-09-12 10:35:01 +00:00
filters.insert("label", vec!["auto-restart.unhealthy"]);
filters.insert("health", vec!["unhealthy"]);
2024-09-12 13:24:55 +00:00
ListContainersOptions {
2024-09-12 10:35:01 +00:00
filters,
..Default::default()
2024-09-12 13:24:55 +00:00
}
}
2024-09-12 16:24:20 +00:00
/// Configures and starts the logger with the `info` level spec, writing to
/// stdout with the detailed format, and installs the `log_panics` hook.
///
/// # Errors
///
/// Fails when the level specification cannot be parsed or when the logger
/// cannot be started.
fn create_logger() -> anyhow::Result<LoggerHandle> {
    let builder = Logger::try_with_str("info").context("default logging level invalid")?;

    let started = builder
        .format(flexi_logger::detailed_format)
        .adaptive_format_for_stdout(AdaptiveFormat::Detailed)
        .log_to_stdout()
        .start()
        .context("can't start logger");

    // The panic hook is installed regardless of whether the logger started;
    // the start result is handed back to the caller afterwards.
    log_panics::init();

    started
}
2024-09-12 13:24:55 +00:00
/// Lists the containers matching `query_options` and returns their IDs.
///
/// Container summaries that carry no ID are skipped instead of aborting the
/// task with a panic, since the ID is an optional field of the response.
///
/// # Errors
///
/// Returns the error reported by the Docker connection when the list request
/// fails.
async fn query_containers<T>(
    connection: &Docker,
    query_options: ListContainersOptions<T>,
) -> anyhow::Result<Vec<String>>
where
    T: std::cmp::Eq + std::hash::Hash + serde::ser::Serialize,
    std::string::String: From<T>,
{
    Ok(connection
        .list_containers(Some(query_options))
        .await?
        .into_iter()
        // `id` is an `Option`; an entry without one cannot be restarted anyway.
        .filter_map(|container| container.id)
        .collect())
}
async fn restart_container(connection: &Docker, container_name: &str) -> anyhow::Result<()> {
connection.restart_container(container_name, None).await?;
Ok(())
}
/// A batch of container IDs passed between the query, filter and restart tasks.
type Containers = Vec<String>;
async fn query_task(
connection: Arc<Docker>,
interval: Duration,
tx: mpsc::Sender<Containers>,
mut shutdown_rx: mpsc::Receiver<()>,
) {
let query_options = get_query_options();
let mut query_time = Duration::new(0, 0);
2024-09-12 16:24:20 +00:00
log::debug!("query_task -> start recv");
2024-09-12 13:24:55 +00:00
while (timeout(interval - query_time, shutdown_rx.recv()).await).is_err() {
let start = Instant::now();
let containers = query_containers(&connection, query_options.clone())
.await
.unwrap_or_default();
let end = Instant::now();
query_time = min(end - start, interval - Duration::from_millis(1));
let res = tx.send(containers).await;
if res.is_err() {
break;
}
}
}
async fn filter_task(
unhealthy_timeout: Duration,
mut in_rx: mpsc::Receiver<Containers>,
out_tx: mpsc::Sender<Containers>,
) {
let mut unhealthy_time: Option<HashMap<String, Instant>> = Some(HashMap::new());
while let Some(containers) = in_rx.recv().await {
let now = Instant::now();
2024-09-12 16:24:20 +00:00
log::info!("filter -> found unhealthy: {}", containers.len());
2024-09-12 13:24:55 +00:00
let prev_times = unhealthy_time.take().unwrap();
let mut new_times: HashMap<String, Instant> = prev_times
.into_iter()
.filter(|(k, _)| containers.contains(k))
.collect();
for container_id in containers {
new_times.entry(container_id).or_insert_with(|| now);
}
let containers: Vec<String> = new_times
.iter()
.filter(|(_, &time)| (now - time) > unhealthy_timeout)
.map(|(id, _)| id.clone())
.collect();
let _ = unhealthy_time.replace(new_times);
2024-09-12 16:24:20 +00:00
log::info!("filter -> filtered unhealthy: {}", containers.len());
2024-09-12 13:24:55 +00:00
if containers.is_empty() {
continue;
}
let res = out_tx.send(containers).await;
if res.is_err() {
break;
2024-09-12 10:35:01 +00:00
}
}
2024-09-12 13:24:55 +00:00
}
2024-09-12 13:55:40 +00:00
async fn restart_task(connection: Arc<Docker>, mut rx: mpsc::Receiver<Containers>) {
2024-09-12 16:24:20 +00:00
log::debug!("restart task start");
2024-09-12 13:24:55 +00:00
while let Some(containers) = rx.recv().await {
2024-09-12 16:24:20 +00:00
log::info!("restart -> found: {}", containers.len());
2024-09-12 13:24:55 +00:00
for container_id in containers {
2024-09-12 16:24:20 +00:00
log::warn!("restart -> container: {}...", &container_id);
2024-09-12 13:55:40 +00:00
let res = restart_container(&connection, container_id.as_str()).await;
match res {
2024-09-12 16:24:20 +00:00
Ok(_) => log::info!("ok\n"),
Err(e) => log::error!("error: \n{e:?}\n"),
2024-09-12 13:55:40 +00:00
}
2024-09-12 13:24:55 +00:00
}
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
2024-09-12 16:24:20 +00:00
let logger = create_logger()?;
2024-09-12 13:24:55 +00:00
let connection = Arc::new(Docker::connect_with_defaults()?);
2024-09-12 16:24:20 +00:00
let _ = connection
.as_ref()
.ping()
.await
.context("ping on docker connection")?;
2024-09-12 13:55:40 +00:00
let query_connection = connection.clone();
let restart_connection = connection.clone();
2024-09-12 13:24:55 +00:00
let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
let (query_tx, filter_rx) = mpsc::channel::<Containers>(1);
2024-09-12 13:55:40 +00:00
let (filter_tx, restart_rx) = mpsc::channel::<Containers>(1);
2024-09-12 13:24:55 +00:00
let interval = Duration::from_secs(10);
let unhealthy_timeout = Duration::from_secs(35);
tokio::try_join!(
tokio::spawn(query_task(
2024-09-12 13:55:40 +00:00
query_connection,
2024-09-12 13:24:55 +00:00
interval,
query_tx,
shutdown_rx
)),
tokio::spawn(filter_task(unhealthy_timeout, filter_rx, filter_tx)),
2024-09-12 13:55:40 +00:00
tokio::spawn(restart_task(restart_connection, restart_rx)),
2024-09-12 13:24:55 +00:00
tokio::spawn(async {
2024-09-12 16:24:20 +00:00
log::debug!("shutdown -> sleep");
2024-09-12 13:24:55 +00:00
tokio::time::sleep(Duration::from_secs(80)).await;
2024-09-12 16:24:20 +00:00
log::warn!("shutdown -> drop");
2024-09-12 13:24:55 +00:00
drop(shutdown_tx);
})
)?;
2024-09-12 16:24:20 +00:00
drop(logger);
2024-09-12 10:35:01 +00:00
Ok(())
}