#[macro_use] extern crate criterion; extern crate serde; extern crate serde_derive; extern crate tempfile; use async_compression::Level; use chgk_ledb_lib::async_db::{Reader, Writer, WriterOpts}; use chgk_ledb_lib::questions::{Question, QuestionsConverterAsyncForStream}; use chgk_ledb_lib::source::ReadSourceQuestionsBatchesAsync; use futures::pin_mut; use futures::StreamExt; use std::time::Duration; use std::{ops::Deref, path::Path}; use async_zip::tokio::read::seek::ZipFileReader; use criterion::{BatchSize, Criterion}; use lazy_static::lazy_static; use tempfile::{tempdir, NamedTempFile}; use tokio::{fs, runtime}; const ZIP_FILENAME: &str = "../json.zip"; const NEW_DB_FILENAME: &str = "../db.dat"; lazy_static! { static ref RUNTIME: tokio::runtime::Runtime = runtime::Builder::new_current_thread().build().unwrap(); } const N: usize = 4096; async fn read_sample() -> Vec { let mut file = fs::File::open(ZIP_FILENAME).await.expect("open zip"); let archive = ZipFileReader::with_tokio(&mut file) .await .expect("open zip file reader"); let mut source_questions = archive.source_questions(); let source_questions = source_questions.stream(); pin_mut!(source_questions); source_questions .converter() .convert() .take(N) .enumerate() .map(|(num, mut question)| { question.num = 1 + num as u32; question }) .collect() .await } fn read_sample_sync() -> Vec { std::thread::spawn(|| { runtime::Builder::new_current_thread() .build() .unwrap() .block_on(read_sample()) }) .join() .expect("spawn thread") } async fn prepare_db_writer>(path: P) -> Writer { let opts = WriterOpts { compress_lvl: Level::Fastest, data_buf_size: 100 * 1024 * 1024, out_buf_size: 100 * 1024 * 1024, current_buf_size: 10240, }; Writer::::new(path, opts) .await .expect("new writer") } fn setup_writer>(path: P) -> Writer { let rp = path.as_ref().to_str().unwrap().to_string(); std::thread::spawn(|| { runtime::Builder::new_current_thread() .build() .unwrap() .block_on(prepare_db_writer(rp)) }) .join() .expect("spawn thread") } fn setup_reader>(path: P) -> Reader { let rp = path.as_ref().to_str().unwrap().to_string(); std::thread::spawn(|| { runtime::Builder::new_current_thread() .build() .unwrap() .block_on(async move { Reader::new(rp).await.expect("new reader") }) }) .join() .expect("spawn thread") } fn questions_async_read(c: &mut Criterion) { c.bench_function("questions_async_read", |b| { b.to_async(RUNTIME.deref()).iter_batched( || setup_reader(NEW_DB_FILENAME), |reader| async move { reader .stream() .take(N) .for_each(|item| async { drop(item) }) .await }, BatchSize::SmallInput, ) }); } fn questions_async_write(c: &mut Criterion) { let dir = tempdir().expect("tempdir"); c.bench_function("questions_async_write", |b| { b.to_async(RUNTIME.deref()).iter_batched( || { let tmpfile = NamedTempFile::new_in(dir.path()) .expect("new tempfile") .into_temp_path(); let src = read_sample_sync().into_iter(); let src = futures::stream::iter(src); let writer = setup_writer(&tmpfile); (src, writer) }, |(mut src, mut writer)| async move { writer.load(&mut src).await.unwrap(); writer.finish().await.unwrap(); }, BatchSize::SmallInput, ) }); } fn config() -> Criterion { Criterion::default() .sample_size(40) .warm_up_time(Duration::from_secs(7)) .measurement_time(Duration::from_secs(20)) } criterion_group! {name=benches; config = config(); targets = questions_async_read, questions_async_write} criterion_main!(benches);