Compare commits
3 Commits
28748b547e
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
8eb54a2515
|
|||
|
8f7f7b88ac
|
|||
|
a348550630
|
49
src/docker.rs
Normal file
49
src/docker.rs
Normal file
@@ -0,0 +1,49 @@
|
||||
use anyhow::{Context, Result};
|
||||
use bollard::container::ListContainersOptions;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
pub type Docker = bollard::Docker;
|
||||
pub type Containers = Vec<String>;
|
||||
|
||||
pub fn get_query_options(label: String) -> ListContainersOptions<String> {
|
||||
let mut filters = HashMap::new();
|
||||
//filters.insert("status", vec!["running"]);
|
||||
filters.insert("label".into(), vec![label]);
|
||||
filters.insert("health".into(), vec!["unhealthy".into()]);
|
||||
|
||||
ListContainersOptions {
|
||||
filters,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn query_containers<T>(
|
||||
connection: &Docker,
|
||||
query_options: ListContainersOptions<T>,
|
||||
) -> anyhow::Result<Vec<String>>
|
||||
where
|
||||
T: std::cmp::Eq + std::hash::Hash + serde::ser::Serialize,
|
||||
std::string::String: From<T>,
|
||||
{
|
||||
Ok(connection
|
||||
.list_containers(Some(query_options))
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|container| container.id.unwrap())
|
||||
.collect())
|
||||
}
|
||||
|
||||
pub async fn restart_container(connection: &Docker, container_name: &str) -> anyhow::Result<()> {
|
||||
connection.restart_container(container_name, None).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_docker_connection() -> Result<Arc<Docker>> {
|
||||
let connection = Arc::new(Docker::connect_with_defaults()?);
|
||||
let _ = connection
|
||||
.as_ref()
|
||||
.ping()
|
||||
.await
|
||||
.context("ping on docker connection")?;
|
||||
Ok(connection)
|
||||
}
|
||||
110
src/main.rs
110
src/main.rs
@@ -1,25 +1,24 @@
|
||||
use anyhow::Context;
|
||||
use bollard::container::ListContainersOptions;
|
||||
use bollard::Docker;
|
||||
|
||||
use std::cmp::min;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::default::Default;
|
||||
|
||||
use tokio::time::{timeout, Instant};
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use clap::Parser;
|
||||
|
||||
use parse_duration::parse as parse_duration;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use tokio::{
|
||||
sync::mpsc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
mod logger;
|
||||
use logger::{create_logger, LogLevel};
|
||||
|
||||
mod docker;
|
||||
use docker::{
|
||||
get_docker_connection, get_query_options, query_containers, restart_container, Containers,
|
||||
Docker,
|
||||
};
|
||||
|
||||
mod scheduler;
|
||||
use scheduler::{Scheduler, SchedulerStopHandle, SchedulerWatcher};
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[clap(author, version, about, long_about = None)]
|
||||
#[clap(propagate_version = true)]
|
||||
@@ -41,63 +40,20 @@ struct Cli {
|
||||
pub log_level: LogLevel,
|
||||
}
|
||||
|
||||
fn get_query_options(args: &Cli) -> ListContainersOptions<String> {
|
||||
let mut filters = HashMap::new();
|
||||
//filters.insert("status", vec!["running"]);
|
||||
filters.insert("label".into(), vec![args.label.clone()]);
|
||||
filters.insert("health".into(), vec!["unhealthy".into()]);
|
||||
|
||||
ListContainersOptions {
|
||||
filters,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
async fn query_containers<T>(
|
||||
connection: &Docker,
|
||||
query_options: ListContainersOptions<T>,
|
||||
) -> anyhow::Result<Vec<String>>
|
||||
where
|
||||
T: std::cmp::Eq + std::hash::Hash + serde::ser::Serialize,
|
||||
std::string::String: From<T>,
|
||||
{
|
||||
Ok(connection
|
||||
.list_containers(Some(query_options))
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|container| container.id.unwrap())
|
||||
.collect())
|
||||
}
|
||||
|
||||
async fn restart_container(connection: &Docker, container_name: &str) -> anyhow::Result<()> {
|
||||
connection.restart_container(container_name, None).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
type Containers = Vec<String>;
|
||||
|
||||
async fn query_task(
|
||||
connection: Arc<Docker>,
|
||||
interval: Duration,
|
||||
tx: mpsc::Sender<Containers>,
|
||||
mut shutdown_rx: mpsc::Receiver<()>,
|
||||
args: Cli,
|
||||
mut scheduler: SchedulerWatcher,
|
||||
label: String,
|
||||
) {
|
||||
let query_options = get_query_options(&args);
|
||||
let max_query_time = interval - Duration::from_millis(1);
|
||||
let mut query_time = Duration::new(0, 0);
|
||||
let query_options = get_query_options(label);
|
||||
|
||||
log::debug!("query_task -> start recv");
|
||||
while (timeout(interval - query_time, shutdown_rx.recv()).await).is_err() {
|
||||
let start = Instant::now();
|
||||
|
||||
while scheduler.wait().await.is_some() {
|
||||
let containers = query_containers(&connection, query_options.clone())
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
|
||||
let end = Instant::now();
|
||||
query_time = min(end - start, max_query_time);
|
||||
|
||||
let res = tx.send(containers).await;
|
||||
if res.is_err() {
|
||||
break;
|
||||
@@ -162,7 +118,7 @@ async fn restart_task(connection: Arc<Docker>, mut rx: mpsc::Receiver<Containers
|
||||
}
|
||||
}
|
||||
|
||||
fn shutdown_control(shutdown: Option<mpsc::Sender<()>>) {
|
||||
fn shutdown_control(shutdown: Option<SchedulerStopHandle>) {
|
||||
let mut shutdown = shutdown;
|
||||
let res = ctrlc::set_handler(move || {
|
||||
log::info!("recieved Ctrl-C");
|
||||
@@ -177,37 +133,23 @@ fn shutdown_control(shutdown: Option<mpsc::Sender<()>>) {
|
||||
#[tokio::main(flavor = "current_thread")]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let cli = Cli::parse();
|
||||
|
||||
let logger = create_logger(&cli.log_level)?;
|
||||
|
||||
let connection = Arc::new(Docker::connect_with_defaults()?);
|
||||
let _ = connection
|
||||
.as_ref()
|
||||
.ping()
|
||||
.await
|
||||
.context("ping on docker connection")?;
|
||||
|
||||
let connection = get_docker_connection().await?;
|
||||
let query_connection = connection.clone();
|
||||
let restart_connection = connection.clone();
|
||||
|
||||
let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
|
||||
let mut scheduler = Scheduler::new(cli.interval);
|
||||
let waiter = scheduler.get_waiter().context("get_waiter")?;
|
||||
|
||||
let (query_tx, filter_rx) = mpsc::channel::<Containers>(1);
|
||||
let (filter_tx, restart_rx) = mpsc::channel::<Containers>(1);
|
||||
let (unhealthy_timeout, label) = (cli.unhealthy_timeout, cli.label);
|
||||
|
||||
let interval = cli.interval;
|
||||
let unhealthy_timeout = cli.unhealthy_timeout;
|
||||
|
||||
shutdown_control(Some(shutdown_tx));
|
||||
|
||||
shutdown_control(scheduler.get_stop_handle());
|
||||
tokio::try_join!(
|
||||
tokio::spawn(query_task(
|
||||
query_connection,
|
||||
interval,
|
||||
query_tx,
|
||||
shutdown_rx,
|
||||
cli
|
||||
)),
|
||||
tokio::spawn(scheduler.run()),
|
||||
tokio::spawn(query_task(query_connection, query_tx, waiter, label)),
|
||||
tokio::spawn(filter_task(unhealthy_timeout, filter_rx, filter_tx)),
|
||||
tokio::spawn(restart_task(restart_connection, restart_rx))
|
||||
)?;
|
||||
|
||||
71
src/scheduler.rs
Normal file
71
src/scheduler.rs
Normal file
@@ -0,0 +1,71 @@
|
||||
use tokio::{
|
||||
sync::mpsc,
|
||||
time::{timeout, Duration},
|
||||
};
|
||||
|
||||
pub struct SchedulerStopHandle {
|
||||
_sender: Option<mpsc::Sender<()>>,
|
||||
}
|
||||
|
||||
impl SchedulerStopHandle {
|
||||
fn new(sender: mpsc::Sender<()>) -> Self {
|
||||
Self {
|
||||
_sender: Some(sender),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SchedulerWatcher {
|
||||
reciever: mpsc::Receiver<()>,
|
||||
}
|
||||
|
||||
impl SchedulerWatcher {
|
||||
fn new(reciever: mpsc::Receiver<()>) -> Self {
|
||||
Self { reciever }
|
||||
}
|
||||
pub async fn wait(&mut self) -> Option<()> {
|
||||
self.reciever.recv().await
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Scheduler {
|
||||
interval: Duration,
|
||||
stop_handle: Option<SchedulerStopHandle>,
|
||||
stop_rx: mpsc::Receiver<()>,
|
||||
tick_tx: mpsc::Sender<()>,
|
||||
watcher: Option<SchedulerWatcher>,
|
||||
}
|
||||
|
||||
impl Scheduler {
|
||||
pub fn new(interval: Duration) -> Self {
|
||||
log::trace!("Scheduler::new {interval:?}");
|
||||
let (stop_tx, stop_rx) = mpsc::channel::<()>(1);
|
||||
let (tick_tx, tick_rx) = mpsc::channel::<()>(1);
|
||||
|
||||
Self {
|
||||
interval,
|
||||
stop_handle: Some(SchedulerStopHandle::new(stop_tx)),
|
||||
stop_rx,
|
||||
tick_tx,
|
||||
watcher: Some(SchedulerWatcher::new(tick_rx)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_stop_handle(&mut self) -> Option<SchedulerStopHandle> {
|
||||
self.stop_handle.take()
|
||||
}
|
||||
|
||||
pub fn get_waiter(&mut self) -> Option<SchedulerWatcher> {
|
||||
self.watcher.take()
|
||||
}
|
||||
|
||||
pub async fn run(mut self) {
|
||||
log::trace!("Scheduler::run");
|
||||
while (timeout(self.interval, self.stop_rx.recv()).await).is_err() {
|
||||
log::trace!("Scheduler -> tick");
|
||||
if self.tick_tx.send(()).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user