+ mod scheduler
This commit is contained in:
parent
8f7f7b88ac
commit
8eb54a2515
41
src/main.rs
41
src/main.rs
@ -1,9 +1,10 @@
|
|||||||
|
use anyhow::Context;
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use parse_duration::parse as parse_duration;
|
use parse_duration::parse as parse_duration;
|
||||||
use std::{collections::HashMap, sync::Arc};
|
use std::{collections::HashMap, sync::Arc};
|
||||||
use tokio::{
|
use tokio::{
|
||||||
sync::mpsc,
|
sync::mpsc,
|
||||||
time::{timeout, Duration, Instant},
|
time::{Duration, Instant},
|
||||||
};
|
};
|
||||||
|
|
||||||
mod logger;
|
mod logger;
|
||||||
@ -15,6 +16,9 @@ use docker::{
|
|||||||
Docker,
|
Docker,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
mod scheduler;
|
||||||
|
use scheduler::{Scheduler, SchedulerStopHandle, SchedulerWatcher};
|
||||||
|
|
||||||
#[derive(Parser, Debug)]
|
#[derive(Parser, Debug)]
|
||||||
#[clap(author, version, about, long_about = None)]
|
#[clap(author, version, about, long_about = None)]
|
||||||
#[clap(propagate_version = true)]
|
#[clap(propagate_version = true)]
|
||||||
@ -36,30 +40,16 @@ struct Cli {
|
|||||||
pub log_level: LogLevel,
|
pub log_level: LogLevel,
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn scheduler_task(
|
|
||||||
interval: Duration,
|
|
||||||
tx: mpsc::Sender<()>,
|
|
||||||
mut shutdown_rx: mpsc::Receiver<()>,
|
|
||||||
) {
|
|
||||||
log::debug!("scheduler_task -> start: {interval:?}");
|
|
||||||
while (timeout(interval, shutdown_rx.recv()).await).is_err() {
|
|
||||||
log::trace!("scheduler_task -> tick");
|
|
||||||
if tx.send(()).await.is_err() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn query_task(
|
async fn query_task(
|
||||||
connection: Arc<Docker>,
|
connection: Arc<Docker>,
|
||||||
tx: mpsc::Sender<Containers>,
|
tx: mpsc::Sender<Containers>,
|
||||||
mut scheduler_rx: mpsc::Receiver<()>,
|
mut scheduler: SchedulerWatcher,
|
||||||
label: String,
|
label: String,
|
||||||
) {
|
) {
|
||||||
let query_options = get_query_options(label);
|
let query_options = get_query_options(label);
|
||||||
|
|
||||||
log::debug!("query_task -> start recv");
|
log::debug!("query_task -> start recv");
|
||||||
while scheduler_rx.recv().await.is_some() {
|
while scheduler.wait().await.is_some() {
|
||||||
let containers = query_containers(&connection, query_options.clone())
|
let containers = query_containers(&connection, query_options.clone())
|
||||||
.await
|
.await
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
@ -128,7 +118,7 @@ async fn restart_task(connection: Arc<Docker>, mut rx: mpsc::Receiver<Containers
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn shutdown_control(shutdown: Option<mpsc::Sender<()>>) {
|
fn shutdown_control(shutdown: Option<SchedulerStopHandle>) {
|
||||||
let mut shutdown = shutdown;
|
let mut shutdown = shutdown;
|
||||||
let res = ctrlc::set_handler(move || {
|
let res = ctrlc::set_handler(move || {
|
||||||
log::info!("recieved Ctrl-C");
|
log::info!("recieved Ctrl-C");
|
||||||
@ -143,26 +133,23 @@ fn shutdown_control(shutdown: Option<mpsc::Sender<()>>) {
|
|||||||
#[tokio::main(flavor = "current_thread")]
|
#[tokio::main(flavor = "current_thread")]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
let cli = Cli::parse();
|
let cli = Cli::parse();
|
||||||
|
|
||||||
let logger = create_logger(&cli.log_level)?;
|
let logger = create_logger(&cli.log_level)?;
|
||||||
|
|
||||||
let connection = get_docker_connection().await?;
|
let connection = get_docker_connection().await?;
|
||||||
let query_connection = connection.clone();
|
let query_connection = connection.clone();
|
||||||
let restart_connection = connection.clone();
|
let restart_connection = connection.clone();
|
||||||
|
|
||||||
let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
|
let mut scheduler = Scheduler::new(cli.interval);
|
||||||
let (scheduler_tx, scheduler_rx) = mpsc::channel::<()>(1);
|
let waiter = scheduler.get_waiter().context("get_waiter")?;
|
||||||
|
|
||||||
let (query_tx, filter_rx) = mpsc::channel::<Containers>(1);
|
let (query_tx, filter_rx) = mpsc::channel::<Containers>(1);
|
||||||
let (filter_tx, restart_rx) = mpsc::channel::<Containers>(1);
|
let (filter_tx, restart_rx) = mpsc::channel::<Containers>(1);
|
||||||
|
let (unhealthy_timeout, label) = (cli.unhealthy_timeout, cli.label);
|
||||||
|
|
||||||
let (interval, unhealthy_timeout, label) = (cli.interval, cli.unhealthy_timeout, cli.label);
|
shutdown_control(scheduler.get_stop_handle());
|
||||||
|
|
||||||
shutdown_control(Some(shutdown_tx));
|
|
||||||
|
|
||||||
tokio::try_join!(
|
tokio::try_join!(
|
||||||
tokio::spawn(scheduler_task(interval, scheduler_tx, shutdown_rx)),
|
tokio::spawn(scheduler.run()),
|
||||||
tokio::spawn(query_task(query_connection, query_tx, scheduler_rx, label)),
|
tokio::spawn(query_task(query_connection, query_tx, waiter, label)),
|
||||||
tokio::spawn(filter_task(unhealthy_timeout, filter_rx, filter_tx)),
|
tokio::spawn(filter_task(unhealthy_timeout, filter_rx, filter_tx)),
|
||||||
tokio::spawn(restart_task(restart_connection, restart_rx))
|
tokio::spawn(restart_task(restart_connection, restart_rx))
|
||||||
)?;
|
)?;
|
||||||
|
71
src/scheduler.rs
Normal file
71
src/scheduler.rs
Normal file
@ -0,0 +1,71 @@
|
|||||||
|
use tokio::{
|
||||||
|
sync::mpsc,
|
||||||
|
time::{timeout, Duration},
|
||||||
|
};
|
||||||
|
|
||||||
|
pub struct SchedulerStopHandle {
|
||||||
|
_sender: Option<mpsc::Sender<()>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SchedulerStopHandle {
|
||||||
|
fn new(sender: mpsc::Sender<()>) -> Self {
|
||||||
|
Self {
|
||||||
|
_sender: Some(sender),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct SchedulerWatcher {
|
||||||
|
reciever: mpsc::Receiver<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SchedulerWatcher {
|
||||||
|
fn new(reciever: mpsc::Receiver<()>) -> Self {
|
||||||
|
Self { reciever }
|
||||||
|
}
|
||||||
|
pub async fn wait(&mut self) -> Option<()> {
|
||||||
|
self.reciever.recv().await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Scheduler {
|
||||||
|
interval: Duration,
|
||||||
|
stop_handle: Option<SchedulerStopHandle>,
|
||||||
|
stop_rx: mpsc::Receiver<()>,
|
||||||
|
tick_tx: mpsc::Sender<()>,
|
||||||
|
watcher: Option<SchedulerWatcher>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Scheduler {
|
||||||
|
pub fn new(interval: Duration) -> Self {
|
||||||
|
log::trace!("Scheduler::new {interval:?}");
|
||||||
|
let (stop_tx, stop_rx) = mpsc::channel::<()>(1);
|
||||||
|
let (tick_tx, tick_rx) = mpsc::channel::<()>(1);
|
||||||
|
|
||||||
|
Self {
|
||||||
|
interval,
|
||||||
|
stop_handle: Some(SchedulerStopHandle::new(stop_tx)),
|
||||||
|
stop_rx,
|
||||||
|
tick_tx,
|
||||||
|
watcher: Some(SchedulerWatcher::new(tick_rx)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_stop_handle(&mut self) -> Option<SchedulerStopHandle> {
|
||||||
|
self.stop_handle.take()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_waiter(&mut self) -> Option<SchedulerWatcher> {
|
||||||
|
self.watcher.take()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn run(mut self) {
|
||||||
|
log::trace!("Scheduler::run");
|
||||||
|
while (timeout(self.interval, self.stop_rx.recv()).await).is_err() {
|
||||||
|
log::trace!("Scheduler -> tick");
|
||||||
|
if self.tick_tx.send(()).await.is_err() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user