From c4b68cc7279d1fe1091ff4588601af1e4d01465b Mon Sep 17 00:00:00 2001 From: Dmitry Date: Fri, 25 Aug 2023 15:28:19 +0300 Subject: [PATCH] apps: handle err's + ord fn's --- app/src/main.rs | 157 +++++++++++++++++-------------- app_async/src/main.rs | 211 +++++++++++++++++++++--------------------- 2 files changed, 194 insertions(+), 174 deletions(-) diff --git a/app/src/main.rs b/app/src/main.rs index fbcb2e7..7027986 100644 --- a/app/src/main.rs +++ b/app/src/main.rs @@ -12,6 +12,7 @@ use chgk_ledb_lib::source; use crate::questions::{Question, QuestionsConverter}; use crate::source::ReadSourceQuestionsBatches; +use chgk_ledb_lib::util::ErrorToString; const ZIP_FILENAME: &str = "json.zip"; const NEW_DB_FILENAME: &str = "db.dat"; @@ -41,70 +42,6 @@ struct Cli { measure: bool, } -fn zip_reader_task(tx: mpsc::Sender) { - let zip_file = fs::File::open(ZIP_FILENAME).unwrap(); - let zip_reader = io::BufReader::new(zip_file); - let archive = zip::ZipArchive::new(zip_reader).unwrap(); - let mut source_questions = archive.source_questions(); - - let questions = source_questions - .convert() - .enumerate() - .map(|(num, mut question)| { - question.num = 1 + num as u32; - question - }); - for question in questions { - let res = tx.send(question); - if res.is_err() { - break; - } - } - println!("read done"); -} - -fn print_question_from(get_q: F) -where - F: FnOnce() -> Option, -{ - let q = get_q().expect("question not found"); - println!("{:#?}", q) -} - -fn read_from_zip(file_num: usize, mut num: usize) -> Option { - let mut rng = rand::thread_rng(); - let zip_file = fs::File::open(ZIP_FILENAME).unwrap(); - let zip_reader = io::BufReader::new(zip_file); - let archive = zip::ZipArchive::new(zip_reader).unwrap(); - - let mut source_questions = archive.source_questions(); - let (filename, batch) = if file_num == 0 { - source_questions.choose(&mut rng).unwrap() - } else { - source_questions.nth(file_num - 1).unwrap() - }; - let mut batch = batch.unwrap(); - batch.filename = filename; - let questions: Vec = batch.into(); - if num == 0 { - num = (1..=questions.len()).choose(&mut rng).unwrap(); - } - Some(questions[num - 1].clone()) -} - -// measure and return time elapsed in `func` in seconds -pub fn measure(func: F) -> f64 { - let start = Instant::now(); - func(); - let elapsed = start.elapsed(); - (elapsed.as_secs() as f64) + (elapsed.subsec_nanos() as f64 / 1_000_000_000.0) -} - -pub fn measure_and_print(func: F) { - let m = measure(func); - eprintln!("{}", m); -} - fn main() { let args = Cli::parse(); @@ -127,20 +64,75 @@ fn main() { action(); } -fn read_from_db(id: u32) -> Option { - let reader: db::Reader = - db::Reader::new(NEW_DB_FILENAME, 2048).expect("new db reader"); +// measure and return time elapsed in `func` in seconds +pub fn measure(func: F) -> f64 { + let start = Instant::now(); + func(); + let elapsed = start.elapsed(); + (elapsed.as_secs() as f64) + (elapsed.subsec_nanos() as f64 / 1_000_000_000.0) +} +pub fn measure_and_print(func: F) { + let m = measure(func); + eprintln!("{}", m); +} + +fn print_question_from(get_q: F) +where + F: FnOnce() -> Result, +{ + let q = get_q().expect("question not found"); + println!("{:#?}", q) +} + +fn read_from_zip(file_num: usize, mut num: usize) -> Result { + let mut rng = rand::thread_rng(); + let zip_file = fs::File::open(ZIP_FILENAME).str_err()?; + let zip_reader = io::BufReader::new(zip_file); + let archive = zip::ZipArchive::new(zip_reader).str_err()?; + + let mut source_questions = archive.source_questions(); + let (filename, batch) = if file_num == 0 { + source_questions + .choose(&mut rng) + .ok_or("rand choose".to_string())? + } else { + source_questions + .nth(file_num - 1) + .ok_or(format!("file nth #{file_num} => None"))? + }; + let mut batch = batch.map_err(|e| format!("get batch from file #{file_num} => {e}"))?; + batch.filename = filename; + let questions: Vec = batch.into(); + if num == 0 { + num = (1..=questions.len()) + .choose(&mut rng) + .ok_or("rand choose".to_string())?; + } + Ok(questions[num - 1].clone()) +} + +fn read_from_db(id: u32) -> Result { + let reader: db::Reader = db::Reader::new(NEW_DB_FILENAME, 2048)?; + + let len = reader.len(); let mut questions = reader.into_iter(); - match id { + let question = match id { 0 => { let mut rng = rand::thread_rng(); - questions.choose(&mut rng) + questions + .choose(&mut rng) + .ok_or(format!("rand choose, len = {len}"))? } - _ => questions.nth((id - 1) as usize), - } + _ => questions + .nth((id - 1) as usize) + .ok_or(format!("get nth #{id} => None"))?, + }; + + Ok(question) } + fn write_db() { let (tx, rx) = mpsc::channel::(); [ @@ -151,6 +143,29 @@ fn write_db() { .for_each(|handle| handle.join().expect("thread panic")); println!("all done"); } + +fn zip_reader_task(tx: mpsc::Sender) { + let zip_file = fs::File::open(ZIP_FILENAME).unwrap(); + let zip_reader = io::BufReader::new(zip_file); + let archive = zip::ZipArchive::new(zip_reader).unwrap(); + let mut source_questions = archive.source_questions(); + + let questions = source_questions + .convert() + .enumerate() + .map(|(num, mut question)| { + question.num = 1 + num as u32; + question + }); + for question in questions { + let res = tx.send(question); + if res.is_err() { + break; + } + } + println!("read done"); +} + fn db_writer_task(rx: mpsc::Receiver) { let writer_opts = db::WriterOpts::default(); let mut writer: db::Writer = diff --git a/app_async/src/main.rs b/app_async/src/main.rs index 6b21fb8..fc8d9b3 100644 --- a/app_async/src/main.rs +++ b/app_async/src/main.rs @@ -19,6 +19,7 @@ use chgk_ledb_lib::async_db; use chgk_ledb_lib::questions::Question; use chgk_ledb_lib::questions::QuestionsConverterAsyncForStream; use chgk_ledb_lib::source::ReadSourceQuestionsBatchesAsync; +use chgk_ledb_lib::util::ErrorToString; const ZIP_FILENAME: &str = "json.zip"; const NEW_DB_FILENAME: &str = "db.dat"; @@ -48,6 +49,113 @@ struct Cli { measure: bool, } +#[tokio::main] +async fn main() { + let args = Cli::parse(); + + let mut action: Box> = match &args.command { + Command::Write => Box::new(write_db()), + Command::Print { id } => { + let get_question = read_from_db(*id); + Box::new(print_question_from(get_question)) + } + Command::ZipPrint { file_num, num } => { + let get_question = read_from_zip(*file_num, *num); + Box::new(print_question_from(get_question)) + } + }; + + if args.measure { + action = Box::new(measure_and_print(Box::into_pin(action))); + } + + Box::into_pin(action).await; +} + +// measure and return time elapsed in `fut` in seconds +pub async fn measure(fut: F) -> f64 { + let start = Instant::now(); + fut.await; + let elapsed = start.elapsed(); + (elapsed.as_secs() as f64) + (elapsed.subsec_nanos() as f64 / 1_000_000_000.0) +} + +pub async fn measure_and_print(fut: F) { + let m = measure(fut).await; + eprintln!("{}", m); +} + +async fn print_question_from(get_q: F) +where + F: Future>, +{ + let q = get_q.await.expect("question not found"); + println!("{:#?}", q) +} + +async fn read_from_zip(file_num: usize, mut num: usize) -> Result { + let mut rng = thread_rng(); + let zip_file = fs::File::open(ZIP_FILENAME).await.str_err()?; + let mut zip_reader = io::BufReader::new(zip_file); + let archive = ZipFileReader::with_tokio(&mut zip_reader).await.str_err()?; + + let mut source = archive.source_questions(); + let files_count = source.len(); + let file_index = if file_num == 0 { + let files = Uniform::new(0, files_count); + rng.sample(files) + } else { + file_num - 1 + }; + + let src = source + .get(file_index) + .await + .map_err(|e| format!("get file {file_num} => {e}"))?; + let src = stream::once(async { src }); + pin_mut!(src); + let converter = src.converter(); + let questions: Vec<_> = converter.convert().collect().await; + if num == 0 { + num = (1..=questions.len()).choose(&mut rng).unwrap(); + } + let mut question = questions + .get(num - 1) + .ok_or(format!("get question #{num} => None"))? + .clone(); + question.num = num as u32; + Ok(question) +} + +async fn read_from_db(id: u32) -> Result { + let reader: async_db::Reader = async_db::Reader::new(NEW_DB_FILENAME).await?; + + let len = reader.len(); + + let index = if id == 0 { + let mut rng = thread_rng(); + let questions = Uniform::new(0, len); + rng.sample(questions) + } else { + id as usize - 1 + }; + + reader + .get(index) + .await + .map_err(|e| format!("get #{index} => {e}")) +} + +async fn write_db() { + let (tx, rx) = mpsc::unbounded_channel::(); + tokio::try_join!( + tokio::spawn(zip_reader_task(tx)), + tokio::spawn(db_writer_task(rx)) + ) + .expect("tokio join"); + println!("all done"); +} + async fn zip_reader_task(tx: UnboundedSender) { let mut file = fs::File::open(ZIP_FILENAME).await.expect("open zip"); let archive = ZipFileReader::with_tokio(&mut file) @@ -73,109 +181,6 @@ async fn zip_reader_task(tx: UnboundedSender) { println!("read done"); } -async fn print_question_from(get_q: F) -where - F: Future>, -{ - let q = get_q.await.expect("question not found"); - println!("{:#?}", q) -} - -async fn read_from_zip(file_num: usize, mut num: usize) -> Option { - let mut rng = thread_rng(); - let zip_file = fs::File::open(ZIP_FILENAME).await.expect("open zip file"); - let mut zip_reader = io::BufReader::new(zip_file); - let archive = ZipFileReader::with_tokio(&mut zip_reader) - .await - .expect("open zip file reader"); - - let mut source = archive.source_questions(); - let files_count = source.len(); - let file_index = if file_num == 0 { - let files = Uniform::new(0, files_count); - rng.sample(files) - } else { - file_num - 1 - }; - - let src = source.get(file_index).await; - let src = stream::once(async { src.expect("get source file") }); - pin_mut!(src); - let converter = src.converter(); - let questions: Vec<_> = converter.convert().collect().await; - if num == 0 { - num = (1..=questions.len()).choose(&mut rng).unwrap(); - } - let mut question = questions.get(num - 1).expect("get question").clone(); - question.num = num as u32; - Some(question) -} - -// measure and return time elapsed in `fut` in seconds -pub async fn measure(fut: F) -> f64 { - let start = Instant::now(); - fut.await; - let elapsed = start.elapsed(); - (elapsed.as_secs() as f64) + (elapsed.subsec_nanos() as f64 / 1_000_000_000.0) -} - -pub async fn measure_and_print(fut: F) { - let m = measure(fut).await; - eprintln!("{}", m); -} - -#[tokio::main] -async fn main() { - let args = Cli::parse(); - - let mut action: Box> = match &args.command { - Command::Write => Box::new(write_db()), - Command::Print { id } => { - let get_question = read_from_db(*id); - Box::new(print_question_from(get_question)) - } - Command::ZipPrint { file_num, num } => { - let get_question = read_from_zip(*file_num, *num); - Box::new(print_question_from(get_question)) - } - }; - - if args.measure { - action = Box::new(measure_and_print(Box::into_pin(action))); - } - - Box::into_pin(action).await; -} - -async fn read_from_db(id: u32) -> Option { - let reader: async_db::Reader = async_db::Reader::new(NEW_DB_FILENAME) - .await - .expect("new db reader"); - - let len = reader.len(); - - let index = if id == 0 { - let mut rng = thread_rng(); - let questions = Uniform::new(0, len); - rng.sample(questions) - } else { - id as usize - 1 - }; - - match reader.get(index).await { - Ok(question) => Some(question), - Err(_) => None, - } -} -async fn write_db() { - let (tx, rx) = mpsc::unbounded_channel::(); - tokio::try_join!( - tokio::spawn(zip_reader_task(tx)), - tokio::spawn(db_writer_task(rx)) - ) - .expect("tokio join"); - println!("all done"); -} async fn db_writer_task(rx: UnboundedReceiver) { let writer_opts = WriterOpts::default(); let mut writer: async_db::Writer =