diff --git a/src/main.rs b/src/main.rs index 19d3c09..0d95490 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,7 +8,7 @@ use async_zip::Compression; use std::path::PathBuf; use std::str::FromStr; use tokio::fs; -use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; const INPUT_FILENAME: &str = "baza_utf8.zip"; @@ -251,39 +251,22 @@ impl QuestionsParser { } } -async fn parse_file( - entry_reader: impl AsyncReadExt + Unpin, -) -> Result> { - let buf_reader = BufReader::new(entry_reader); - let mut lines = buf_reader.lines(); - - let mut parser = QuestionsParser::new(); - - while let Some(line_r) = lines.next_line().await? { - let line = line_r.trim(); - if line.is_empty() { - continue; - } - - parser.parse_line(line); - } - - parser.finish(); - Ok(parser.get_parsed()) +#[derive(Debug)] +struct FileText { + name: String, + text: String, } #[derive(Debug)] -struct ConvertedFile { - name: String, - data: String, +enum TextReaderMessage { + NextLine(String), + EndOfFile(String), } /// read txt files from zip and convert to json -async fn reader_converter(tx: UnboundedSender) { +async fn zip_text_reader(tx: UnboundedSender) { // open archive just to list files - let archive = ZipFileReader::new(String::from(INPUT_FILENAME)) - .await - .unwrap(); + let archive = ZipFileReader::new(INPUT_FILENAME).await.expect("open zip"); let source_files = archive .entries() @@ -297,25 +280,56 @@ async fn reader_converter(tx: UnboundedSender) { .map(|(index, entry)| (index, entry.name().to_string())); // for (index, name) in source_files { - let entry_reader = archive.entry_reader(index).await.unwrap(); - // parse file to json - let new_data = parse_file(entry_reader).await.unwrap(); - // dump json to str - let data = new_data.pretty(2); - - tx.send(ConvertedFile { name, data }).unwrap(); + let entry_reader = archive.entry_reader(index).await.expect("read entry"); + let buf_reader = BufReader::new(entry_reader); + let mut lines = buf_reader.lines(); + while let Some(line) = lines.next_line().await.expect("next line") { + tx.send(TextReaderMessage::NextLine(line)) + .expect("send line"); + } + tx.send(TextReaderMessage::EndOfFile(name)) + .expect("send end"); } + println!("read done ✅"); +} + +/// convert text questions to json format +async fn questions_converter( + mut rx: UnboundedReceiver, + tx: UnboundedSender, +) { + let mut parser = QuestionsParser::new(); + while let Some(msg) = rx.recv().await { + match msg { + TextReaderMessage::NextLine(line) => { + let line = line.trim(); + if line.is_empty() { + continue; + } + parser.parse_line(line); + } + TextReaderMessage::EndOfFile(name) => { + parser.finish(); + let data_json = parser.get_parsed(); + let text = data_json.pretty(2); + tx.send(FileText { name, text }).expect("send json"); + parser = QuestionsParser::new(); + } + } + } println!("convert done ✅"); } /// write json data to zip files -async fn zip_writer(mut rx: UnboundedReceiver) { - let file = fs::File::create(OUTPUT_FILENAME).await.unwrap(); +async fn zip_json_writer(mut rx: UnboundedReceiver) { + let file = fs::File::create(OUTPUT_FILENAME) + .await + .expect("create file"); let mut buf = BufWriter::with_capacity(100 * 1024 * 1024, file); let mut writer = ZipFileWriter::new(&mut buf); - while let Some(ConvertedFile { name, data }) = rx.recv().await { + while let Some(FileText { name, text: data }) = rx.recv().await { // make output filename let mut outfilename = PathBuf::from(name); outfilename.set_extension("json"); @@ -326,10 +340,10 @@ async fn zip_writer(mut rx: UnboundedReceiver) { writer .write_entry_whole(opts, data.as_bytes()) .await - .unwrap(); + .expect("write entry"); } - writer.close().await.unwrap(); - buf.flush().await.unwrap(); + writer.close().await.expect("close writer"); + buf.flush().await.expect("flush buffer"); println!("write done ✅"); } @@ -342,11 +356,13 @@ async fn main() -> Result<(), Box> { _ => (), }; - let (tx, rx) = mpsc::unbounded_channel::(); + let (reader_tx, reader_rx) = mpsc::unbounded_channel::(); + let (json_tx, json_rx) = mpsc::unbounded_channel::(); tokio::try_join!( - tokio::spawn(reader_converter(tx)), - tokio::spawn(zip_writer(rx)) + tokio::spawn(zip_text_reader(reader_tx)), + tokio::spawn(questions_converter(reader_rx, json_tx)), + tokio::spawn(zip_json_writer(json_rx)) )?; println!("all done ✅");