diff --git a/Cargo.lock b/Cargo.lock index 4fecb2e..ec01a84 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -314,7 +314,7 @@ dependencies = [ "bincode", "chgk_ledb_lib", "clap 4.3.16", - "criterion", + "criterion 0.4.0", "rand", "serde", "serde_derive", @@ -327,11 +327,14 @@ dependencies = [ name = "chgk_ledb_async" version = "1.1.0" dependencies = [ + "async-compression 0.4.1", "async_zip", "bincode", "chgk_ledb_lib", "clap 4.3.16", + "criterion 0.5.1", "futures", + "lazy_static", "rand", "serde", "serde_derive", @@ -532,6 +535,34 @@ dependencies = [ "walkdir", ] +[[package]] +name = "criterion" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f" +dependencies = [ + "anes", + "cast", + "ciborium", + "clap 4.3.16", + "criterion-plot", + "futures", + "is-terminal", + "itertools", + "num-traits", + "once_cell", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_derive", + "serde_json", + "tinytemplate", + "tokio", + "walkdir", +] + [[package]] name = "criterion-plot" version = "0.5.0" diff --git a/app_async/Cargo.toml b/app_async/Cargo.toml index 22e0fbe..e974234 100644 --- a/app_async/Cargo.toml +++ b/app_async/Cargo.toml @@ -9,6 +9,10 @@ description = "Утилита загружающая базу данных ЧГ # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[[bench]] +name = "async_bench" +harness = false + [dependencies] chgk_ledb_lib = {path = "../lib", features = ["async", "convert_async"]} serde_json="1.0" @@ -27,7 +31,11 @@ clap = { version = "4.2.7", features = ["derive"] } futures = "0.3" [dev-dependencies] +criterion = { version = "0.5.1", features = ["async_tokio"]} tempfile = "3.3" bincode = "^2.0.0-rc.2" serde="1.0" serde_derive="1.0" +futures = "0.3" +async-compression = { version = "0.4.1", default-features = false } +lazy_static = "1.4.0" diff --git a/app_async/benches/async_bench.rs b/app_async/benches/async_bench.rs new file mode 100644 index 0000000..0442a52 --- /dev/null +++ b/app_async/benches/async_bench.rs @@ -0,0 +1,153 @@ +#[macro_use] +extern crate criterion; +extern crate bincode; +extern crate serde; +extern crate serde_derive; +extern crate tempfile; + +use async_compression::Level; +use chgk_ledb_lib::async_db::{Reader, Writer, WriterOpts}; +use futures::StreamExt; +use std::{ops::Deref, path::Path}; + +use criterion::{BatchSize, Criterion}; +use lazy_static::lazy_static; +use tempfile::{tempdir, NamedTempFile}; + +use serde_derive::{Deserialize, Serialize}; + +#[derive( + bincode::Encode, + bincode::Decode, + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, +)] +struct TestData { + num1: u64, + num2: u64, + test: String, +} + +use tokio::runtime; + +lazy_static! { + static ref RUNTIME: tokio::runtime::Runtime = + runtime::Builder::new_current_thread().build().unwrap(); +} + +const N: usize = 4096; + +fn gen_data(count: usize) -> impl Iterator { + (0..count).map(|i| 143 + i as u64).map(|i| TestData { + num1: i, + num2: (i * 100) ^ 0xDF0E441122334455, + test: "test ---- Test ____".repeat(123 + i as usize % 15), + }) +} + +async fn prepare_db_writer>(path: P) -> Writer { + let opts = WriterOpts { + compress_lvl: Level::Fastest, + data_buf_size: 100 * 1024 * 1024, + out_buf_size: 100 * 1024 * 1024, + current_buf_size: 10240, + }; + + Writer::new(path, opts).await.expect("new writer") +} + +fn write_sample>(path: P) { + let rp = path.as_ref().to_str().unwrap().to_string(); + std::thread::spawn(|| { + runtime::Builder::new_current_thread() + .build() + .unwrap() + .block_on(async move { + let mut writer = prepare_db_writer(rp).await; + + let items_iter = gen_data(N).collect::>().into_iter(); + let mut samples_stream = futures::stream::iter(items_iter); + writer.load(&mut samples_stream).await.unwrap(); + writer.finish().await.unwrap(); + }) + }) + .join() + .expect("spawn thread"); +} + +fn setup_writer>(path: P) -> Writer { + let rp = path.as_ref().to_str().unwrap().to_string(); + + std::thread::spawn(|| { + runtime::Builder::new_current_thread() + .build() + .unwrap() + .block_on(prepare_db_writer(rp)) + }) + .join() + .expect("spawn thread") +} + +fn setup_reader>(path: P) -> Reader { + let rp = path.as_ref().to_str().unwrap().to_string(); + std::thread::spawn(|| { + runtime::Builder::new_current_thread() + .build() + .unwrap() + .block_on(async move { Reader::new(rp).await.expect("new reader") }) + }) + .join() + .expect("spawn thread") +} + +fn async_read(c: &mut Criterion) { + let dir = tempdir().expect("tempdir"); + let tmpfile = NamedTempFile::new_in(dir.path()) + .expect("new tempfile") + .into_temp_path(); + write_sample(&tmpfile); + + c.bench_function("async_read", |b| { + b.to_async(RUNTIME.deref()).iter_batched( + || setup_reader(&tmpfile), + |reader| async move { reader.stream().for_each(|item| async { drop(item) }).await }, + BatchSize::SmallInput, + ) + }); +} + +fn async_write(c: &mut Criterion) { + let dir = tempdir().expect("tempdir"); + + c.bench_function("async_write", |b| { + b.to_async(RUNTIME.deref()).iter_batched( + || { + let tmpfile = NamedTempFile::new_in(dir.path()) + .expect("new tempfile") + .into_temp_path(); + let src = gen_data(N).collect::>().into_iter(); + let src = futures::stream::iter(src); + let writer = setup_writer(&tmpfile); + (src, writer) + }, + |(mut src, mut writer)| async move { + writer.load(&mut src).await.unwrap(); + writer.finish().await.unwrap(); + }, + BatchSize::SmallInput, + ) + }); +} + +fn config() -> Criterion { + Criterion::default().sample_size(20) +} + +criterion_group! {name=benches; config = config(); targets = async_read, async_write} +criterion_main!(benches);