#[macro_use]
extern crate criterion;
extern crate bincode;
extern crate serde;
extern crate serde_derive;
extern crate tempfile;

use async_compression::Level;
use chgk_ledb_lib::async_db::{Reader, Writer, WriterOpts};
use chgk_ledb_lib::questions::{Question, QuestionsConverterAsyncForStream};
use chgk_ledb_lib::source::ReadSourceQuestionsBatchesAsync;
use futures::pin_mut;
use futures::StreamExt;
use std::time::Duration;
use std::{ops::Deref, path::Path};

use async_zip::tokio::read::seek::ZipFileReader;

use criterion::{BatchSize, Criterion};
use lazy_static::lazy_static;
use tempfile::{tempdir, NamedTempFile};
use tokio::{fs, runtime};

/// Path of the zip archive that `read_sample` opens to obtain source questions.
/// It is relative, so it resolves against the process working directory.
const ZIP_FILENAME: &str = "../json.zip";
/// Path of the database file opened by the read benchmark.
/// NOTE(review): nothing visible in this file creates it — presumably it must exist beforehand.
const NEW_DB_FILENAME: &str = "../db.dat";

lazy_static! {
    /// Lazily built current-thread Tokio runtime handed to criterion's async
    /// bencher by both benchmarks in this file.
    static ref RUNTIME: runtime::Runtime = runtime::Builder::new_current_thread()
        .build()
        .unwrap();
}

/// Upper bound on the number of questions taken from a stream: used both when
/// building the write sample and when draining the reader.
const N: usize = 4096;

/// Loads up to `N` questions from the zip archive at `ZIP_FILENAME`.
///
/// Each returned question has its `num` field overwritten with its 1-based
/// position in the stream.
///
/// # Panics
///
/// Panics if the archive cannot be opened or parsed as a zip file.
async fn read_sample() -> Vec<Question> {
    let mut zip_file = fs::File::open(ZIP_FILENAME).await.expect("open zip");
    let archive = ZipFileReader::with_tokio(&mut zip_file)
        .await
        .expect("open zip file reader");

    let mut batches = archive.source_questions();
    let batch_stream = batches.stream();
    pin_mut!(batch_stream);

    batch_stream
        .converter()
        .convert()
        .take(N)
        .enumerate()
        .map(|(index, mut item)| {
            // `index` is below `N` (4096) because of the `take` above, so the cast cannot truncate.
            item.num = 1 + index as u32;
            item
        })
        .collect::<Vec<Question>>()
        .await
}

/// Blocking wrapper around `read_sample`.
///
/// The future is driven to completion on a freshly spawned thread that owns
/// its own current-thread runtime; the caller blocks until that thread ends.
///
/// # Panics
///
/// Panics if the runtime cannot be built or if the worker thread panicked.
fn read_sample_sync() -> Vec<Question> {
    let worker = std::thread::spawn(|| {
        let rt = runtime::Builder::new_current_thread().build().unwrap();
        rt.block_on(read_sample())
    });

    worker.join().expect("spawn thread")
}

/// Creates a database writer for `path` with the options used by the write
/// benchmark: fastest compression level, 100 MiB data and output buffer sizes
/// and a `current_buf_size` of 10240.
///
/// # Panics
///
/// Panics if the writer cannot be created.
async fn prepare_db_writer<P>(path: P) -> Writer<Question>
where
    P: AsRef<Path>,
{
    Writer::<Question>::new(
        path,
        WriterOpts {
            compress_lvl: Level::Fastest,
            data_buf_size: 100 * 1024 * 1024,
            out_buf_size: 100 * 1024 * 1024,
            current_buf_size: 10240,
        },
    )
    .await
    .expect("new writer")
}

/// Blocking helper that builds a `Writer` for `path` via `prepare_db_writer`.
///
/// The path is copied into an owned `PathBuf` so it can be moved into the
/// worker thread. It is no longer forced through `str`, so non-UTF-8 paths
/// are accepted instead of causing a panic.
///
/// The writer is created on a dedicated thread with its own current-thread
/// runtime. NOTE(review): presumably this is because criterion invokes the
/// setup closure while `RUNTIME` is already driving the benchmark, where a
/// nested `block_on` would panic — confirm.
///
/// # Panics
///
/// Panics if the runtime cannot be built, if the writer cannot be created, or
/// if the worker thread panicked.
fn setup_writer<P: AsRef<Path>>(path: P) -> Writer<Question> {
    let path = path.as_ref().to_path_buf();

    std::thread::spawn(move || {
        runtime::Builder::new_current_thread()
            .build()
            .unwrap()
            .block_on(prepare_db_writer(path))
    })
    .join()
    .expect("spawn thread")
}

/// Blocking helper that opens a `Reader` for the database at `path`.
///
/// The reader is created on a dedicated thread that runs its own
/// current-thread runtime; the caller blocks until that thread finishes.
///
/// # Panics
///
/// Panics if `path` is not valid UTF-8, if the runtime cannot be built, if
/// the reader cannot be opened, or if the worker thread panicked.
fn setup_reader<P: AsRef<Path>>(path: P) -> Reader<Question> {
    // NOTE(review): the UTF-8 round-trip is kept because the parameter type of
    // `Reader::new` is not visible here — a `PathBuf` may work as well.
    let db_path = path.as_ref().to_str().unwrap().to_owned();

    let worker = std::thread::spawn(move || {
        let rt = runtime::Builder::new_current_thread().build().unwrap();
        rt.block_on(async move { Reader::new(db_path).await.expect("new reader") })
    });

    worker.join().expect("spawn thread")
}

fn questions_async_read(c: &mut Criterion) {
    c.bench_function("questions_async_read", |b| {
        b.to_async(RUNTIME.deref()).iter_batched(
            || setup_reader(NEW_DB_FILENAME),
            |reader| async move {
                reader
                    .stream()
                    .take(N)
                    .for_each(|item| async { drop(item) })
                    .await
            },
            BatchSize::SmallInput,
        )
    });
}

/// Benchmark `questions_async_write`: loads a sample of questions into a
/// writer and finishes it.
///
/// For every batch input the setup closure allocates a temp file inside one
/// shared temporary directory, re-reads the question sample and creates the
/// writer; the routine closure performs `load` followed by `finish`.
fn questions_async_write(c: &mut Criterion) {
    let dir = tempdir().expect("tempdir");

    c.bench_function("questions_async_write", |bencher| {
        let prepare = || {
            // NOTE(review): `target` is dropped when this closure returns, i.e.
            // before the routine runs; a `TempPath` removes its file on drop
            // (per the tempfile docs) — confirm this is intended.
            let target = NamedTempFile::new_in(dir.path())
                .expect("new tempfile")
                .into_temp_path();
            let questions = futures::stream::iter(read_sample_sync());
            let db = setup_writer(&target);
            (questions, db)
        };

        bencher.to_async(RUNTIME.deref()).iter_batched(
            prepare,
            |(mut questions, mut db)| async move {
                db.load(&mut questions).await.unwrap();
                db.finish().await.unwrap();
            },
            BatchSize::SmallInput,
        )
    });
}

/// Criterion settings shared by the benchmark group: 40 samples, 7 seconds of
/// warm-up and a 20 second measurement window.
fn config() -> Criterion {
    let warm_up = Duration::from_secs(7);
    let measurement = Duration::from_secs(20);

    Criterion::default()
        .sample_size(40)
        .warm_up_time(warm_up)
        .measurement_time(measurement)
}

// Registers both benchmarks under the `benches` group with the custom
// configuration, and generates the benchmark binary's entry point.
criterion_group! {
    name = benches;
    config = config();
    targets = questions_async_read, questions_async_write
}
criterion_main!(benches);