extern crate async_zip;
extern crate json;
extern crate tokio;

use async_zip::read::fs::ZipFileReader;
use async_zip::write::{EntryOptions, ZipFileWriter};
use async_zip::Compression;
use std::collections::VecDeque;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use tokio::fs;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
use tokio::sync::Mutex;

const INPUT_FILENAME: &str = "baza_utf8.zip";
const OUTPUT_FILENAME: &str = "json.zip";

#[derive(Debug, Clone, Copy)]
enum KeywordType {
    Ignore,
    Global,
    QuestionPre,
    QuestionStart,
    QuestionContent,
    CurrentScope,
}

#[derive(Debug, Clone, Copy)]
enum DataScope {
    Global,
    QuestionPre,
    QuestionContent,
}

struct Context {
    // global output value
    data: json::JsonValue,
    // temporary questions array
    questions: json::JsonValue,
    cur_keyword_type: Option<KeywordType>,
    // temporary question value
    cur_question: json::JsonValue,
    // temporary value for fields collected before a question body
    cur_question_pre: json::JsonValue,
    // scope for data fields
    cur_scope: DataScope,
    // current json key
    cur_tag: String,
    // current json value
    cur_content: Vec<String>,
    // the temporary question value still has to be pushed if true
    have_new_question: bool,
    // previous keyword type
    last_keyword_type: Option<KeywordType>,
    // previous json key (used to store the accumulated content when a new keyword is read)
    last_tag: String,
}

// validate questions before pushing them
trait PushIfValid {
    fn is_valid(&self) -> bool;
    fn push_if_valid(&mut self, value: json::JsonValue);
}

impl PushIfValid for json::JsonValue {
    fn is_valid(&self) -> bool {
        self.has_key("Вопрос") && self.has_key("Ответ")
    }
    fn push_if_valid(&mut self, value: json::JsonValue) {
        if value.is_valid() {
            self.push(value).unwrap_or(())
        }
    }
}

impl Context {
    fn new() -> Context {
        Context {
            data: json::JsonValue::new_object(),
            questions: json::JsonValue::new_array(),
            cur_keyword_type: None,
            cur_question: json::JsonValue::new_object(),
            cur_question_pre: json::JsonValue::new_object(),
            cur_tag: String::new(),
            cur_content: Vec::<String>::new(),
            cur_scope: DataScope::Global,
            have_new_question: false,
            last_keyword_type: None,
            last_tag: String::new(),
        }
    }
}

impl FromStr for KeywordType {
    type Err = ();
    fn from_str(pattern: &str) -> Result<Self, Self::Err> {
        use KeywordType::*;
        Ok(match pattern {
            "Мета:" => Ignore,
            "Чемпионат:" | "Пакет:" => Global,
            "Тур:" => QuestionPre,
            "Вопрос " | "Вопрос:" => QuestionStart,
            "Ответ:" | "Зачет:" => QuestionContent,
            _ => CurrentScope,
            // "URL:" | "Ссылка:" | "Дата:" | "Обработан:" | "Автор:" | "Редактор:" | "Копирайт:" | "Инфо:" |
            // "Тема:" | "Вид:" | "Тип:" | "Источник:" | "Рейтинг:" | "Комментарий:" | "Комментарии:"
        })
    }
}

/// Parse a single "baza" text file into a json object: a keyword line such as
/// "Чемпионат:" or "Вопрос 1:" opens a field, and the following non-empty
/// lines are accumulated as that field's value.
async fn parse_file(
    entry_reader: impl AsyncReadExt + Unpin,
) -> Result<json::JsonValue, Box<dyn std::error::Error>> {
    let buf_reader = BufReader::new(entry_reader);
    let mut lines = buf_reader.lines();

    let patterns = vec![
        "Чемпионат:", "Пакет:", "URL:", "Ссылка:", "Дата:", "Редактор:",
        "Обработан:", "Копирайт:", "Инфо:", "Тема:", "Вид:", "Тип:", "Тур:",
        "Мета:", "Вопрос ", "Вопрос:", "Ответ:", "Зачет:", "Источник:",
        "Рейтинг:", "Автор:", "Комментарий:", "Комментарии:",
    ];

    let mut context = Context::new();
    let ctx = &mut context;
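    // A keyword line carries no value of its own; instead it flushes the
    // content accumulated since the *previous* keyword under the previous tag,
    // which is why the match below dispatches on last_keyword_type.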
    while let Some(line_r) = lines.next_line().await? {
        let line = line_r.trim();
        if line.is_empty() {
            continue;
        }
        let line = line.to_string();
        match patterns
            .iter()
            // find a keyword
            .find(|&&pattern| line.starts_with(pattern) && line.ends_with(':'))
        {
            Some(pattern) => {
                use KeywordType::*;
                ctx.last_keyword_type = ctx.cur_keyword_type;
                ctx.last_tag = ctx.cur_tag.clone();
                ctx.cur_keyword_type = Some(pattern.parse().unwrap());
                ctx.cur_tag = pattern.replace(' ', "").replace(':', "");

                // remember the question id
                if let Some(QuestionStart) = ctx.cur_keyword_type {
                    ctx.cur_question_pre["id"] = line.replace(':', "").as_str().into();
                };

                // apply the accumulated content when a new keyword is found
                match ctx.last_keyword_type {
                    Some(Global) => {
                        ctx.cur_scope = DataScope::Global;
                        ctx.data[&ctx.last_tag] = ctx.cur_content.join("\n").into()
                    }
                    Some(QuestionPre) => {
                        ctx.cur_scope = DataScope::QuestionPre;
                        ctx.cur_question_pre[&ctx.last_tag] = ctx.cur_content.join("\n").into();
                    }
                    Some(QuestionStart) => {
                        ctx.cur_scope = DataScope::QuestionContent;
                        // store the previous question before reading a new one
                        if ctx.have_new_question {
                            ctx.questions.push_if_valid(ctx.cur_question.clone());
                        }
                        // prepare to read the new question, seeded with the cur_question_pre values
                        ctx.cur_question = ctx.cur_question_pre.clone();
                        ctx.cur_question[&ctx.last_tag] = ctx.cur_content.join("\n").into();
                        ctx.have_new_question = true;
                    }
                    Some(QuestionContent) => {
                        ctx.cur_question[&ctx.last_tag] = ctx.cur_content.join("\n").into();
                    }
                    Some(CurrentScope) => {
                        // pick the value the data should be stored into
                        let scope_data = match ctx.cur_scope {
                            DataScope::Global => &mut ctx.data,
                            DataScope::QuestionPre => &mut ctx.cur_question_pre,
                            DataScope::QuestionContent => &mut ctx.cur_question,
                        };
                        scope_data[&ctx.last_tag] = ctx.cur_content.join("\n").into();
                    }
                    _ => (), // None or Ignore
                };
                // clear the accumulated content
                ctx.cur_content.clear();
            }
            None => {
                // accumulate content if the line is not a keyword
                ctx.cur_content.push(line);
            }
        }
    }

    // finish reading the last question
    if ctx.have_new_question && !ctx.cur_content.is_empty() {
        ctx.cur_question[&ctx.cur_tag] = ctx.cur_content.join("\n").into();
        ctx.questions.push_if_valid(ctx.cur_question.clone());
        ctx.have_new_question = false;
    }
    ctx.data["Вопросы"] = ctx.questions.clone();
    Ok(ctx.data.clone())
}

struct WriteQueueItem {
    name: String,
    data: String,
}

type WriteQueue = Arc<Mutex<VecDeque<Option<WriteQueueItem>>>>;

async fn data_reader(queue: WriteQueue) {
    // open the archive just to list its files
    let archive = ZipFileReader::new(String::from(INPUT_FILENAME))
        .await
        .unwrap();

    let source_files = archive
        .entries()
        .iter()
        .enumerate()
        .filter(|(_, entry)| !entry.dir())
        .filter(|(_, entry)| {
            // skip files without the "txt" extension
            entry.name().ends_with(".txt")
        })
        .map(|(index, entry)| (index, entry.name().to_string()));

    for (index, name) in source_files {
        let entry_reader = archive.entry_reader(index).await.unwrap();
        // parse the file into json
        let new_data = parse_file(entry_reader).await.unwrap();
        // dump the json to a string
        let data = new_data.pretty(2);
        // add it to the write queue
        queue
            .lock()
            .await
            .push_back(Some(WriteQueueItem { name, data }));
    }
    // mark the queue as done so the writer can exit its loop
    queue.lock().await.push_back(None);
}

async fn data_writer(queue: WriteQueue) {
    let mut file = fs::File::create(OUTPUT_FILENAME).await.unwrap();
    let mut writer = ZipFileWriter::new(&mut file);

    let mut is_reading_done = false;
    loop {
        let mut queue_locked = queue.lock().await;
        if queue_locked.is_empty() {
            drop(queue_locked);
            if is_reading_done {
                break;
            }
            tokio::time::sleep(tokio::time::Duration::from_micros(50)).await;
            continue;
        }
        let item = queue_locked.pop_front().unwrap();
        drop(queue_locked);
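        // `None` is the reader's end-of-stream sentinel. The lock is released
        // before the (potentially slow) compression below, so the reader is
        // never blocked while an entry is being written.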
        match item {
            None => {
                is_reading_done = true;
            }
            Some(WriteQueueItem { name, data }) => {
                // build the output filename
                let mut outfilename = PathBuf::from(name);
                outfilename.set_extension("json");
                let outfilename = outfilename.to_str().unwrap().to_string();
                let opts = EntryOptions::new(outfilename, Compression::Deflate);

                // write a new zip entry
                writer
                    .write_entry_whole(opts, data.as_bytes())
                    .await
                    .unwrap();
            }
        }
    }
    writer.close().await.unwrap();
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // make sure the output path is usable
    match fs::metadata(OUTPUT_FILENAME).await {
        Ok(x) if x.is_dir() => return Err("output file is a directory!".into()),
        _ => (),
    };

    let queue: WriteQueue = Arc::new(Mutex::new(VecDeque::with_capacity(40)));
    let queue_r = Arc::clone(&queue);
    let queue_w = Arc::clone(&queue);

    let res = tokio::try_join!(
        tokio::spawn(async move { data_reader(queue_r).await }),
        tokio::spawn(async move { data_writer(queue_w).await })
    );
    res?;
    println!("done ✅");
    Ok(())
}
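
// A minimal smoke test for parse_file: a sketch, assuming a tiny hand-written
// snippet in the expected "baza" layout. The tournament name and question text
// below are made up for illustration, and tokio's "macros" and "rt" features
// (already required by #[tokio::main]) are assumed to be enabled.
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn parses_minimal_package() {
        // keyword lines stand alone; their values follow on the next lines
        let input =
            "Чемпионат:\nТестовый турнир\n\nВопрос 1:\nСколько будет дважды два?\n\nОтвет:\nЧетыре\n";
        // &[u8] implements tokio's AsyncRead, so it can stand in for a zip entry reader
        let parsed = parse_file(input.as_bytes()).await.unwrap();
        assert_eq!(parsed["Чемпионат"], "Тестовый турнир");
        assert_eq!(parsed["Вопросы"].len(), 1);
        assert_eq!(parsed["Вопросы"][0]["Ответ"], "Четыре");
    }
}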