chgk_ledb/app_async/benches/async_bench.rs

142 lines
4.1 KiB
Rust

#[macro_use]
extern crate criterion;
extern crate serde;
extern crate serde_derive;
extern crate tempfile;
use async_compression::Level;
use chgk_ledb_lib::async_db::{Reader, Writer, WriterOpts};
use futures::StreamExt;
use std::{ops::Deref, path::Path};
use criterion::{BatchSize, Criterion};
use lazy_static::lazy_static;
use tempfile::{tempdir, NamedTempFile};
use serde_derive::{Deserialize, Serialize};
/// Record type stored in and read back from the benchmarked database.
///
/// Instances are produced by `gen_data`; the field order matters for the
/// derived `Ord`/`PartialOrd` comparisons and for the serialized layout.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
struct TestData {
// First numeric payload; `gen_data` fills it with `143 + index`.
num1: u64,
// Second numeric payload; `gen_data` derives it from `num1`.
num2: u64,
// Text payload; `gen_data` fills it with a repeated fixed pattern.
test: String,
}
use tokio::runtime;
// Shared current-thread Tokio runtime, built lazily on first access.
// `async_read` and `async_write` hand it to Criterion via `b.to_async(...)`
// to drive the measured futures.
lazy_static! {
    static ref RUNTIME: runtime::Runtime = runtime::Builder::new_current_thread()
        .build()
        .unwrap();
}
/// Number of records generated per data set; passed to `gen_data` by
/// `write_sample` and `async_write`.
const N: usize = 4096;
/// Lazily produces `count` deterministic [`TestData`] records.
///
/// For the zero-based position `idx`, with `value = 143 + idx`:
/// * `num1` is `value`,
/// * `num2` is `(value * 100) ^ 0xDF0E441122334455`,
/// * `test` is the pattern `"test ---- Test ____"` repeated
///   `123 + value % 15` times (so between 123 and 137 repetitions).
fn gen_data(count: usize) -> impl Iterator<Item = TestData> {
    const FIRST_VALUE: u64 = 143;
    const PATTERN: &str = "test ---- Test ____";

    (0..count).map(|idx| {
        let value = FIRST_VALUE + idx as u64;
        // `as` binds tighter than `%`, which binds tighter than `+`.
        let repeats = 123 + (value as usize) % 15;
        TestData {
            num1: value,
            num2: (value * 100) ^ 0xDF0E441122334455,
            test: PATTERN.repeat(repeats),
        }
    })
}
/// Creates a [`Writer`] of [`TestData`] at `path` with the benchmark's fixed
/// options: fastest compression level, `100 * 1024 * 1024` for both the data
/// and output buffer sizes, and `10240` for the current buffer size
/// (presumably byte counts — confirm against `WriterOpts`).
///
/// # Panics
///
/// Panics with `"new writer"` if `Writer::new` returns an error.
async fn prepare_db_writer<P: AsRef<Path>>(path: P) -> Writer<TestData> {
    let created = Writer::new(
        path,
        WriterOpts {
            compress_lvl: Level::Fastest,
            data_buf_size: 100 * 1024 * 1024,
            out_buf_size: 100 * 1024 * 1024,
            current_buf_size: 10240,
        },
    )
    .await;
    created.expect("new writer")
}
/// Writes `N` generated records into a database at `path` and finalizes it.
///
/// The work runs on a freshly spawned thread with its own current-thread
/// Tokio runtime, and this function blocks until that thread is done
/// (presumably so it can be called regardless of whether the calling thread
/// is already driving a runtime).
///
/// # Panics
///
/// Panics with `"spawn thread"` if the worker thread panicked, which happens
/// when the runtime cannot be built, the writer cannot be created, or
/// loading/finishing the writer fails.
fn write_sample<P: AsRef<Path>>(path: P) {
    // Carry the path as an owned `PathBuf` rather than round-tripping it
    // through `&str`: that conversion panicked on non-UTF-8 paths, and
    // `prepare_db_writer` accepts any `AsRef<Path>` anyway.
    let db_path = path.as_ref().to_path_buf();
    std::thread::spawn(move || {
        runtime::Builder::new_current_thread()
            .build()
            .unwrap()
            .block_on(async move {
                let mut writer = prepare_db_writer(db_path).await;
                let items_iter = gen_data(N).collect::<Vec<TestData>>().into_iter();
                let mut samples_stream = futures::stream::iter(items_iter);
                writer.load(&mut samples_stream).await.unwrap();
                writer.finish().await.unwrap();
            })
    })
    .join()
    // `join` only fails when the worker thread panicked.
    .expect("spawn thread");
}
/// Builds a ready-to-use [`Writer`] for `path` from synchronous code.
///
/// The writer is created by `prepare_db_writer` on a freshly spawned thread
/// with its own current-thread Tokio runtime, then handed back to the caller
/// (presumably so this can be called from a benchmark setup closure that is
/// itself executed inside a runtime).
///
/// # Panics
///
/// Panics with `"spawn thread"` if the worker thread panicked, i.e. when the
/// runtime cannot be built or the writer cannot be created.
fn setup_writer<P: AsRef<Path>>(path: P) -> Writer<TestData> {
    // Carry the path as an owned `PathBuf` rather than round-tripping it
    // through `&str`: that conversion panicked on non-UTF-8 paths, and
    // `prepare_db_writer` accepts any `AsRef<Path>` anyway.
    let db_path = path.as_ref().to_path_buf();
    std::thread::spawn(move || {
        runtime::Builder::new_current_thread()
            .build()
            .unwrap()
            .block_on(prepare_db_writer(db_path))
    })
    .join()
    // `join` only fails when the worker thread panicked.
    .expect("spawn thread")
}
/// Opens a [`Reader`] for the database at `path` from synchronous code.
///
/// The reader is created on a freshly spawned thread with its own
/// current-thread Tokio runtime and then returned to the caller.
///
/// # Panics
///
/// Panics if `path` is not valid UTF-8, and with `"spawn thread"` if the
/// worker thread panicked (runtime could not be built, or `Reader::new`
/// failed with `"new reader"`).
fn setup_reader<P: AsRef<Path>>(path: P) -> Reader<TestData> {
    let db_path = path.as_ref().to_str().unwrap().to_owned();
    let open_reader = move || {
        let rt = runtime::Builder::new_current_thread().build().unwrap();
        rt.block_on(async move { Reader::new(db_path).await.expect("new reader") })
    };
    // `join` only fails when the worker thread panicked.
    std::thread::spawn(open_reader).join().expect("spawn thread")
}
fn async_read(c: &mut Criterion) {
let dir = tempdir().expect("tempdir");
let tmpfile = NamedTempFile::new_in(dir.path())
.expect("new tempfile")
.into_temp_path();
write_sample(&tmpfile);
c.bench_function("async_read", |b| {
b.to_async(RUNTIME.deref()).iter_batched(
|| setup_reader(&tmpfile),
|reader| async move { reader.stream().for_each(|item| async { drop(item) }).await },
BatchSize::SmallInput,
)
});
}
/// Benchmark `"async_write"`: loads `N` pre-generated records into a new
/// database file and finalizes it.
///
/// Setup (not measured) creates a temp file inside `dir`, materializes the
/// input records, and opens a writer. The measured routine performs
/// `load` + `finish`.
fn async_write(c: &mut Criterion) {
    let dir = tempdir().expect("tempdir");
    c.bench_function("async_write", |b| {
        b.to_async(RUNTIME.deref()).iter_batched(
            || {
                let tmpfile = NamedTempFile::new_in(dir.path())
                    .expect("new tempfile")
                    .into_temp_path();
                // Collect up front so record generation is excluded from the
                // measured routine.
                let src = gen_data(N).collect::<Vec<TestData>>().into_iter();
                let src = futures::stream::iter(src);
                let writer = setup_writer(&tmpfile);
                // Pass the `TempPath` guard along: dropping it here would
                // delete the target file before the measured write runs.
                (src, writer, tmpfile)
            },
            |(mut src, mut writer, tmpfile)| async move {
                writer.load(&mut src).await.unwrap();
                writer.finish().await.unwrap();
                // Return the guard as the routine's output so the file is
                // removed when Criterion drops the outputs, not inside the
                // timed section.
                tmpfile
            },
            BatchSize::SmallInput,
        )
    });
}
/// Criterion configuration for this benchmark group: the defaults with the
/// sample size set to 20.
fn config() -> Criterion {
    let base = Criterion::default();
    base.sample_size(20)
}
// Registers the `benches` group with the `config()` settings and the two
// benchmark targets, then generates the benchmark entry point for it.
criterion_group! {name=benches; config = config(); targets = async_read, async_write}
criterion_main!(benches);