Compare commits

..

5 Commits

Author SHA1 Message Date
8eb54a2515 + mod scheduler
All checks were successful
Docker Image CI / test (push) Successful in 2m49s
Docker Image CI / push (push) Successful in 2m49s
2024-09-14 23:35:31 +03:00
8f7f7b88ac + mod docker
All checks were successful
Docker Image CI / test (push) Successful in 2m45s
Docker Image CI / push (push) Successful in 2m49s
2024-09-14 16:30:17 +03:00
a348550630 + scheduler_task
All checks were successful
Docker Image CI / test (push) Successful in 2m55s
Docker Image CI / push (push) Successful in 2m56s
2024-09-14 15:47:33 +03:00
28748b547e tokio features + single thread rt
All checks were successful
Docker Image CI / test (push) Successful in 3m42s
Docker Image CI / push (push) Successful in 3m23s
2024-09-13 14:51:11 +03:00
aa417f6ed1 add .dockerignore 2024-09-13 14:48:32 +03:00
6 changed files with 149 additions and 147 deletions

1
.dockerignore Normal file
View File

@@ -0,0 +1 @@
/target

60
Cargo.lock generated
View File

@@ -324,7 +324,6 @@ dependencies = [
"parse_duration",
"serde",
"tokio",
"tokio-macros",
]
[[package]]
@@ -697,16 +696,6 @@ version = "0.2.158"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439"
[[package]]
name = "lock_api"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17"
dependencies = [
"autocfg",
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.22"
@@ -873,29 +862,6 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "parking_lot"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27"
dependencies = [
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.9.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-targets",
]
[[package]]
name = "parse_duration"
version = "2.1.1"
@@ -969,15 +935,6 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "redox_syscall"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0884ad60e090bf1345b93da0a5de8923c93884cd03f40dfcfddd3b4bee661853"
dependencies = [
"bitflags",
]
[[package]]
name = "regex"
version = "1.10.6"
@@ -1097,12 +1054,6 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "scopeguard"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "security-framework"
version = "2.11.1"
@@ -1204,15 +1155,6 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "signal-hook-registry"
version = "1.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1"
dependencies = [
"libc",
]
[[package]]
name = "slab"
version = "0.4.9"
@@ -1343,9 +1285,7 @@ dependencies = [
"bytes",
"libc",
"mio",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.52.0",

View File

@@ -18,8 +18,7 @@ log = "0.4.22"
log-panics = "2.1.0"
parse_duration = "2.1.1"
serde = "1.0.210"
tokio = {version = "1.40.0", features = ["full"]}
tokio-macros = "2.4.0"
tokio = {version = "1.40.0", features = ["rt", "macros", "time", "sync"]}
[profile.release]
opt-level = 3

49
src/docker.rs Normal file
View File

@@ -0,0 +1,49 @@
use anyhow::{Context, Result};
use bollard::container::ListContainersOptions;
use std::{collections::HashMap, sync::Arc};
pub type Docker = bollard::Docker;
pub type Containers = Vec<String>;
pub fn get_query_options(label: String) -> ListContainersOptions<String> {
let mut filters = HashMap::new();
//filters.insert("status", vec!["running"]);
filters.insert("label".into(), vec![label]);
filters.insert("health".into(), vec!["unhealthy".into()]);
ListContainersOptions {
filters,
..Default::default()
}
}
pub async fn query_containers<T>(
connection: &Docker,
query_options: ListContainersOptions<T>,
) -> anyhow::Result<Vec<String>>
where
T: std::cmp::Eq + std::hash::Hash + serde::ser::Serialize,
std::string::String: From<T>,
{
Ok(connection
.list_containers(Some(query_options))
.await?
.into_iter()
.map(|container| container.id.unwrap())
.collect())
}
pub async fn restart_container(connection: &Docker, container_name: &str) -> anyhow::Result<()> {
connection.restart_container(container_name, None).await?;
Ok(())
}
pub async fn get_docker_connection() -> Result<Arc<Docker>> {
let connection = Arc::new(Docker::connect_with_defaults()?);
let _ = connection
.as_ref()
.ping()
.await
.context("ping on docker connection")?;
Ok(connection)
}

View File

@@ -1,25 +1,24 @@
use anyhow::Context;
use bollard::container::ListContainersOptions;
use bollard::Docker;
use std::cmp::min;
use tokio::sync::mpsc;
use std::collections::HashMap;
use std::default::Default;
use tokio::time::{timeout, Instant};
use std::sync::Arc;
use std::time::Duration;
use clap::Parser;
use parse_duration::parse as parse_duration;
use std::{collections::HashMap, sync::Arc};
use tokio::{
sync::mpsc,
time::{Duration, Instant},
};
mod logger;
use logger::{create_logger, LogLevel};
mod docker;
use docker::{
get_docker_connection, get_query_options, query_containers, restart_container, Containers,
Docker,
};
mod scheduler;
use scheduler::{Scheduler, SchedulerStopHandle, SchedulerWatcher};
#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
#[clap(propagate_version = true)]
@@ -41,63 +40,20 @@ struct Cli {
pub log_level: LogLevel,
}
fn get_query_options(args: &Cli) -> ListContainersOptions<String> {
let mut filters = HashMap::new();
//filters.insert("status", vec!["running"]);
filters.insert("label".into(), vec![args.label.clone()]);
filters.insert("health".into(), vec!["unhealthy".into()]);
ListContainersOptions {
filters,
..Default::default()
}
}
async fn query_containers<T>(
connection: &Docker,
query_options: ListContainersOptions<T>,
) -> anyhow::Result<Vec<String>>
where
T: std::cmp::Eq + std::hash::Hash + serde::ser::Serialize,
std::string::String: From<T>,
{
Ok(connection
.list_containers(Some(query_options))
.await?
.into_iter()
.map(|container| container.id.unwrap())
.collect())
}
async fn restart_container(connection: &Docker, container_name: &str) -> anyhow::Result<()> {
connection.restart_container(container_name, None).await?;
Ok(())
}
type Containers = Vec<String>;
async fn query_task(
connection: Arc<Docker>,
interval: Duration,
tx: mpsc::Sender<Containers>,
mut shutdown_rx: mpsc::Receiver<()>,
args: Cli,
mut scheduler: SchedulerWatcher,
label: String,
) {
let query_options = get_query_options(&args);
let max_query_time = interval - Duration::from_millis(1);
let mut query_time = Duration::new(0, 0);
let query_options = get_query_options(label);
log::debug!("query_task -> start recv");
while (timeout(interval - query_time, shutdown_rx.recv()).await).is_err() {
let start = Instant::now();
while scheduler.wait().await.is_some() {
let containers = query_containers(&connection, query_options.clone())
.await
.unwrap_or_default();
let end = Instant::now();
query_time = min(end - start, max_query_time);
let res = tx.send(containers).await;
if res.is_err() {
break;
@@ -162,7 +118,7 @@ async fn restart_task(connection: Arc<Docker>, mut rx: mpsc::Receiver<Containers
}
}
fn shutdown_control(shutdown: Option<mpsc::Sender<()>>) {
fn shutdown_control(shutdown: Option<SchedulerStopHandle>) {
let mut shutdown = shutdown;
let res = ctrlc::set_handler(move || {
log::info!("recieved Ctrl-C");
@@ -174,40 +130,26 @@ fn shutdown_control(shutdown: Option<mpsc::Sender<()>>) {
}
}
#[tokio::main]
#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
let logger = create_logger(&cli.log_level)?;
let connection = Arc::new(Docker::connect_with_defaults()?);
let _ = connection
.as_ref()
.ping()
.await
.context("ping on docker connection")?;
let connection = get_docker_connection().await?;
let query_connection = connection.clone();
let restart_connection = connection.clone();
let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
let mut scheduler = Scheduler::new(cli.interval);
let waiter = scheduler.get_waiter().context("get_waiter")?;
let (query_tx, filter_rx) = mpsc::channel::<Containers>(1);
let (filter_tx, restart_rx) = mpsc::channel::<Containers>(1);
let (unhealthy_timeout, label) = (cli.unhealthy_timeout, cli.label);
let interval = cli.interval;
let unhealthy_timeout = cli.unhealthy_timeout;
shutdown_control(Some(shutdown_tx));
shutdown_control(scheduler.get_stop_handle());
tokio::try_join!(
tokio::spawn(query_task(
query_connection,
interval,
query_tx,
shutdown_rx,
cli
)),
tokio::spawn(scheduler.run()),
tokio::spawn(query_task(query_connection, query_tx, waiter, label)),
tokio::spawn(filter_task(unhealthy_timeout, filter_rx, filter_tx)),
tokio::spawn(restart_task(restart_connection, restart_rx))
)?;

71
src/scheduler.rs Normal file
View File

@@ -0,0 +1,71 @@
use tokio::{
sync::mpsc,
time::{timeout, Duration},
};
pub struct SchedulerStopHandle {
_sender: Option<mpsc::Sender<()>>,
}
impl SchedulerStopHandle {
fn new(sender: mpsc::Sender<()>) -> Self {
Self {
_sender: Some(sender),
}
}
}
pub struct SchedulerWatcher {
reciever: mpsc::Receiver<()>,
}
impl SchedulerWatcher {
fn new(reciever: mpsc::Receiver<()>) -> Self {
Self { reciever }
}
pub async fn wait(&mut self) -> Option<()> {
self.reciever.recv().await
}
}
pub struct Scheduler {
interval: Duration,
stop_handle: Option<SchedulerStopHandle>,
stop_rx: mpsc::Receiver<()>,
tick_tx: mpsc::Sender<()>,
watcher: Option<SchedulerWatcher>,
}
impl Scheduler {
pub fn new(interval: Duration) -> Self {
log::trace!("Scheduler::new {interval:?}");
let (stop_tx, stop_rx) = mpsc::channel::<()>(1);
let (tick_tx, tick_rx) = mpsc::channel::<()>(1);
Self {
interval,
stop_handle: Some(SchedulerStopHandle::new(stop_tx)),
stop_rx,
tick_tx,
watcher: Some(SchedulerWatcher::new(tick_rx)),
}
}
pub fn get_stop_handle(&mut self) -> Option<SchedulerStopHandle> {
self.stop_handle.take()
}
pub fn get_waiter(&mut self) -> Option<SchedulerWatcher> {
self.watcher.take()
}
pub async fn run(mut self) {
log::trace!("Scheduler::run");
while (timeout(self.interval, self.stop_rx.recv()).await).is_err() {
log::trace!("Scheduler -> tick");
if self.tick_tx.send(()).await.is_err() {
break;
}
}
}
}