use unbounded_channel instead of VecDeque

This commit is contained in:
Dmitry Belyaev 2022-09-20 17:17:59 +03:00
parent bc7bdee83a
commit fbca249149
Signed by: b4tman
GPG Key ID: 41A00BF15EA7E5F3

View File

@ -5,13 +5,11 @@ extern crate tokio;
use async_zip::read::fs::ZipFileReader; use async_zip::read::fs::ZipFileReader;
use async_zip::write::{EntryOptions, ZipFileWriter}; use async_zip::write::{EntryOptions, ZipFileWriter};
use async_zip::Compression; use async_zip::Compression;
use std::collections::VecDeque;
use std::path::PathBuf; use std::path::PathBuf;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc;
use tokio::fs; use tokio::fs;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader}; use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
use tokio::sync::Mutex; use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
const INPUT_FILENAME: &str = "baza_utf8.zip"; const INPUT_FILENAME: &str = "baza_utf8.zip";
const OUTPUT_FILENAME: &str = "json.zip"; const OUTPUT_FILENAME: &str = "json.zip";
@ -273,15 +271,14 @@ async fn parse_file(
Ok(parser.get_parsed()) Ok(parser.get_parsed())
} }
struct WriteQueueItem { #[derive(Debug)]
struct ConvertedFile {
name: String, name: String,
data: String, data: String,
} }
type WriteQueue = Arc<Mutex<VecDeque<Option<WriteQueueItem>>>>;
/// read txt files from zip and convert to json /// read txt files from zip and convert to json
async fn reader_converter(queue: WriteQueue) { async fn reader_converter(tx: UnboundedSender<ConvertedFile>) {
// open archive just to list files // open archive just to list files
let archive = ZipFileReader::new(String::from(INPUT_FILENAME)) let archive = ZipFileReader::new(String::from(INPUT_FILENAME))
.await .await
@ -305,43 +302,18 @@ async fn reader_converter(queue: WriteQueue) {
// dump json to str // dump json to str
let data = new_data.pretty(2); let data = new_data.pretty(2);
// add to queue tx.send(ConvertedFile { name, data }).unwrap();
queue
.lock()
.await
.push_back(Some(WriteQueueItem { name, data }));
} }
// mark queue as done for writer to exit loop
queue.lock().await.push_back(None);
println!("convert done ✅"); println!("convert done ✅");
} }
/// write json data to zip files /// write json data to zip files
async fn zip_writer(queue: WriteQueue) { async fn zip_writer(mut rx: UnboundedReceiver<ConvertedFile>) {
let mut file = fs::File::create(OUTPUT_FILENAME).await.unwrap(); let mut file = fs::File::create(OUTPUT_FILENAME).await.unwrap();
let mut writer = ZipFileWriter::new(&mut file); let mut writer = ZipFileWriter::new(&mut file);
let mut is_reading_done = false; while let Some(ConvertedFile { name, data }) = rx.recv().await {
loop {
let mut queue_locked = queue.lock().await;
if queue_locked.is_empty() {
drop(queue_locked);
if is_reading_done {
break;
}
tokio::time::sleep(tokio::time::Duration::from_micros(50)).await;
continue;
}
let item = queue_locked.pop_front().unwrap();
drop(queue_locked);
match item {
None => {
is_reading_done = true;
}
Some(WriteQueueItem { name, data }) => {
// make output filename // make output filename
let mut outfilename = PathBuf::from(name); let mut outfilename = PathBuf::from(name);
outfilename.set_extension("json"); outfilename.set_extension("json");
@ -354,8 +326,6 @@ async fn zip_writer(queue: WriteQueue) {
.await .await
.unwrap(); .unwrap();
} }
}
}
writer.close().await.unwrap(); writer.close().await.unwrap();
println!("write done ✅"); println!("write done ✅");
@ -369,13 +339,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
_ => (), _ => (),
}; };
let queue: WriteQueue = Arc::new(Mutex::new(VecDeque::with_capacity(5000))); let (tx, rx) = mpsc::unbounded_channel::<ConvertedFile>();
let queue_r = Arc::clone(&queue);
let queue_w = Arc::clone(&queue);
tokio::try_join!( tokio::try_join!(
tokio::spawn(reader_converter(queue_r)), tokio::spawn(reader_converter(tx)),
tokio::spawn(zip_writer(queue_w)) tokio::spawn(zip_writer(rx))
)?; )?;
println!("all done ✅"); println!("all done ✅");