separate read/write with queue

This commit is contained in:
Dmitry Belyaev 2022-09-20 00:12:49 +03:00
parent 56a20dc6b1
commit 5999e2bb92
Signed by: b4tman
GPG Key ID: 41A00BF15EA7E5F3

View File

@ -7,8 +7,11 @@ use async_zip::write::{EntryOptions, ZipFileWriter};
use async_zip::Compression;
use std::path::PathBuf;
use std::str::FromStr;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWrite, BufReader};
use tokio::{fs, task};
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
use tokio::sync::Mutex;
use tokio::fs;
use std::collections::VecDeque;
const INPUT_FILENAME: &str = "baza_utf8.zip";
const OUTPUT_FILENAME: &str = "json.zip";
@ -221,38 +224,19 @@ async fn parse_file(
Ok(ctx.data.clone())
}
async fn process_file<T: AsyncWrite + Unpin>(
archive: &ZipFileReader,
writer: &mut ZipFileWriter<T>,
index: usize,
struct WriteQueueItem {
name: String,
) -> Result<(), Box<dyn std::error::Error>> {
let entry_reader = archive.entry_reader(index).await?;
// parse file to json
let new_data = parse_file(entry_reader).await?;
// dump json to str
let data_str = task::spawn_blocking(move || new_data.pretty(2)).await?;
// make output filename
let mut outfilename = PathBuf::from(name);
outfilename.set_extension("json");
let outfilename = outfilename.to_str().unwrap().to_string();
let opts = EntryOptions::new(outfilename, Compression::Deflate);
// write new zip entry
writer.write_entry_whole(opts, data_str.as_bytes()).await?;
Ok(())
data: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// open archive just to list files
let archive = ZipFileReader::new(String::from(INPUT_FILENAME)).await?;
type WriteQueue = Arc<Mutex<VecDeque<Option<WriteQueueItem>>>>;
let source_files: Vec<(usize, String)> = archive
async fn data_reader(queue: WriteQueue) {
// open archive just to list files
let archive = ZipFileReader::new(String::from(INPUT_FILENAME)).await.unwrap();
let source_files = archive
.entries()
.iter()
.enumerate()
@ -261,28 +245,77 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// skip files without "txt" extension
entry.name().ends_with(".txt")
})
.map(|(index, entry)| (index, entry.name().to_string()))
.collect();
.map(|(index, entry)| (index, entry.name().to_string()));
//
for (index, name) in source_files {
let entry_reader = archive.entry_reader(index).await.unwrap();
// parse file to json
let new_data = parse_file(entry_reader).await.unwrap();
// dump json to str
let data = new_data.pretty(2);
// add to queue
queue.lock().await.push_back(Some(WriteQueueItem { name, data }));
}
// mark queue as done for writer to exit loop
queue.lock().await.push_back(None);
}
async fn data_writer(queue: WriteQueue) {
let mut file = fs::File::create(OUTPUT_FILENAME).await.unwrap();
let mut writer = ZipFileWriter::new(&mut file);
let mut is_reading_done = false;
loop {
let mut queue_locked = queue.lock().await;
if queue_locked.is_empty() {
drop(queue_locked);
if is_reading_done {
break;
}
tokio::time::sleep(tokio::time::Duration::from_micros(50)).await;
continue;
}
let item = queue_locked.pop_front().unwrap();
drop(queue_locked);
match item {
None => {
is_reading_done = true;
}
Some(WriteQueueItem{name, data}) => {
// make output filename
let mut outfilename = PathBuf::from(name);
outfilename.set_extension("json");
let outfilename = outfilename.to_str().unwrap().to_string();
let opts = EntryOptions::new(outfilename, Compression::Deflate);
// write new zip entry
writer.write_entry_whole(opts, data.as_bytes()).await.unwrap();
}
}
}
writer.close().await.unwrap();
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// check output filename
match fs::metadata(OUTPUT_FILENAME).await {
Ok(x) if x.is_dir() => return Err("output file is a directory!".into()),
_ => (),
};
println!(
r#" 【 "{}" ➡ "{}" 】 processing {} files 💾⏳ ..."#,
INPUT_FILENAME,
OUTPUT_FILENAME,
source_files.len()
let queue: WriteQueue = Arc::new(Mutex::new(VecDeque::with_capacity(40)));
let queue_r = Arc::clone(&queue);
let queue_w = Arc::clone(&queue);
let res = tokio::try_join!(
tokio::spawn(async move {data_reader(queue_r).await}),
tokio::spawn(async move {data_writer(queue_w).await})
);
let mut file = fs::File::create(OUTPUT_FILENAME).await?;
let mut writer = ZipFileWriter::new(&mut file);
for (index, name) in source_files {
process_file(&archive, &mut writer, index, name).await?;
}
writer.close().await?;
res?;
println!("done ✅");
Ok(())