diff --git a/Cargo.lock b/Cargo.lock index 63c6483..b1ecbdf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -320,6 +320,24 @@ dependencies = [ "zip", ] +[[package]] +name = "chgk_ledb_async" +version = "1.1.0" +dependencies = [ + "async_zip", + "bincode", + "chgk_ledb_lib", + "clap 4.3.16", + "futures", + "rand", + "serde", + "serde_derive", + "serde_json", + "tempfile", + "tokio", + "tokio-stream", +] + [[package]] name = "chgk_ledb_lib" version = "1.1.0" @@ -1538,6 +1556,7 @@ dependencies = [ "autocfg", "backtrace", "bytes", + "num_cpus", "pin-project-lite", "tokio-macros", ] @@ -1553,6 +1572,17 @@ dependencies = [ "syn 2.0.26", ] +[[package]] +name = "tokio-stream" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.8" diff --git a/Cargo.toml b/Cargo.toml index 49f994d..c1f38e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ resolver = "2" members = [ "app", + "app_async", "lib" ] diff --git a/app_async/Cargo.toml b/app_async/Cargo.toml new file mode 100644 index 0000000..c176811 --- /dev/null +++ b/app_async/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "chgk_ledb_async" +version = "1.1.0" +authors = ["Dmitry "] +edition = "2021" +repository = "https://gitea.b4tman.ru/b4tman/chgk_ledb" +license = "MIT" +description = "Утилита загружающая базу данных ЧГК вопросов из ZIP файла в JSON формате в базу данных." 
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+chgk_ledb_lib = {path = "../lib", features = ["async", "source_async", "convert_async"]}
+serde_json="1.0"
+async_zip = { git = "https://github.com/Majored/rs-async-zip", rev = "ff0d985", features = [
+    "zstd",
+    "tokio",
+    "tokio-fs"] }
+tokio = { version = "1", features = [
+    "io-util",
+    "fs",
+    "rt-multi-thread"
+] }
+tokio-stream = "0.1"
+rand="0.8"
+clap = { version = "4.2.7", features = ["derive"] }
+futures = "0.3"
+
+[dev-dependencies]
+tempfile = "3.3"
+bincode = "^2.0.0-rc.2"
+serde="1.0"
+serde_derive="1.0"
diff --git a/app_async/src/main.rs b/app_async/src/main.rs
new file mode 100644
index 0000000..4d9fb22
--- /dev/null
+++ b/app_async/src/main.rs
@@ -0,0 +1,235 @@
+extern crate serde_json;
+use clap::{Parser, Subcommand};
+use futures::{Future, pin_mut};
+use rand::distributions::Uniform;
+use rand::seq::IteratorRandom;
+use rand::{thread_rng, Rng};
+
+use async_zip::tokio::read::seek::ZipFileReader;
+use futures::stream::StreamExt;
+use std::time::Instant;
+use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
+
+use async_db::WriterOpts;
+
+use tokio::{fs, io};
+use tokio_stream::wrappers::UnboundedReceiverStream;
+
+use chgk_ledb_lib::async_db;
+use chgk_ledb_lib::questions::{Question, QuestionsConverterAsync};
+use chgk_ledb_lib::source::ReadSourceQuestionsBatchesAsync;
+
+/// Path of the ZIP archive the source questions are read from.
+const ZIP_FILENAME: &str = "json.zip";
+/// Path of the database file written by `write` and read by `print`.
+const NEW_DB_FILENAME: &str = "db.dat";
+
+// Subcommands of the CLI. Plain `//` comments are used on purpose: clap turns
+// `///` doc comments into help text, which would change the program output.
+#[derive(Subcommand, Debug)]
+enum Command {
+    // Convert the questions from the ZIP archive into the database file.
+    Write,
+    // Print a question from the database; `id` 0 selects a random one.
+    Print {
+        #[clap(value_parser, default_value = "0")]
+        id: u32,
+    },
+    // Print a question straight from the ZIP archive; 0 selects at random.
+    ZipPrint {
+        #[clap(value_parser, default_value = "0")]
+        file_num: usize,
+        #[clap(value_parser, default_value = "0")]
+        num: usize,
+    },
+}
+
+// Command line arguments (see the note on `Command` about `//` comments).
+#[derive(Parser, Debug)]
+#[clap(author, version, about, long_about = None)]
+#[clap(propagate_version = true)]
+struct Cli {
+    #[clap(subcommand)]
+    command: Command,
+    // Print the time the command took, in seconds, to stderr.
+    #[clap(short, long, action)]
+    measure: bool,
+}
+
+/// Reads all questions from the ZIP archive, numbers them sequentially
+/// starting at 1 and sends them to `tx`.
+///
+/// Panics if the archive cannot be opened or if sending to `tx` fails.
+async fn zip_reader_task(tx: UnboundedSender<Question>) {
+    let mut file = fs::File::open(ZIP_FILENAME).await.expect("open zip");
+    let archive = ZipFileReader::with_tokio(&mut file)
+        .await
+        .expect("open zip file reader");
+    let mut source_questions = archive.source_questions();
+    let mut source_questions = source_questions.stream();
+    pin_mut!(source_questions);
+
+    let mut converter = QuestionsConverterAsync::from(source_questions).convert();
+    pin_mut!(converter);
+
+    let mut num: u32 = 0;
+    while let Some(mut question) = converter.next().await {
+        num += 1;
+        question.num = num;
+        tx.send(question).expect("send");
+    }
+
+    println!("read done");
+}
+
+/// Awaits `get_q` and pretty-prints the question it resolves to.
+///
+/// Panics with "question not found" if the future resolves to `None`.
+async fn print_question_from<F>(get_q: F)
+where
+    F: Future<Output = Option<Question>>,
+{
+    let q = get_q.await.expect("question not found");
+    println!("{:#?}", q)
+}
+
+/// Reads a single question directly from the ZIP archive.
+///
+/// `file_num` selects the source batch and `num` the question inside it, both
+/// counted from 1; `0` in either position means "pick at random".
+///
+/// Returns `None` if the archive has no entries to pick from or if the
+/// requested question does not exist.
+async fn read_from_zip(file_num: usize, num: usize) -> Option<Question> {
+    let mut rng = thread_rng();
+    let zip_file = fs::File::open(ZIP_FILENAME).await.expect("open zip file");
+    let mut zip_reader = io::BufReader::new(zip_file);
+    let archive = ZipFileReader::with_tokio(&mut zip_reader)
+        .await
+        .expect("open zip file reader");
+
+    let file_index = if file_num == 0 {
+        let files_count = archive.file().entries().len();
+        if files_count == 0 {
+            // `Uniform::new` panics on an empty range.
+            return None;
+        }
+        rng.sample(Uniform::new(0, files_count))
+    } else {
+        file_num - 1
+    };
+    let mut source_questions = archive.source_questions();
+    let mut source_questions = source_questions.stream();
+    pin_mut!(source_questions);
+    let src = source_questions.skip(file_index).take(1);
+    let converter = QuestionsConverterAsync::from(src);
+    let questions: Vec<_> = converter.convert().collect().await;
+
+    if num == 0 {
+        // `choose` yields `None` for an empty list instead of panicking.
+        questions.into_iter().choose(&mut rng)
+    } else {
+        // An out-of-range number yields `None` instead of an index panic.
+        questions.into_iter().nth(num - 1)
+    }
+}
+
+/// Awaits `fut` and returns the time it took, in seconds.
+pub async fn measure<F: Future>(fut: F) -> f64 {
+    let start = Instant::now();
+    fut.await;
+    start.elapsed().as_secs_f64()
+}
+
+/// Awaits `fut` and prints the time it took, in seconds, to stderr.
+pub async fn measure_and_print<F: Future>(fut: F) {
+    let m = measure(fut).await;
+    eprintln!("{}", m);
+}
+
+/// Parses the command line and runs the selected subcommand, optionally
+/// measuring how long it takes.
+#[tokio::main]
+async fn main() {
+    let args = Cli::parse();
+
+    // Each subcommand is a future of a different type, so they are boxed
+    // behind one trait object.
+    let mut action: Box<dyn Future<Output = ()>> = match &args.command {
+        Command::Write => Box::new(write_db()),
+        Command::Print { id } => {
+            let get_question = read_from_db(*id);
+            Box::new(print_question_from(get_question))
+        }
+        Command::ZipPrint { file_num, num } => {
+            let get_question = read_from_zip(*file_num, *num);
+            Box::new(print_question_from(get_question))
+        }
+    };
+
+    if args.measure {
+        action = Box::new(measure_and_print(Box::into_pin(action)));
+    }
+
+    Box::into_pin(action).await;
+}
+
+/// Reads one question from the database file.
+///
+/// `id` is the question number counted from 1; `0` picks a random question.
+/// Returns `None` if there is nothing to pick from or the read fails.
+async fn read_from_db(id: u32) -> Option<Question> {
+    let reader: async_db::Reader<Question> = async_db::Reader::new(NEW_DB_FILENAME, 2048)
+        .await
+        .expect("new db reader");
+
+    let index = if id == 0 {
+        let len = reader.len();
+        if len == 0 {
+            // Nothing to pick from, and `Uniform::new` panics on an empty range.
+            return None;
+        }
+        thread_rng().sample(Uniform::new(0, len))
+    } else {
+        id as usize - 1
+    };
+
+    println!("{}", &index); // DEBUG
+
+    match reader.get(index).await {
+        Ok(question) => Some(question),
+        Err(e) => {
+            println!("{:#?}", e); // DEBUG
+            None
+        }
+    }
+}
+
+/// Runs the ZIP reader task and the database writer concurrently, connected
+/// by an unbounded channel.
+async fn write_db() {
+    let (tx, rx) = mpsc::unbounded_channel::<Question>();
+    let (r, _) = tokio::join!(tokio::spawn(zip_reader_task(tx)), db_writer_task(rx),);
+    r.expect("tokio join");
+    println!("all done");
+}
+
+/// Writes every question received from `rx` to the database file.
+async fn db_writer_task(rx: UnboundedReceiver<Question>) {
+    let writer_opts = WriterOpts::default();
+    let mut writer: async_db::Writer<Question> =
+        async_db::Writer::new(NEW_DB_FILENAME, writer_opts)
+            .await
+            .expect("new db writer");
+
+    let mut stream: UnboundedReceiverStream<_> = rx.into();
+
+    writer
+        .load(&mut stream)
+        .await
+        .unwrap_or_else(|e| panic!("db writer load, {e:#?}"));
+
+    writer.finish().await.expect("db writer finish");
+
+    println!("write done");
+}