split reader/converter

This commit is contained in:
Dmitry Belyaev 2022-10-01 12:42:32 +03:00
parent 24f675ca16
commit 2a06eabd15
Signed by: b4tman
GPG Key ID: 41A00BF15EA7E5F3
1 changed files with 59 additions and 43 deletions

View File

@ -8,7 +8,7 @@ use async_zip::Compression;
use std::path::PathBuf; use std::path::PathBuf;
use std::str::FromStr; use std::str::FromStr;
use tokio::fs; use tokio::fs;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
const INPUT_FILENAME: &str = "baza_utf8.zip"; const INPUT_FILENAME: &str = "baza_utf8.zip";
@ -251,39 +251,22 @@ impl QuestionsParser {
} }
} }
async fn parse_file( #[derive(Debug)]
entry_reader: impl AsyncReadExt + Unpin, struct FileText {
) -> Result<json::JsonValue, Box<dyn std::error::Error>> { name: String,
let buf_reader = BufReader::new(entry_reader); text: String,
let mut lines = buf_reader.lines();
let mut parser = QuestionsParser::new();
while let Some(line_r) = lines.next_line().await? {
let line = line_r.trim();
if line.is_empty() {
continue;
}
parser.parse_line(line);
}
parser.finish();
Ok(parser.get_parsed())
} }
#[derive(Debug)] #[derive(Debug)]
struct ConvertedFile { enum TextReaderMessage {
name: String, NextLine(String),
data: String, EndOfFile(String),
} }
/// read txt files from zip and convert to json /// read txt files from zip and convert to json
async fn reader_converter(tx: UnboundedSender<ConvertedFile>) { async fn zip_text_reader(tx: UnboundedSender<TextReaderMessage>) {
// open archive just to list files // open archive just to list files
let archive = ZipFileReader::new(String::from(INPUT_FILENAME)) let archive = ZipFileReader::new(INPUT_FILENAME).await.expect("open zip");
.await
.unwrap();
let source_files = archive let source_files = archive
.entries() .entries()
@ -297,25 +280,56 @@ async fn reader_converter(tx: UnboundedSender<ConvertedFile>) {
.map(|(index, entry)| (index, entry.name().to_string())); .map(|(index, entry)| (index, entry.name().to_string()));
// //
for (index, name) in source_files { for (index, name) in source_files {
let entry_reader = archive.entry_reader(index).await.unwrap(); let entry_reader = archive.entry_reader(index).await.expect("read entry");
// parse file to json let buf_reader = BufReader::new(entry_reader);
let new_data = parse_file(entry_reader).await.unwrap(); let mut lines = buf_reader.lines();
// dump json to str while let Some(line) = lines.next_line().await.expect("next line") {
let data = new_data.pretty(2); tx.send(TextReaderMessage::NextLine(line))
.expect("send line");
tx.send(ConvertedFile { name, data }).unwrap(); }
tx.send(TextReaderMessage::EndOfFile(name))
.expect("send end");
} }
println!("read done ✅");
}
/// convert text questions to json format
async fn questions_converter(
mut rx: UnboundedReceiver<TextReaderMessage>,
tx: UnboundedSender<FileText>,
) {
let mut parser = QuestionsParser::new();
while let Some(msg) = rx.recv().await {
match msg {
TextReaderMessage::NextLine(line) => {
let line = line.trim();
if line.is_empty() {
continue;
}
parser.parse_line(line);
}
TextReaderMessage::EndOfFile(name) => {
parser.finish();
let data_json = parser.get_parsed();
let text = data_json.pretty(2);
tx.send(FileText { name, text }).expect("send json");
parser = QuestionsParser::new();
}
}
}
println!("convert done ✅"); println!("convert done ✅");
} }
/// write json data to zip files /// write json data to zip files
async fn zip_writer(mut rx: UnboundedReceiver<ConvertedFile>) { async fn zip_json_writer(mut rx: UnboundedReceiver<FileText>) {
let file = fs::File::create(OUTPUT_FILENAME).await.unwrap(); let file = fs::File::create(OUTPUT_FILENAME)
.await
.expect("create file");
let mut buf = BufWriter::with_capacity(100 * 1024 * 1024, file); let mut buf = BufWriter::with_capacity(100 * 1024 * 1024, file);
let mut writer = ZipFileWriter::new(&mut buf); let mut writer = ZipFileWriter::new(&mut buf);
while let Some(ConvertedFile { name, data }) = rx.recv().await { while let Some(FileText { name, text: data }) = rx.recv().await {
// make output filename // make output filename
let mut outfilename = PathBuf::from(name); let mut outfilename = PathBuf::from(name);
outfilename.set_extension("json"); outfilename.set_extension("json");
@ -326,10 +340,10 @@ async fn zip_writer(mut rx: UnboundedReceiver<ConvertedFile>) {
writer writer
.write_entry_whole(opts, data.as_bytes()) .write_entry_whole(opts, data.as_bytes())
.await .await
.unwrap(); .expect("write entry");
} }
writer.close().await.unwrap(); writer.close().await.expect("close writer");
buf.flush().await.unwrap(); buf.flush().await.expect("flush buffer");
println!("write done ✅"); println!("write done ✅");
} }
@ -342,11 +356,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
_ => (), _ => (),
}; };
let (tx, rx) = mpsc::unbounded_channel::<ConvertedFile>(); let (reader_tx, reader_rx) = mpsc::unbounded_channel::<TextReaderMessage>();
let (json_tx, json_rx) = mpsc::unbounded_channel::<FileText>();
tokio::try_join!( tokio::try_join!(
tokio::spawn(reader_converter(tx)), tokio::spawn(zip_text_reader(reader_tx)),
tokio::spawn(zip_writer(rx)) tokio::spawn(questions_converter(reader_rx, json_tx)),
tokio::spawn(zip_json_writer(json_rx))
)?; )?;
println!("all done ✅"); println!("all done ✅");