use bollard::container::ListContainersOptions;
use bollard::Docker;

use std::cmp::min;
use tokio::sync::mpsc;

use std::collections::HashMap;
use std::default::Default;

use tokio::time::{timeout, Instant};

use std::sync::Arc;
use std::time::Duration;

fn get_query_options() -> ListContainersOptions<&'static str> {
    let mut filters = HashMap::new();
    //filters.insert("status", vec!["running"]);
    filters.insert("label", vec!["auto-restart.unhealthy"]);
    filters.insert("health", vec!["unhealthy"]);

    ListContainersOptions {
        filters,
        ..Default::default()
    }
}

/// Lists the containers matching `query_options` and returns their ids.
///
/// # Errors
///
/// Returns the underlying error if the `list_containers` call fails.
///
/// Entries that come back without an id are skipped: the id is optional in
/// the returned summary, and an entry without one cannot be acted on later.
async fn query_containers<T>(
    connection: &Docker,
    query_options: ListContainersOptions<T>,
) -> anyhow::Result<Vec<String>>
where
    T: std::cmp::Eq + std::hash::Hash + serde::ser::Serialize,
    std::string::String: From<T>,
{
    let summaries = connection.list_containers(Some(query_options)).await?;

    // `filter_map` instead of `unwrap`: a missing id must not panic the task.
    Ok(summaries
        .into_iter()
        .filter_map(|container| container.id)
        .collect())
}

async fn restart_container(connection: &Docker, container_name: &str) -> anyhow::Result<()> {
    connection.restart_container(container_name, None).await?;
    Ok(())
}

/// A batch of container ids, as passed between the pipeline tasks over
/// `mpsc` channels.
type Containers = Vec<String>;

/// Polls Docker for unhealthy containers roughly every `interval` and sends
/// each result batch to `tx`.
///
/// The loop ends when `shutdown_rx` yields a message or is closed, or when
/// the receiving side of `tx` has gone away.
///
/// A failed query is logged to stderr and skipped. It is deliberately not
/// forwarded as an empty batch, because the consumer would read an empty
/// batch as "no container is unhealthy" and forget what it was tracking.
async fn query_task(
    connection: Arc<Docker>,
    interval: Duration,
    tx: mpsc::Sender<Containers>,
    mut shutdown_rx: mpsc::Receiver<()>,
) {
    let query_options = get_query_options();
    // Time the previous query took; subtracted from the next wait so that
    // polls start about `interval` apart.
    let mut query_time = Duration::ZERO;
    println!("start recv");

    // `timeout` returns `Err` only when the wait elapsed without the shutdown
    // channel producing a message or closing, i.e. "keep polling".
    while timeout(interval.saturating_sub(query_time), shutdown_rx.recv())
        .await
        .is_err()
    {
        let start = Instant::now();
        let result = query_containers(&connection, query_options.clone()).await;

        // Cap the measured time just below `interval` so a slow query cannot
        // shrink the next wait to nothing. Saturating arithmetic keeps a
        // sub-millisecond `interval` from panicking on underflow.
        query_time = min(
            start.elapsed(),
            interval.saturating_sub(Duration::from_millis(1)),
        );

        let containers = match result {
            Ok(containers) => containers,
            Err(err) => {
                eprintln!("query -> failed to list containers: {:#}", err);
                continue;
            }
        };

        if tx.send(containers).await.is_err() {
            break;
        }
    }
}

/// Forwards only those containers that have been reported unhealthy for
/// longer than `unhealthy_timeout`.
///
/// For every batch received on `in_rx`:
/// * ids no longer present in the batch are dropped from tracking,
/// * newly seen ids are recorded with the current time,
/// * ids tracked for longer than `unhealthy_timeout` are sent to `out_tx`
///   (nothing is sent when that set is empty).
///
/// An id keeps being forwarded on every batch for as long as it stays in the
/// incoming batches. The task ends when `in_rx` is closed or when the
/// receiving side of `out_tx` has gone away.
async fn filter_task(
    unhealthy_timeout: Duration,
    mut in_rx: mpsc::Receiver<Containers>,
    out_tx: mpsc::Sender<Containers>,
) {
    // Container id -> instant at which it was first seen in an unbroken run
    // of batches.
    let mut unhealthy_since: HashMap<String, Instant> = HashMap::new();

    while let Some(containers) = in_rx.recv().await {
        let now = Instant::now();

        println!("filter -> found unhealthy: {}", containers.len());

        {
            // Set lookup keeps the pruning linear instead of scanning the
            // batch once per tracked id. Scoped so the borrow of `containers`
            // ends before the batch is consumed below.
            let reported: std::collections::HashSet<&str> =
                containers.iter().map(String::as_str).collect();
            unhealthy_since.retain(|id, _| reported.contains(id.as_str()));
        }

        // Existing entries keep their original timestamp.
        for container_id in containers {
            unhealthy_since.entry(container_id).or_insert(now);
        }

        let overdue: Containers = unhealthy_since
            .iter()
            .filter_map(|(id, since)| {
                (now - *since > unhealthy_timeout).then(|| id.clone())
            })
            .collect();

        println!("filter -> filtered unhealthy: {}", overdue.len());

        if overdue.is_empty() {
            continue;
        }

        if out_tx.send(overdue).await.is_err() {
            break;
        }
    }
}

/// Prints every batch received on `rx` to stdout: the batch size, one line
/// per container id, then a blank separator line.
///
/// Returns once `rx` is closed.
async fn log_task(mut rx: mpsc::Receiver<Containers>) {
    println!("log start");
    while let Some(containers) = rx.recv().await {
        println!("log -> found: {}", containers.len());
        for container_id in &containers {
            println!("log -> container: {}", container_id);
        }
        // Blank line between batches.
        println!();
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let connection = Arc::new(Docker::connect_with_defaults()?);

    let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);

    let (query_tx, filter_rx) = mpsc::channel::<Containers>(1);
    let (filter_tx, log_rx) = mpsc::channel::<Containers>(1);

    let interval = Duration::from_secs(10);
    let unhealthy_timeout = Duration::from_secs(35);

    tokio::try_join!(
        tokio::spawn(query_task(
            connection.clone(),
            interval,
            query_tx,
            shutdown_rx
        )),
        tokio::spawn(filter_task(unhealthy_timeout, filter_rx, filter_tx)),
        tokio::spawn(log_task(log_rx)),
        tokio::spawn(async {
            println!("shutdown -> sleep");
            tokio::time::sleep(Duration::from_secs(80)).await;
            println!("shutdown -> drop");
            drop(shutdown_tx);
        })
    )?;

    //let restart = restart_container(&connection, container_id.as_str()).await;
    //println!("restart: {}", restart.is_ok());

    Ok(())
}