+ mod docker
All checks were successful
Docker Image CI / test (push) Successful in 2m45s
Docker Image CI / push (push) Successful in 2m49s

This commit is contained in:
Dmitry Belyaev 2024-09-14 16:30:17 +03:00
parent a348550630
commit 8f7f7b88ac
Signed by: b4tman
GPG Key ID: 41A00BF15EA7E5F3
2 changed files with 65 additions and 56 deletions

49
src/docker.rs Normal file
View File

@ -0,0 +1,49 @@
use anyhow::{Context, Result};
use bollard::container::ListContainersOptions;
use std::{collections::HashMap, sync::Arc};
pub type Docker = bollard::Docker;
pub type Containers = Vec<String>;
pub fn get_query_options(label: String) -> ListContainersOptions<String> {
let mut filters = HashMap::new();
//filters.insert("status", vec!["running"]);
filters.insert("label".into(), vec![label]);
filters.insert("health".into(), vec!["unhealthy".into()]);
ListContainersOptions {
filters,
..Default::default()
}
}
pub async fn query_containers<T>(
connection: &Docker,
query_options: ListContainersOptions<T>,
) -> anyhow::Result<Vec<String>>
where
T: std::cmp::Eq + std::hash::Hash + serde::ser::Serialize,
std::string::String: From<T>,
{
Ok(connection
.list_containers(Some(query_options))
.await?
.into_iter()
.map(|container| container.id.unwrap())
.collect())
}
pub async fn restart_container(connection: &Docker, container_name: &str) -> anyhow::Result<()> {
connection.restart_container(container_name, None).await?;
Ok(())
}
pub async fn get_docker_connection() -> Result<Arc<Docker>> {
let connection = Arc::new(Docker::connect_with_defaults()?);
let _ = connection
.as_ref()
.ping()
.await
.context("ping on docker connection")?;
Ok(connection)
}

View File

@ -1,18 +1,20 @@
use anyhow::Context;
use bollard::container::ListContainersOptions;
use bollard::Docker;
use clap::Parser; use clap::Parser;
use parse_duration::parse as parse_duration; use parse_duration::parse as parse_duration;
use std::collections::HashMap; use std::{collections::HashMap, sync::Arc};
use std::default::Default; use tokio::{
use std::sync::Arc; sync::mpsc,
use std::time::Duration; time::{timeout, Duration, Instant},
use tokio::sync::mpsc; };
use tokio::time::{timeout, Instant};
mod logger; mod logger;
use logger::{create_logger, LogLevel}; use logger::{create_logger, LogLevel};
mod docker;
use docker::{
get_docker_connection, get_query_options, query_containers, restart_container, Containers,
Docker,
};
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)] #[clap(author, version, about, long_about = None)]
#[clap(propagate_version = true)] #[clap(propagate_version = true)]
@ -34,41 +36,6 @@ struct Cli {
pub log_level: LogLevel, pub log_level: LogLevel,
} }
fn get_query_options(args: &Cli) -> ListContainersOptions<String> {
let mut filters = HashMap::new();
//filters.insert("status", vec!["running"]);
filters.insert("label".into(), vec![args.label.clone()]);
filters.insert("health".into(), vec!["unhealthy".into()]);
ListContainersOptions {
filters,
..Default::default()
}
}
async fn query_containers<T>(
connection: &Docker,
query_options: ListContainersOptions<T>,
) -> anyhow::Result<Vec<String>>
where
T: std::cmp::Eq + std::hash::Hash + serde::ser::Serialize,
std::string::String: From<T>,
{
Ok(connection
.list_containers(Some(query_options))
.await?
.into_iter()
.map(|container| container.id.unwrap())
.collect())
}
async fn restart_container(connection: &Docker, container_name: &str) -> anyhow::Result<()> {
connection.restart_container(container_name, None).await?;
Ok(())
}
type Containers = Vec<String>;
async fn scheduler_task( async fn scheduler_task(
interval: Duration, interval: Duration,
tx: mpsc::Sender<()>, tx: mpsc::Sender<()>,
@ -87,9 +54,9 @@ async fn query_task(
connection: Arc<Docker>, connection: Arc<Docker>,
tx: mpsc::Sender<Containers>, tx: mpsc::Sender<Containers>,
mut scheduler_rx: mpsc::Receiver<()>, mut scheduler_rx: mpsc::Receiver<()>,
args: Cli, label: String,
) { ) {
let query_options = get_query_options(&args); let query_options = get_query_options(label);
log::debug!("query_task -> start recv"); log::debug!("query_task -> start recv");
while scheduler_rx.recv().await.is_some() { while scheduler_rx.recv().await.is_some() {
@ -179,13 +146,7 @@ async fn main() -> anyhow::Result<()> {
let logger = create_logger(&cli.log_level)?; let logger = create_logger(&cli.log_level)?;
let connection = Arc::new(Docker::connect_with_defaults()?); let connection = get_docker_connection().await?;
let _ = connection
.as_ref()
.ping()
.await
.context("ping on docker connection")?;
let query_connection = connection.clone(); let query_connection = connection.clone();
let restart_connection = connection.clone(); let restart_connection = connection.clone();
@ -195,14 +156,13 @@ async fn main() -> anyhow::Result<()> {
let (query_tx, filter_rx) = mpsc::channel::<Containers>(1); let (query_tx, filter_rx) = mpsc::channel::<Containers>(1);
let (filter_tx, restart_rx) = mpsc::channel::<Containers>(1); let (filter_tx, restart_rx) = mpsc::channel::<Containers>(1);
let interval = cli.interval; let (interval, unhealthy_timeout, label) = (cli.interval, cli.unhealthy_timeout, cli.label);
let unhealthy_timeout = cli.unhealthy_timeout;
shutdown_control(Some(shutdown_tx)); shutdown_control(Some(shutdown_tx));
tokio::try_join!( tokio::try_join!(
tokio::spawn(scheduler_task(interval, scheduler_tx, shutdown_rx)), tokio::spawn(scheduler_task(interval, scheduler_tx, shutdown_rx)),
tokio::spawn(query_task(query_connection, query_tx, scheduler_rx, cli)), tokio::spawn(query_task(query_connection, query_tx, scheduler_rx, label)),
tokio::spawn(filter_task(unhealthy_timeout, filter_rx, filter_tx)), tokio::spawn(filter_task(unhealthy_timeout, filter_rx, filter_tx)),
tokio::spawn(restart_task(restart_connection, restart_rx)) tokio::spawn(restart_task(restart_connection, restart_rx))
)?; )?;