#[macro_use] extern crate criterion; extern crate bincode; extern crate serde; extern crate serde_derive; extern crate tempfile; use async_compression::Level; use chgk_ledb_lib::async_db::{Reader, Writer, WriterOpts}; use futures::StreamExt; use std::{ops::Deref, path::Path}; use criterion::{BatchSize, Criterion}; use lazy_static::lazy_static; use tempfile::{tempdir, NamedTempFile}; use serde_derive::{Deserialize, Serialize}; #[derive( bincode::Encode, bincode::Decode, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, )] struct TestData { num1: u64, num2: u64, test: String, } use tokio::runtime; lazy_static! { static ref RUNTIME: tokio::runtime::Runtime = runtime::Builder::new_current_thread().build().unwrap(); } const N: usize = 4096; fn gen_data(count: usize) -> impl Iterator { (0..count).map(|i| 143 + i as u64).map(|i| TestData { num1: i, num2: (i * 100) ^ 0xDF0E441122334455, test: "test ---- Test ____".repeat(123 + i as usize % 15), }) } async fn prepare_db_writer>(path: P) -> Writer { let opts = WriterOpts { compress_lvl: Level::Fastest, data_buf_size: 100 * 1024 * 1024, out_buf_size: 100 * 1024 * 1024, current_buf_size: 10240, }; Writer::new(path, opts).await.expect("new writer") } fn write_sample>(path: P) { let rp = path.as_ref().to_str().unwrap().to_string(); std::thread::spawn(|| { runtime::Builder::new_current_thread() .build() .unwrap() .block_on(async move { let mut writer = prepare_db_writer(rp).await; let items_iter = gen_data(N).collect::>().into_iter(); let mut samples_stream = futures::stream::iter(items_iter); writer.load(&mut samples_stream).await.unwrap(); writer.finish().await.unwrap(); }) }) .join() .expect("spawn thread"); } fn setup_writer>(path: P) -> Writer { let rp = path.as_ref().to_str().unwrap().to_string(); std::thread::spawn(|| { runtime::Builder::new_current_thread() .build() .unwrap() .block_on(prepare_db_writer(rp)) }) .join() .expect("spawn thread") } fn setup_reader>(path: P) -> Reader { let rp = path.as_ref().to_str().unwrap().to_string(); std::thread::spawn(|| { runtime::Builder::new_current_thread() .build() .unwrap() .block_on(async move { Reader::new(rp).await.expect("new reader") }) }) .join() .expect("spawn thread") } fn async_read(c: &mut Criterion) { let dir = tempdir().expect("tempdir"); let tmpfile = NamedTempFile::new_in(dir.path()) .expect("new tempfile") .into_temp_path(); write_sample(&tmpfile); c.bench_function("async_read", |b| { b.to_async(RUNTIME.deref()).iter_batched( || setup_reader(&tmpfile), |reader| async move { reader.stream().for_each(|item| async { drop(item) }).await }, BatchSize::SmallInput, ) }); } fn async_write(c: &mut Criterion) { let dir = tempdir().expect("tempdir"); c.bench_function("async_write", |b| { b.to_async(RUNTIME.deref()).iter_batched( || { let tmpfile = NamedTempFile::new_in(dir.path()) .expect("new tempfile") .into_temp_path(); let src = gen_data(N).collect::>().into_iter(); let src = futures::stream::iter(src); let writer = setup_writer(&tmpfile); (src, writer) }, |(mut src, mut writer)| async move { writer.load(&mut src).await.unwrap(); writer.finish().await.unwrap(); }, BatchSize::SmallInput, ) }); } fn config() -> Criterion { Criterion::default().sample_size(20) } criterion_group! {name=benches; config = config(); targets = async_read, async_write} criterion_main!(benches);