From fbca249149225712ff7ba7f7e3b45ff0e4c0534a Mon Sep 17 00:00:00 2001 From: Dmitry Date: Tue, 20 Sep 2022 17:17:59 +0300 Subject: [PATCH] use unbounded_channel instead of VecDeque --- src/main.rs | 72 +++++++++++++++-------------------------------------- 1 file changed, 20 insertions(+), 52 deletions(-) diff --git a/src/main.rs b/src/main.rs index 970f45e..de0f42c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,13 +5,11 @@ extern crate tokio; use async_zip::read::fs::ZipFileReader; use async_zip::write::{EntryOptions, ZipFileWriter}; use async_zip::Compression; -use std::collections::VecDeque; use std::path::PathBuf; use std::str::FromStr; -use std::sync::Arc; use tokio::fs; use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader}; -use tokio::sync::Mutex; +use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; const INPUT_FILENAME: &str = "baza_utf8.zip"; const OUTPUT_FILENAME: &str = "json.zip"; @@ -273,15 +271,14 @@ async fn parse_file( Ok(parser.get_parsed()) } -struct WriteQueueItem { +#[derive(Debug)] +struct ConvertedFile { name: String, data: String, } -type WriteQueue = Arc>>>; - /// read txt files from zip and convert to json -async fn reader_converter(queue: WriteQueue) { +async fn reader_converter(tx: UnboundedSender) { // open archive just to list files let archive = ZipFileReader::new(String::from(INPUT_FILENAME)) .await @@ -305,56 +302,29 @@ async fn reader_converter(queue: WriteQueue) { // dump json to str let data = new_data.pretty(2); - // add to queue - queue - .lock() - .await - .push_back(Some(WriteQueueItem { name, data })); + tx.send(ConvertedFile { name, data }).unwrap(); } - // mark queue as done for writer to exit loop - queue.lock().await.push_back(None); println!("convert done ✅"); } - /// write json data to zip files -async fn zip_writer(queue: WriteQueue) { +async fn zip_writer(mut rx: UnboundedReceiver) { let mut file = fs::File::create(OUTPUT_FILENAME).await.unwrap(); let mut writer = ZipFileWriter::new(&mut file); - let mut is_reading_done = false; - loop { - let mut queue_locked = queue.lock().await; - if queue_locked.is_empty() { - drop(queue_locked); - if is_reading_done { - break; - } - tokio::time::sleep(tokio::time::Duration::from_micros(50)).await; - continue; - } + while let Some(ConvertedFile { name, data }) = rx.recv().await { + // make output filename + let mut outfilename = PathBuf::from(name); + outfilename.set_extension("json"); + let outfilename = outfilename.to_str().unwrap().to_string(); + let opts = EntryOptions::new(outfilename, OUTPUT_COMPRESSION); - let item = queue_locked.pop_front().unwrap(); - drop(queue_locked); - match item { - None => { - is_reading_done = true; - } - Some(WriteQueueItem { name, data }) => { - // make output filename - let mut outfilename = PathBuf::from(name); - outfilename.set_extension("json"); - let outfilename = outfilename.to_str().unwrap().to_string(); - let opts = EntryOptions::new(outfilename, OUTPUT_COMPRESSION); - - // write new zip entry - writer - .write_entry_whole(opts, data.as_bytes()) - .await - .unwrap(); - } - } + // write new zip entry + writer + .write_entry_whole(opts, data.as_bytes()) + .await + .unwrap(); } writer.close().await.unwrap(); @@ -369,13 +339,11 @@ async fn main() -> Result<(), Box> { _ => (), }; - let queue: WriteQueue = Arc::new(Mutex::new(VecDeque::with_capacity(5000))); - let queue_r = Arc::clone(&queue); - let queue_w = Arc::clone(&queue); + let (tx, rx) = mpsc::unbounded_channel::(); tokio::try_join!( - tokio::spawn(reader_converter(queue_r)), - tokio::spawn(zip_writer(queue_w)) + tokio::spawn(reader_converter(tx)), + tokio::spawn(zip_writer(rx)) )?; println!("all done ✅");