From 8f7f7b88ac75fc61bd7d67f4c695b952360509a0 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Sat, 14 Sep 2024 16:30:17 +0300 Subject: [PATCH] + mod docker --- src/docker.rs | 49 +++++++++++++++++++++++++++++++++++ src/main.rs | 72 ++++++++++++--------------------------------------- 2 files changed, 65 insertions(+), 56 deletions(-) create mode 100644 src/docker.rs diff --git a/src/docker.rs b/src/docker.rs new file mode 100644 index 0000000..b7568d6 --- /dev/null +++ b/src/docker.rs @@ -0,0 +1,49 @@ +use anyhow::{Context, Result}; +use bollard::container::ListContainersOptions; +use std::{collections::HashMap, sync::Arc}; + +pub type Docker = bollard::Docker; +pub type Containers = Vec; + +pub fn get_query_options(label: String) -> ListContainersOptions { + let mut filters = HashMap::new(); + //filters.insert("status", vec!["running"]); + filters.insert("label".into(), vec![label]); + filters.insert("health".into(), vec!["unhealthy".into()]); + + ListContainersOptions { + filters, + ..Default::default() + } +} + +pub async fn query_containers( + connection: &Docker, + query_options: ListContainersOptions, +) -> anyhow::Result> +where + T: std::cmp::Eq + std::hash::Hash + serde::ser::Serialize, + std::string::String: From, +{ + Ok(connection + .list_containers(Some(query_options)) + .await? + .into_iter() + .map(|container| container.id.unwrap()) + .collect()) +} + +pub async fn restart_container(connection: &Docker, container_name: &str) -> anyhow::Result<()> { + connection.restart_container(container_name, None).await?; + Ok(()) +} + +pub async fn get_docker_connection() -> Result> { + let connection = Arc::new(Docker::connect_with_defaults()?); + let _ = connection + .as_ref() + .ping() + .await + .context("ping on docker connection")?; + Ok(connection) +} diff --git a/src/main.rs b/src/main.rs index cc1eddf..a8ed29d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,18 +1,20 @@ -use anyhow::Context; -use bollard::container::ListContainersOptions; -use bollard::Docker; use clap::Parser; use parse_duration::parse as parse_duration; -use std::collections::HashMap; -use std::default::Default; -use std::sync::Arc; -use std::time::Duration; -use tokio::sync::mpsc; -use tokio::time::{timeout, Instant}; +use std::{collections::HashMap, sync::Arc}; +use tokio::{ + sync::mpsc, + time::{timeout, Duration, Instant}, +}; mod logger; use logger::{create_logger, LogLevel}; +mod docker; +use docker::{ + get_docker_connection, get_query_options, query_containers, restart_container, Containers, + Docker, +}; + #[derive(Parser, Debug)] #[clap(author, version, about, long_about = None)] #[clap(propagate_version = true)] @@ -34,41 +36,6 @@ struct Cli { pub log_level: LogLevel, } -fn get_query_options(args: &Cli) -> ListContainersOptions { - let mut filters = HashMap::new(); - //filters.insert("status", vec!["running"]); - filters.insert("label".into(), vec![args.label.clone()]); - filters.insert("health".into(), vec!["unhealthy".into()]); - - ListContainersOptions { - filters, - ..Default::default() - } -} - -async fn query_containers( - connection: &Docker, - query_options: ListContainersOptions, -) -> anyhow::Result> -where - T: std::cmp::Eq + std::hash::Hash + serde::ser::Serialize, - std::string::String: From, -{ - Ok(connection - .list_containers(Some(query_options)) - .await? - .into_iter() - .map(|container| container.id.unwrap()) - .collect()) -} - -async fn restart_container(connection: &Docker, container_name: &str) -> anyhow::Result<()> { - connection.restart_container(container_name, None).await?; - Ok(()) -} - -type Containers = Vec; - async fn scheduler_task( interval: Duration, tx: mpsc::Sender<()>, @@ -87,9 +54,9 @@ async fn query_task( connection: Arc, tx: mpsc::Sender, mut scheduler_rx: mpsc::Receiver<()>, - args: Cli, + label: String, ) { - let query_options = get_query_options(&args); + let query_options = get_query_options(label); log::debug!("query_task -> start recv"); while scheduler_rx.recv().await.is_some() { @@ -179,13 +146,7 @@ async fn main() -> anyhow::Result<()> { let logger = create_logger(&cli.log_level)?; - let connection = Arc::new(Docker::connect_with_defaults()?); - let _ = connection - .as_ref() - .ping() - .await - .context("ping on docker connection")?; - + let connection = get_docker_connection().await?; let query_connection = connection.clone(); let restart_connection = connection.clone(); @@ -195,14 +156,13 @@ async fn main() -> anyhow::Result<()> { let (query_tx, filter_rx) = mpsc::channel::(1); let (filter_tx, restart_rx) = mpsc::channel::(1); - let interval = cli.interval; - let unhealthy_timeout = cli.unhealthy_timeout; + let (interval, unhealthy_timeout, label) = (cli.interval, cli.unhealthy_timeout, cli.label); shutdown_control(Some(shutdown_tx)); tokio::try_join!( tokio::spawn(scheduler_task(interval, scheduler_tx, shutdown_rx)), - tokio::spawn(query_task(query_connection, query_tx, scheduler_rx, cli)), + tokio::spawn(query_task(query_connection, query_tx, scheduler_rx, label)), tokio::spawn(filter_task(unhealthy_timeout, filter_rx, filter_tx)), tokio::spawn(restart_task(restart_connection, restart_rx)) )?;