From e4001ee69fc8831de6498926b25a2d5ad470ef64 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Wed, 23 Aug 2023 23:50:31 +0300 Subject: [PATCH 1/3] questions: remove "skip_serializing_if" to fix ser/de to postcard err "Hit the end of buffer, expected more data" more info: https://github.com/apache/arrow-rs/issues/3082 --- lib/src/questions.rs | 224 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 179 insertions(+), 45 deletions(-) diff --git a/lib/src/questions.rs b/lib/src/questions.rs index 29a0745..3f5ee68 100644 --- a/lib/src/questions.rs +++ b/lib/src/questions.rs @@ -2,83 +2,71 @@ use serde_derive::{Deserialize, Serialize}; #[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)] pub struct BatchInfo { - #[serde(default, skip_serializing_if = "String::is_empty")] + #[serde(default)] pub filename: String, - #[serde(default, skip_serializing_if = "String::is_empty")] + #[serde(default)] pub description: String, - #[serde(default, skip_serializing_if = "String::is_empty")] + #[serde(default)] pub author: String, - #[serde(default, skip_serializing_if = "String::is_empty")] + #[serde(default)] pub comment: String, - #[serde(default, skip_serializing_if = "String::is_empty")] + #[serde(default)] pub url: String, - #[serde(default, skip_serializing_if = "String::is_empty")] + #[serde(default)] pub date: String, - #[serde(default, skip_serializing_if = "String::is_empty")] + #[serde(default)] pub processed_by: String, - #[serde(default, skip_serializing_if = "String::is_empty")] + #[serde(default)] pub redacted_by: String, - #[serde(default, skip_serializing_if = "String::is_empty")] + #[serde(default)] pub copyright: String, - #[serde(default, skip_serializing_if = "String::is_empty")] + #[serde(default)] pub theme: String, - #[serde(default, skip_serializing_if = "String::is_empty")] + #[serde(default)] pub kind: String, - #[serde(default, skip_serializing_if = "String::is_empty")] + #[serde(default)] pub source: String, - #[serde(default, skip_serializing_if = "String::is_empty")] + #[serde(default)] pub rating: String, } #[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)] pub struct Question { - #[serde(default, skip_serializing_if = "u32_is_zero")] + #[serde(default)] pub num: u32, pub id: String, - pub description: String, pub answer: String, - - #[serde(default, skip_serializing_if = "String::is_empty")] + #[serde(default)] pub author: String, - #[serde(default, skip_serializing_if = "String::is_empty")] + #[serde(default)] pub comment: String, - #[serde(default, skip_serializing_if = "String::is_empty")] + #[serde(default)] pub comment1: String, - #[serde(default, skip_serializing_if = "String::is_empty")] + #[serde(default)] pub tour: String, - #[serde(default, skip_serializing_if = "String::is_empty")] + #[serde(default)] pub url: String, - #[serde(default, skip_serializing_if = "String::is_empty")] + #[serde(default)] pub date: String, - #[serde(default, skip_serializing_if = "String::is_empty")] + #[serde(default)] pub processed_by: String, - #[serde(default, skip_serializing_if = "String::is_empty")] + #[serde(default)] pub redacted_by: String, - #[serde(default, skip_serializing_if = "String::is_empty")] + #[serde(default)] pub copyright: String, - #[serde(default, skip_serializing_if = "String::is_empty")] + #[serde(default)] pub theme: String, - #[serde(default, skip_serializing_if = "String::is_empty")] + #[serde(default)] pub kind: String, - #[serde(default, skip_serializing_if = "String::is_empty")] + #[serde(default)] pub source: String, - #[serde(default, skip_serializing_if = "String::is_empty")] + #[serde(default)] pub rating: String, - #[serde(default, skip_serializing_if = "BatchInfo::is_default")] + #[serde(default)] pub batch_info: BatchInfo, } -fn u32_is_zero(num: &u32) -> bool { - *num == 0 -} - -impl BatchInfo { - pub fn is_default(&self) -> bool { - *self == BatchInfo::default() - } -} - #[cfg(any(feature = "convert", feature = "convert_async"))] pub mod convert_common { use super::{BatchInfo, Question}; @@ -170,20 +158,68 @@ pub mod convert { let converted: Vec<_> = source.convert().collect(); assert_yaml_snapshot!(converted, @r#" --- - - id: Вопрос 1 + - num: 0 + id: Вопрос 1 description: Сколько будет (2 * 2 * 2 + 2) * 2 * 2 + 2 answer: "42" + author: "" + comment: "" + comment1: "" + tour: "" + url: "" + date: "" + processed_by: "" + redacted_by: "" + copyright: "" + theme: "" + kind: "" + source: "" + rating: "" batch_info: filename: test.json description: Тестовый + author: "" + comment: "" + url: "" date: 00-000-2000 - - id: Вопрос 2 + processed_by: "" + redacted_by: "" + copyright: "" + theme: "" + kind: "" + source: "" + rating: "" + - num: 0 + id: Вопрос 2 description: Зимой и летом одним цветом answer: ёлка + author: "" + comment: "" + comment1: "" + tour: "" + url: "" + date: "" + processed_by: "" + redacted_by: "" + copyright: "" + theme: "" + kind: "" + source: "" + rating: "" batch_info: filename: test.json description: Тестовый + author: "" + comment: "" + url: "" date: 00-000-2000 + processed_by: "" + redacted_by: "" + copyright: "" + theme: "" + kind: "" + source: "" + rating: "" "#); } @@ -285,20 +321,68 @@ pub mod convert_async { let converted: Vec<_> = converter.collect().await; assert_yaml_snapshot!(converted, @r#" --- - - id: Вопрос 1 + - num: 0 + id: Вопрос 1 description: Сколько будет (2 * 2 * 2 + 2) * 2 * 2 + 2 answer: "42" + author: "" + comment: "" + comment1: "" + tour: "" + url: "" + date: "" + processed_by: "" + redacted_by: "" + copyright: "" + theme: "" + kind: "" + source: "" + rating: "" batch_info: filename: test.json description: Тестовый + author: "" + comment: "" + url: "" date: 00-000-2000 - - id: Вопрос 2 + processed_by: "" + redacted_by: "" + copyright: "" + theme: "" + kind: "" + source: "" + rating: "" + - num: 0 + id: Вопрос 2 description: Зимой и летом одним цветом answer: ёлка + author: "" + comment: "" + comment1: "" + tour: "" + url: "" + date: "" + processed_by: "" + redacted_by: "" + copyright: "" + theme: "" + kind: "" + source: "" + rating: "" batch_info: filename: test.json description: Тестовый + author: "" + comment: "" + url: "" date: 00-000-2000 + processed_by: "" + redacted_by: "" + copyright: "" + theme: "" + kind: "" + source: "" + rating: "" "#); } @@ -358,13 +442,38 @@ mod test { fn test_question_ser() { assert_yaml_snapshot!(sample_question(), @r#" --- + num: 0 id: Вопрос 1 description: Сколько будет (2 * 2 * 2 + 2) * 2 * 2 + 2 answer: "42" + author: "" + comment: "" + comment1: "" + tour: "" + url: "" + date: "" + processed_by: "" + redacted_by: "" + copyright: "" + theme: "" + kind: "" + source: "" + rating: "" batch_info: + filename: "" description: Тестовый + author: "" + comment: "" + url: "" date: 00-000-2000 - + processed_by: "" + redacted_by: "" + copyright: "" + theme: "" + kind: "" + source: "" + rating: "" + "#); } #[test] @@ -382,12 +491,37 @@ mod test { assert_yaml_snapshot!(question_from_json.unwrap(), @r#" --- + num: 0 id: Вопрос 1 description: Сколько будет (2 * 2 * 2 + 2) * 2 * 2 + 2 answer: "42" + author: "" + comment: "" + comment1: "" + tour: "" + url: "" + date: "" + processed_by: "" + redacted_by: "" + copyright: "" + theme: "" + kind: "" + source: "" + rating: "" batch_info: + filename: "" description: Тестовый + author: "" + comment: "" + url: "" date: 00-000-2000 + processed_by: "" + redacted_by: "" + copyright: "" + theme: "" + kind: "" + source: "" + rating: "" "#); } From bd67e3ee857629aee5266894fd160556b78b5f0f Mon Sep 17 00:00:00 2001 From: Dmitry Date: Thu, 24 Aug 2023 09:27:42 +0300 Subject: [PATCH 2/3] question: split struct for binary/text serialize --- app/src/main.rs | 2 +- app_async/src/main.rs | 2 +- lib/src/questions.rs | 305 ++++++++++++++++++++++++++++++------------ 3 files changed, 224 insertions(+), 85 deletions(-) diff --git a/app/src/main.rs b/app/src/main.rs index fbcb2e7..561a6bd 100644 --- a/app/src/main.rs +++ b/app/src/main.rs @@ -10,7 +10,7 @@ use chgk_ledb_lib::db; use chgk_ledb_lib::questions; use chgk_ledb_lib::source; -use crate::questions::{Question, QuestionsConverter}; +use crate::questions::{binary::Question, QuestionsConverter}; use crate::source::ReadSourceQuestionsBatches; const ZIP_FILENAME: &str = "json.zip"; diff --git a/app_async/src/main.rs b/app_async/src/main.rs index 88e1256..996ee02 100644 --- a/app_async/src/main.rs +++ b/app_async/src/main.rs @@ -16,7 +16,7 @@ use tokio::{fs, io}; use tokio_stream::wrappers::UnboundedReceiverStream; use chgk_ledb_lib::async_db; -use chgk_ledb_lib::questions::Question; +use chgk_ledb_lib::questions::binary::Question; use chgk_ledb_lib::questions::QuestionsConverterAsyncForStream; use chgk_ledb_lib::source::ReadSourceQuestionsBatchesAsync; diff --git a/lib/src/questions.rs b/lib/src/questions.rs index 3f5ee68..9039d7a 100644 --- a/lib/src/questions.rs +++ b/lib/src/questions.rs @@ -2,74 +2,264 @@ use serde_derive::{Deserialize, Serialize}; #[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)] pub struct BatchInfo { - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub filename: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub description: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub author: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub comment: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub url: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub date: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub processed_by: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub redacted_by: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub copyright: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub theme: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub kind: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub source: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub rating: String, } #[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)] pub struct Question { - #[serde(default)] + #[serde(default, skip_serializing_if = "u32_is_zero")] pub num: u32, pub id: String, + pub description: String, pub answer: String, - #[serde(default)] + + #[serde(default, skip_serializing_if = "String::is_empty")] pub author: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub comment: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub comment1: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub tour: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub url: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub date: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub processed_by: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub redacted_by: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub copyright: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub theme: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub kind: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub source: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub rating: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "BatchInfo::is_default")] pub batch_info: BatchInfo, } +fn u32_is_zero(num: &u32) -> bool { + *num == 0 +} + +impl BatchInfo { + pub fn is_default(&self) -> bool { + *self == BatchInfo::default() + } +} + +pub mod binary { + use serde_derive::{Deserialize, Serialize}; + #[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)] + pub struct BatchInfo { + #[serde(default)] + pub filename: String, + #[serde(default)] + pub description: String, + #[serde(default)] + pub author: String, + #[serde(default)] + pub comment: String, + #[serde(default)] + pub url: String, + #[serde(default)] + pub date: String, + #[serde(default)] + pub processed_by: String, + #[serde(default)] + pub redacted_by: String, + #[serde(default)] + pub copyright: String, + #[serde(default)] + pub theme: String, + #[serde(default)] + pub kind: String, + #[serde(default)] + pub source: String, + #[serde(default)] + pub rating: String, + } + + #[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)] + pub struct Question { + #[serde(default)] + pub num: u32, + pub id: String, + pub description: String, + pub answer: String, + #[serde(default)] + pub author: String, + #[serde(default)] + pub comment: String, + #[serde(default)] + pub comment1: String, + #[serde(default)] + pub tour: String, + #[serde(default)] + pub url: String, + #[serde(default)] + pub date: String, + #[serde(default)] + pub processed_by: String, + #[serde(default)] + pub redacted_by: String, + #[serde(default)] + pub copyright: String, + #[serde(default)] + pub theme: String, + #[serde(default)] + pub kind: String, + #[serde(default)] + pub source: String, + #[serde(default)] + pub rating: String, + #[serde(default)] + pub batch_info: BatchInfo, + } + + #[cfg(test)] + mod test { + use super::*; + use insta::assert_yaml_snapshot; + use serde_json::json; + pub fn sample_question() -> Question { + Question { + id: "Вопрос 1".into(), + description: "Сколько будет (2 * 2 * 2 + 2) * 2 * 2 + 2".into(), + answer: "42".into(), + batch_info: BatchInfo { + description: "Тестовый".into(), + date: "00-000-2000".into(), + ..Default::default() + }, + ..Default::default() + } + } + + #[test] + fn test_question_ser() { + assert_yaml_snapshot!(sample_question(), @r#" + --- + num: 0 + id: Вопрос 1 + description: Сколько будет (2 * 2 * 2 + 2) * 2 * 2 + 2 + answer: "42" + author: "" + comment: "" + comment1: "" + tour: "" + url: "" + date: "" + processed_by: "" + redacted_by: "" + copyright: "" + theme: "" + kind: "" + source: "" + rating: "" + batch_info: + filename: "" + description: Тестовый + author: "" + comment: "" + url: "" + date: 00-000-2000 + processed_by: "" + redacted_by: "" + copyright: "" + theme: "" + kind: "" + source: "" + rating: "" + + "#); + } + #[test] + fn test_question_de() { + let question_from_json: Result = serde_json::from_value(json!({ + "id": "Вопрос 1", + "description": "Сколько будет (2 * 2 * 2 + 2) * 2 * 2 + 2", + "answer": "42", + "batch_info": { + "description": "Тестовый", + "date": "00-000-2000" + } + })); + assert!(question_from_json.is_ok()); + + assert_yaml_snapshot!(question_from_json.unwrap(), @r#" + --- + num: 0 + id: Вопрос 1 + description: Сколько будет (2 * 2 * 2 + 2) * 2 * 2 + 2 + answer: "42" + author: "" + comment: "" + comment1: "" + tour: "" + url: "" + date: "" + processed_by: "" + redacted_by: "" + copyright: "" + theme: "" + kind: "" + source: "" + rating: "" + batch_info: + filename: "" + description: Тестовый + author: "" + comment: "" + url: "" + date: 00-000-2000 + processed_by: "" + redacted_by: "" + copyright: "" + theme: "" + kind: "" + source: "" + rating: "" + + "#); + } + } +} + #[cfg(any(feature = "convert", feature = "convert_async"))] pub mod convert_common { - use super::{BatchInfo, Question}; + use super::binary::{BatchInfo, Question}; use crate::source::{SourceQuestion, SourceQuestionsBatch}; macro_rules! make { @@ -117,7 +307,7 @@ pub mod convert_common { #[cfg(feature = "convert")] pub mod convert { - use super::Question; + use super::binary::Question; use crate::source::SourceQuestionsBatch; pub trait QuestionsConverter { @@ -234,7 +424,7 @@ pub mod convert_async { use futures_core::stream::Stream; use futures_util::StreamExt; - use super::Question; + use super::binary::Question; use crate::source::SourceQuestionsBatch; pub struct QuestionsConverterAsync @@ -396,7 +586,6 @@ mod test { use super::*; use insta::assert_yaml_snapshot; use serde_json::json; - #[cfg(any(feature = "convert", feature = "convert_async"))] pub mod convert_common { use crate::source::{SourceQuestion, SourceQuestionsBatch}; @@ -442,38 +631,13 @@ mod test { fn test_question_ser() { assert_yaml_snapshot!(sample_question(), @r#" --- - num: 0 id: Вопрос 1 description: Сколько будет (2 * 2 * 2 + 2) * 2 * 2 + 2 answer: "42" - author: "" - comment: "" - comment1: "" - tour: "" - url: "" - date: "" - processed_by: "" - redacted_by: "" - copyright: "" - theme: "" - kind: "" - source: "" - rating: "" batch_info: - filename: "" description: Тестовый - author: "" - comment: "" - url: "" date: 00-000-2000 - processed_by: "" - redacted_by: "" - copyright: "" - theme: "" - kind: "" - source: "" - rating: "" - + "#); } #[test] @@ -491,37 +655,12 @@ mod test { assert_yaml_snapshot!(question_from_json.unwrap(), @r#" --- - num: 0 id: Вопрос 1 description: Сколько будет (2 * 2 * 2 + 2) * 2 * 2 + 2 answer: "42" - author: "" - comment: "" - comment1: "" - tour: "" - url: "" - date: "" - processed_by: "" - redacted_by: "" - copyright: "" - theme: "" - kind: "" - source: "" - rating: "" batch_info: - filename: "" description: Тестовый - author: "" - comment: "" - url: "" date: 00-000-2000 - processed_by: "" - redacted_by: "" - copyright: "" - theme: "" - kind: "" - source: "" - rating: "" "#); } From 1b88db07be731233493f58e0c83750650282536e Mon Sep 17 00:00:00 2001 From: Dmitry Date: Sat, 26 Aug 2023 13:42:25 +0300 Subject: [PATCH 3/3] add questions benches --- app/Cargo.toml | 4 + app/benches/questions_bench.rs | 101 ++++++++++++++ app_async/Cargo.toml | 4 + app_async/benches/questions_async_bench.rs | 152 +++++++++++++++++++++ 4 files changed, 261 insertions(+) create mode 100644 app/benches/questions_bench.rs create mode 100644 app_async/benches/questions_async_bench.rs diff --git a/app/Cargo.toml b/app/Cargo.toml index ef32ab3..bc9edee 100644 --- a/app/Cargo.toml +++ b/app/Cargo.toml @@ -13,6 +13,10 @@ description = "Утилита загружающая базу данных ЧГ name = "db_bench" harness = false +[[bench]] +name = "questions_bench" +harness = false + [dependencies] chgk_ledb_lib = {path = "../lib", features = ["sync", "source", "convert"]} serde_json="1.0" diff --git a/app/benches/questions_bench.rs b/app/benches/questions_bench.rs new file mode 100644 index 0000000..1b08f86 --- /dev/null +++ b/app/benches/questions_bench.rs @@ -0,0 +1,101 @@ +#[macro_use] +extern crate criterion; +extern crate bincode; +extern crate serde; +extern crate serde_derive; +extern crate serde_json; +extern crate tempfile; + +use chgk_ledb_lib::db::{Reader, Writer, WriterOpts}; +use chgk_ledb_lib::questions::{Question, QuestionsConverter}; +use chgk_ledb_lib::source::ReadSourceQuestionsBatches; + +use std::path::Path; +use std::time::Duration; +use std::{fs, io}; + +use criterion::{BatchSize, Criterion}; +use tempfile::{tempdir, NamedTempFile}; + +const ZIP_FILENAME: &str = "../json.zip"; +const NEW_DB_FILENAME: &str = "../db.dat"; + +const N: usize = 4096; + +fn read_sample() -> Vec { + let zip_file = fs::File::open(ZIP_FILENAME).unwrap(); + let zip_reader = io::BufReader::new(zip_file); + let archive = zip::ZipArchive::new(zip_reader).unwrap(); + let mut source_questions = archive.source_questions(); + + source_questions + .convert() + .take(N) + .enumerate() + .map(|(num, mut question)| { + question.num = 1 + num as u32; + question + }) + .collect() +} + +fn prepare_db_writer>(path: P) -> Writer { + let opts = WriterOpts { + compress_lvl: 1, + data_buf_size: 100 * 1024 * 1024, + out_buf_size: 100 * 1024 * 1024, + current_buf_size: 10240, + }; + + Writer::new(path, opts).expect("new writer") +} + +fn questions_read(c: &mut Criterion) { + c.bench_function("questions_read", |b| { + b.iter_batched( + || { + let reader: Reader = + Reader::new(NEW_DB_FILENAME, 4096).expect("new reader"); + reader.into_iter().take(N) + }, + |reader| { + for item in reader { + drop(item); + } + }, + BatchSize::SmallInput, + ) + }); +} + +fn questions_write(c: &mut Criterion) { + let dir = tempdir().expect("tempdir"); + + c.bench_function("questions_write", |b| { + b.iter_batched( + || { + let tmpfile = NamedTempFile::new_in(dir.path()) + .expect("new tempfile") + .into_temp_path(); + let src = read_sample().into_iter(); + let writer = prepare_db_writer(&tmpfile); + (src, writer) + }, + |(mut src, mut writer)| { + writer.load(&mut src).unwrap(); + writer.finish().unwrap(); + }, + BatchSize::SmallInput, + ) + }); +} + +fn config() -> Criterion { + Criterion::default() + .sample_size(40) + .warm_up_time(Duration::from_secs(7)) + .measurement_time(Duration::from_secs(20)) +} + +criterion_group! {name=benches; config = config(); targets = questions_read, questions_write} +criterion_main!(benches); diff --git a/app_async/Cargo.toml b/app_async/Cargo.toml index e974234..e46f6e6 100644 --- a/app_async/Cargo.toml +++ b/app_async/Cargo.toml @@ -13,6 +13,10 @@ description = "Утилита загружающая базу данных ЧГ name = "async_bench" harness = false +[[bench]] +name = "questions_async_bench" +harness = false + [dependencies] chgk_ledb_lib = {path = "../lib", features = ["async", "convert_async"]} serde_json="1.0" diff --git a/app_async/benches/questions_async_bench.rs b/app_async/benches/questions_async_bench.rs new file mode 100644 index 0000000..950a79a --- /dev/null +++ b/app_async/benches/questions_async_bench.rs @@ -0,0 +1,152 @@ +#[macro_use] +extern crate criterion; +extern crate bincode; +extern crate serde; +extern crate serde_derive; +extern crate tempfile; + +use async_compression::Level; +use chgk_ledb_lib::async_db::{Reader, Writer, WriterOpts}; +use chgk_ledb_lib::questions::{Question, QuestionsConverterAsyncForStream}; +use chgk_ledb_lib::source::ReadSourceQuestionsBatchesAsync; +use futures::pin_mut; +use futures::StreamExt; +use std::time::Duration; +use std::{ops::Deref, path::Path}; + +use async_zip::tokio::read::seek::ZipFileReader; + +use criterion::{BatchSize, Criterion}; +use lazy_static::lazy_static; +use tempfile::{tempdir, NamedTempFile}; +use tokio::{fs, runtime}; + +const ZIP_FILENAME: &str = "../json.zip"; +const NEW_DB_FILENAME: &str = "../db.dat"; + +lazy_static! { + static ref RUNTIME: tokio::runtime::Runtime = + runtime::Builder::new_current_thread().build().unwrap(); +} + +const N: usize = 4096; + +async fn read_sample() -> Vec { + let mut file = fs::File::open(ZIP_FILENAME).await.expect("open zip"); + let archive = ZipFileReader::with_tokio(&mut file) + .await + .expect("open zip file reader"); + let mut source_questions = archive.source_questions(); + let source_questions = source_questions.stream(); + pin_mut!(source_questions); + + source_questions + .converter() + .convert() + .take(N) + .enumerate() + .map(|(num, mut question)| { + question.num = 1 + num as u32; + question + }) + .collect() + .await +} + +fn read_sample_sync() -> Vec { + std::thread::spawn(|| { + runtime::Builder::new_current_thread() + .build() + .unwrap() + .block_on(read_sample()) + }) + .join() + .expect("spawn thread") +} + +async fn prepare_db_writer>(path: P) -> Writer { + let opts = WriterOpts { + compress_lvl: Level::Fastest, + data_buf_size: 100 * 1024 * 1024, + out_buf_size: 100 * 1024 * 1024, + current_buf_size: 10240, + }; + + Writer::::new(path, opts) + .await + .expect("new writer") +} + +fn setup_writer>(path: P) -> Writer { + let rp = path.as_ref().to_str().unwrap().to_string(); + + std::thread::spawn(|| { + runtime::Builder::new_current_thread() + .build() + .unwrap() + .block_on(prepare_db_writer(rp)) + }) + .join() + .expect("spawn thread") +} + +fn setup_reader>(path: P) -> Reader { + let rp = path.as_ref().to_str().unwrap().to_string(); + std::thread::spawn(|| { + runtime::Builder::new_current_thread() + .build() + .unwrap() + .block_on(async move { Reader::new(rp).await.expect("new reader") }) + }) + .join() + .expect("spawn thread") +} + +fn questions_async_read(c: &mut Criterion) { + c.bench_function("questions_async_read", |b| { + b.to_async(RUNTIME.deref()).iter_batched( + || setup_reader(NEW_DB_FILENAME), + |reader| async move { + reader + .stream() + .take(N) + .for_each(|item| async { drop(item) }) + .await + }, + BatchSize::SmallInput, + ) + }); +} + +fn questions_async_write(c: &mut Criterion) { + let dir = tempdir().expect("tempdir"); + + c.bench_function("questions_async_write", |b| { + b.to_async(RUNTIME.deref()).iter_batched( + || { + let tmpfile = NamedTempFile::new_in(dir.path()) + .expect("new tempfile") + .into_temp_path(); + let src = read_sample_sync().into_iter(); + let src = futures::stream::iter(src); + let writer = setup_writer(&tmpfile); + (src, writer) + }, + |(mut src, mut writer)| async move { + writer.load(&mut src).await.unwrap(); + writer.finish().await.unwrap(); + }, + BatchSize::SmallInput, + ) + }); +} + +fn config() -> Criterion { + Criterion::default() + .sample_size(40) + .warm_up_time(Duration::from_secs(7)) + .measurement_time(Duration::from_secs(20)) +} + +criterion_group! {name=benches; config = config(); targets = questions_async_read, questions_async_write} +criterion_main!(benches);