2023-08-23 10:30:51 +00:00
|
|
|
#[macro_use]
|
|
|
|
extern crate criterion;
|
|
|
|
extern crate serde;
|
|
|
|
extern crate serde_derive;
|
|
|
|
extern crate tempfile;
|
|
|
|
|
|
|
|
use async_compression::Level;
|
|
|
|
use chgk_ledb_lib::async_db::{Reader, Writer, WriterOpts};
|
|
|
|
use futures::StreamExt;
|
|
|
|
use std::{ops::Deref, path::Path};
|
|
|
|
|
|
|
|
use criterion::{BatchSize, Criterion};
|
|
|
|
use lazy_static::lazy_static;
|
|
|
|
use tempfile::{tempdir, NamedTempFile};
|
|
|
|
|
|
|
|
use serde_derive::{Deserialize, Serialize};
|
|
|
|
|
2023-08-23 19:40:41 +00:00
|
|
|
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
|
2023-08-23 10:30:51 +00:00
|
|
|
struct TestData {
|
|
|
|
num1: u64,
|
|
|
|
num2: u64,
|
|
|
|
test: String,
|
|
|
|
}
|
|
|
|
|
|
|
|
use tokio::runtime;
|
|
|
|
|
|
|
|
lazy_static! {
    /// Lazily built current-thread Tokio runtime shared by the benchmarks.
    ///
    /// Building the runtime panics (via `unwrap`) if the builder reports an error.
    static ref RUNTIME: runtime::Runtime = runtime::Builder::new_current_thread()
        .build()
        .unwrap();
}
|
|
|
|
|
|
|
|
/// Number of records generated for each benchmark data set (4096).
const N: usize = 1 << 12;
|
|
|
|
|
|
|
|
fn gen_data(count: usize) -> impl Iterator<Item = TestData> {
|
|
|
|
(0..count).map(|i| 143 + i as u64).map(|i| TestData {
|
|
|
|
num1: i,
|
|
|
|
num2: (i * 100) ^ 0xDF0E441122334455,
|
|
|
|
test: "test ---- Test ____".repeat(123 + i as usize % 15),
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn prepare_db_writer<P: AsRef<Path>>(path: P) -> Writer<TestData> {
|
|
|
|
let opts = WriterOpts {
|
|
|
|
compress_lvl: Level::Fastest,
|
|
|
|
data_buf_size: 100 * 1024 * 1024,
|
|
|
|
out_buf_size: 100 * 1024 * 1024,
|
|
|
|
current_buf_size: 10240,
|
|
|
|
};
|
|
|
|
|
|
|
|
Writer::new(path, opts).await.expect("new writer")
|
|
|
|
}
|
|
|
|
|
|
|
|
fn write_sample<P: AsRef<Path>>(path: P) {
|
|
|
|
let rp = path.as_ref().to_str().unwrap().to_string();
|
|
|
|
std::thread::spawn(|| {
|
|
|
|
runtime::Builder::new_current_thread()
|
|
|
|
.build()
|
|
|
|
.unwrap()
|
|
|
|
.block_on(async move {
|
|
|
|
let mut writer = prepare_db_writer(rp).await;
|
|
|
|
|
|
|
|
let items_iter = gen_data(N).collect::<Vec<TestData>>().into_iter();
|
|
|
|
let mut samples_stream = futures::stream::iter(items_iter);
|
|
|
|
writer.load(&mut samples_stream).await.unwrap();
|
|
|
|
writer.finish().await.unwrap();
|
|
|
|
})
|
|
|
|
})
|
|
|
|
.join()
|
|
|
|
.expect("spawn thread");
|
|
|
|
}
|
|
|
|
|
|
|
|
fn setup_writer<P: AsRef<Path>>(path: P) -> Writer<TestData> {
|
|
|
|
let rp = path.as_ref().to_str().unwrap().to_string();
|
|
|
|
|
|
|
|
std::thread::spawn(|| {
|
|
|
|
runtime::Builder::new_current_thread()
|
|
|
|
.build()
|
|
|
|
.unwrap()
|
|
|
|
.block_on(prepare_db_writer(rp))
|
|
|
|
})
|
|
|
|
.join()
|
|
|
|
.expect("spawn thread")
|
|
|
|
}
|
|
|
|
|
|
|
|
fn setup_reader<P: AsRef<Path>>(path: P) -> Reader<TestData> {
|
|
|
|
let rp = path.as_ref().to_str().unwrap().to_string();
|
|
|
|
std::thread::spawn(|| {
|
|
|
|
runtime::Builder::new_current_thread()
|
|
|
|
.build()
|
|
|
|
.unwrap()
|
|
|
|
.block_on(async move { Reader::new(rp).await.expect("new reader") })
|
|
|
|
})
|
|
|
|
.join()
|
|
|
|
.expect("spawn thread")
|
|
|
|
}
|
|
|
|
|
|
|
|
fn async_read(c: &mut Criterion) {
|
|
|
|
let dir = tempdir().expect("tempdir");
|
|
|
|
let tmpfile = NamedTempFile::new_in(dir.path())
|
|
|
|
.expect("new tempfile")
|
|
|
|
.into_temp_path();
|
|
|
|
write_sample(&tmpfile);
|
|
|
|
|
|
|
|
c.bench_function("async_read", |b| {
|
|
|
|
b.to_async(RUNTIME.deref()).iter_batched(
|
|
|
|
|| setup_reader(&tmpfile),
|
|
|
|
|reader| async move { reader.stream().for_each(|item| async { drop(item) }).await },
|
|
|
|
BatchSize::SmallInput,
|
|
|
|
)
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
fn async_write(c: &mut Criterion) {
|
|
|
|
let dir = tempdir().expect("tempdir");
|
|
|
|
|
|
|
|
c.bench_function("async_write", |b| {
|
|
|
|
b.to_async(RUNTIME.deref()).iter_batched(
|
|
|
|
|| {
|
|
|
|
let tmpfile = NamedTempFile::new_in(dir.path())
|
|
|
|
.expect("new tempfile")
|
|
|
|
.into_temp_path();
|
|
|
|
let src = gen_data(N).collect::<Vec<TestData>>().into_iter();
|
|
|
|
let src = futures::stream::iter(src);
|
|
|
|
let writer = setup_writer(&tmpfile);
|
|
|
|
(src, writer)
|
|
|
|
},
|
|
|
|
|(mut src, mut writer)| async move {
|
|
|
|
writer.load(&mut src).await.unwrap();
|
|
|
|
writer.finish().await.unwrap();
|
|
|
|
},
|
|
|
|
BatchSize::SmallInput,
|
|
|
|
)
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
fn config() -> Criterion {
|
|
|
|
Criterion::default().sample_size(20)
|
|
|
|
}
|
|
|
|
|
|
|
|
criterion_group! {name=benches; config = config(); targets = async_read, async_write}
|
|
|
|
criterion_main!(benches);
|