From 5999e2bb92b81dade5feb0f937b3bd9e4fe13497 Mon Sep 17 00:00:00 2001
From: Dmitry
Date: Tue, 20 Sep 2022 00:12:49 +0300
Subject: [PATCH] separate read/write with queue

---
 src/main.rs | 119 +++++++++++++++++++++++++++++++++-------------------
 1 file changed, 76 insertions(+), 43 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index 390d928..04d56d4 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -7,8 +7,11 @@ use async_zip::write::{EntryOptions, ZipFileWriter};
 use async_zip::Compression;
 use std::path::PathBuf;
 use std::str::FromStr;
-use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWrite, BufReader};
-use tokio::{fs, task};
+use std::sync::Arc;
+use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
+use tokio::sync::Mutex;
+use tokio::fs;
+use std::collections::VecDeque;
 
 const INPUT_FILENAME: &str = "baza_utf8.zip";
 const OUTPUT_FILENAME: &str = "json.zip";
@@ -221,38 +224,19 @@ async fn parse_file(
     Ok(ctx.data.clone())
 }
 
-async fn process_file(
-    archive: &ZipFileReader,
-    writer: &mut ZipFileWriter<impl AsyncWrite + Unpin>,
-    index: usize,
+
+struct WriteQueueItem {
     name: String,
-) -> Result<(), Box<dyn std::error::Error>> {
-    let entry_reader = archive.entry_reader(index).await?;
-
-    // parse file to json
-    let new_data = parse_file(entry_reader).await?;
-
-    // dump json to str
-    let data_str = task::spawn_blocking(move || new_data.pretty(2)).await?;
-
-    // make output filename
-    let mut outfilename = PathBuf::from(name);
-    outfilename.set_extension("json");
-    let outfilename = outfilename.to_str().unwrap().to_string();
-    let opts = EntryOptions::new(outfilename, Compression::Deflate);
-
-    // write new zip entry
-    writer.write_entry_whole(opts, data_str.as_bytes()).await?;
-
-    Ok(())
+    data: String,
 }
 
-#[tokio::main]
-async fn main() -> Result<(), Box<dyn std::error::Error>> {
-    // open archive just to list files
-    let archive = ZipFileReader::new(String::from(INPUT_FILENAME)).await?;
+type WriteQueue = Arc<Mutex<VecDeque<Option<WriteQueueItem>>>>;
 
-    let source_files: Vec<(usize, String)> = archive
+async fn data_reader(queue: WriteQueue) {
+    // open archive just to list files
+    let archive = ZipFileReader::new(String::from(INPUT_FILENAME)).await.unwrap();
+
+    let source_files = archive
         .entries()
         .iter()
         .enumerate()
@@ -261,28 +245,77 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             // skip files without "txt" extension
             entry.name().ends_with(".txt")
         })
-        .map(|(index, entry)| (index, entry.name().to_string()))
-        .collect();
+        .map(|(index, entry)| (index, entry.name().to_string()));
+    //
+    for (index, name) in source_files {
+        let entry_reader = archive.entry_reader(index).await.unwrap();
+        // parse file to json
+        let new_data = parse_file(entry_reader).await.unwrap();
+        // dump json to str
+        let data = new_data.pretty(2);
+
+        // add to queue
+        queue.lock().await.push_back(Some(WriteQueueItem { name, data }));
+    }
+    // mark queue as done for writer to exit loop
+    queue.lock().await.push_back(None);
+}
+async fn data_writer(queue: WriteQueue) {
+    let mut file = fs::File::create(OUTPUT_FILENAME).await.unwrap();
+    let mut writer = ZipFileWriter::new(&mut file);
+
+    let mut is_reading_done = false;
+    loop {
+        let mut queue_locked = queue.lock().await;
+        if queue_locked.is_empty() {
+            drop(queue_locked);
+            if is_reading_done {
+                break;
+            }
+            tokio::time::sleep(tokio::time::Duration::from_micros(50)).await;
+            continue;
+        }
+
+        let item = queue_locked.pop_front().unwrap();
+        drop(queue_locked);
+        match item {
+            None => {
+                is_reading_done = true;
+            }
+            Some(WriteQueueItem{name, data}) => {
+                // make output filename
+                let mut outfilename = PathBuf::from(name);
+                outfilename.set_extension("json");
+                let outfilename = outfilename.to_str().unwrap().to_string();
+                let opts = EntryOptions::new(outfilename, Compression::Deflate);
+
+                // write new zip entry
+                writer.write_entry_whole(opts, data.as_bytes()).await.unwrap();
+            }
+        }
+    }
+    writer.close().await.unwrap();
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
     // check output filename
     match fs::metadata(OUTPUT_FILENAME).await {
         Ok(x) if x.is_dir() => return Err("output file is a directory!".into()),
         _ => (),
     };
 
-    println!(
-        r#" 【 "{}" ➡ "{}" 】 processing {} files 💾⏳ ..."#,
-        INPUT_FILENAME,
-        OUTPUT_FILENAME,
-        source_files.len()
+    let queue: WriteQueue = Arc::new(Mutex::new(VecDeque::with_capacity(40)));
+    let queue_r = Arc::clone(&queue);
+    let queue_w = Arc::clone(&queue);
+
+    let res = tokio::try_join!(
+        tokio::spawn(async move {data_reader(queue_r).await}),
+        tokio::spawn(async move {data_writer(queue_w).await})
     );
 
-    let mut file = fs::File::create(OUTPUT_FILENAME).await?;
-    let mut writer = ZipFileWriter::new(&mut file);
-    for (index, name) in source_files {
-        process_file(&archive, &mut writer, index, name).await?;
-    }
-    writer.close().await?;
+    res?;
 
     println!("done ✅");
     Ok(())
 }
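
Note on the queue design (not part of the patch above): the patch hands items from data_reader to data_writer through an Arc<Mutex<VecDeque<Option<WriteQueueItem>>>>, with the writer polling the queue and sleeping 50 µs when it is empty, and a None entry marking end of input. A minimal sketch of one possible alternative is shown below: a bounded tokio::sync::mpsc channel, which wakes the consumer without polling, applies backpressure when the buffer (40 here, mirroring the VecDeque capacity) is full, and signals completion by dropping the sender. Item, produce_items and write_item are hypothetical stand-ins for the zip-parsing and zip-writing code, not names from this project.

    // Sketch only, assuming tokio with the "sync", "macros" and "rt" features.
    use tokio::sync::mpsc;

    struct Item {
        name: String,
        data: String,
    }

    async fn produce_items(tx: mpsc::Sender<Item>) {
        for i in 0..3 {
            let item = Item {
                name: format!("file{i}.txt"),
                data: format!("{{\"id\": {i}}}"),
            };
            // send() waits while the channel is full, giving natural backpressure
            if tx.send(item).await.is_err() {
                break; // receiver was dropped
            }
        }
        // dropping tx closes the channel, so no explicit end-of-input marker is needed
    }

    async fn write_item(item: Item) {
        println!("would write {} ({} bytes)", item.name, item.data.len());
    }

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::channel::<Item>(40);
        let reader = tokio::spawn(produce_items(tx));
        let writer = tokio::spawn(async move {
            // recv() returns None once all senders are dropped
            while let Some(item) = rx.recv().await {
                write_item(item).await;
            }
        });
        let _ = tokio::join!(reader, writer);
    }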