diff --git a/src/main.rs b/src/main.rs index 04d56d4..2add117 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,13 +5,13 @@ extern crate tokio; use async_zip::read::fs::ZipFileReader; use async_zip::write::{EntryOptions, ZipFileWriter}; use async_zip::Compression; +use std::collections::VecDeque; use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; +use tokio::fs; use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader}; use tokio::sync::Mutex; -use tokio::fs; -use std::collections::VecDeque; const INPUT_FILENAME: &str = "baza_utf8.zip"; const OUTPUT_FILENAME: &str = "json.zip"; @@ -224,7 +224,6 @@ async fn parse_file( Ok(ctx.data.clone()) } - struct WriteQueueItem { name: String, data: String, @@ -234,7 +233,9 @@ type WriteQueue = Arc>>>; async fn data_reader(queue: WriteQueue) { // open archive just to list files - let archive = ZipFileReader::new(String::from(INPUT_FILENAME)).await.unwrap(); + let archive = ZipFileReader::new(String::from(INPUT_FILENAME)) + .await + .unwrap(); let source_files = archive .entries() @@ -253,9 +254,12 @@ async fn data_reader(queue: WriteQueue) { let new_data = parse_file(entry_reader).await.unwrap(); // dump json to str let data = new_data.pretty(2); - + // add to queue - queue.lock().await.push_back(Some(WriteQueueItem { name, data })); + queue + .lock() + .await + .push_back(Some(WriteQueueItem { name, data })); } // mark queue as done for writer to exit loop queue.lock().await.push_back(None); @@ -276,14 +280,14 @@ async fn data_writer(queue: WriteQueue) { tokio::time::sleep(tokio::time::Duration::from_micros(50)).await; continue; } - + let item = queue_locked.pop_front().unwrap(); drop(queue_locked); match item { None => { is_reading_done = true; } - Some(WriteQueueItem{name, data}) => { + Some(WriteQueueItem { name, data }) => { // make output filename let mut outfilename = PathBuf::from(name); outfilename.set_extension("json"); @@ -291,7 +295,10 @@ async fn data_writer(queue: WriteQueue) { let opts = EntryOptions::new(outfilename, Compression::Deflate); // write new zip entry - writer.write_entry_whole(opts, data.as_bytes()).await.unwrap(); + writer + .write_entry_whole(opts, data.as_bytes()) + .await + .unwrap(); } } } @@ -311,8 +318,8 @@ async fn main() -> Result<(), Box> { let queue_w = Arc::clone(&queue); let res = tokio::try_join!( - tokio::spawn(async move {data_reader(queue_r).await}), - tokio::spawn(async move {data_writer(queue_w).await}) + tokio::spawn(async move { data_reader(queue_r).await }), + tokio::spawn(async move { data_writer(queue_w).await }) ); res?;