2022-09-12 15:59:35 +03:00
parent 9eca2a68df
commit 9cb95c435a
5 changed files with 671 additions and 334 deletions


@@ -1,18 +1,16 @@
-extern crate encoding;
+extern crate async_zip;
+extern crate futures;
 extern crate json;
-extern crate rayon;
-extern crate textstream;
-extern crate zip;
+extern crate tokio;
-use encoding::all::KOI8_R;
-use encoding::DecoderTrap;
-use rayon::prelude::*;
+use async_zip::read::fs::ZipFileReader;
+use futures::future;
 use std::path::PathBuf;
 use std::str::FromStr;
-use std::{fs, io};
-use textstream::TextReader;
+use tokio::fs;
+use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
-const BASE_FILENAME: &str = "baza.zip";
+const BASE_FILENAME: &str = "baza_utf8.zip";
 const OUTPUT_PATH: &str = "json";
 #[derive(Debug, Clone, Copy)]
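Aside: the dependency swap above replaces the blocking zip/encoding/rayon stack with async_zip, tokio, and futures. A minimal sketch of the resulting archive-listing pattern, using only the async_zip fs-reader calls that appear in this commit (the archive path is illustrative):

use async_zip::read::fs::ZipFileReader;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // open the archive and list its .txt entries, as main() below does
    let archive = ZipFileReader::new(String::from("baza_utf8.zip")).await?;
    for (index, entry) in archive.entries().iter().enumerate() {
        if !entry.dir() && entry.name().ends_with(".txt") {
            println!("{}: {}", index, entry.name());
        }
    }
    Ok(())
}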
@@ -109,9 +107,11 @@ impl FromStr for KeywordType {
     }
 }
-fn parse_file(file: impl io::Read) -> Result<json::JsonValue, Box<dyn std::error::Error>> {
-    let buf = io::BufReader::new(file);
-    let reader = TextReader::new(buf, KOI8_R, DecoderTrap::Ignore);
+async fn parse_file(
+    entry_reader: impl AsyncReadExt + Unpin,
+) -> Result<json::JsonValue, Box<dyn std::error::Error>> {
+    let buf_reader = BufReader::new(entry_reader);
+    let mut lines = buf_reader.lines();
     let patterns = vec![
         "Чемпионат:",
@@ -141,72 +141,74 @@ fn parse_file(file: impl io::Read) -> Result<json::JsonValue, Box<dyn std::error
     let mut context = Context::new();
     let mut ctx = &mut context;
-    reader
-        .lines()
-        .map(|line| String::from(line.unwrap().trim()))
-        .filter(|line| !line.is_empty()) // ignore empty lines
-        .for_each(|line| {
-            match patterns
-                .iter() // find keyword
-                .find(|&&pattern| line.starts_with(pattern) && line.ends_with(':'))
-            {
-                Some(pattern) => {
-                    use KeywordType::*;
-                    ctx.last_keyword_type = ctx.cur_keyword_type;
-                    ctx.last_tag = ctx.cur_tag.clone();
-                    ctx.cur_keyword_type = Some(pattern.parse().unwrap());
-                    ctx.cur_tag = pattern.replace(' ', "").replace(':', "");
-                    // remember question id
-                    if let Some(QuestionStart) = ctx.cur_keyword_type {
-                        ctx.cur_question_pre["id"] = line.replace(':', "").as_str().into();
-                    };
-                    // apply accumulated content when new keyword found
-                    match ctx.last_keyword_type {
-                        Some(Global) => {
-                            ctx.cur_scope = DataScope::Global;
-                            ctx.data[&ctx.last_tag] = ctx.cur_content.join("\n").into()
-                        }
-                        Some(QuestionPre) => {
-                            ctx.cur_scope = DataScope::QuestionPre;
-                            ctx.cur_question_pre[&ctx.last_tag] = ctx.cur_content.join("\n").into();
-                        }
-                        Some(QuestionStart) => {
-                            ctx.cur_scope = DataScope::QuestionContent;
-                            // store prev question before reading new
-                            if ctx.have_new_question {
-                                ctx.questions.push_if_valid(ctx.cur_question.clone());
-                            }
-                            // prepare to read new question data with cur_question_pre values
-                            ctx.cur_question = ctx.cur_question_pre.clone();
-                            ctx.cur_question[&ctx.last_tag] = ctx.cur_content.join("\n").into();
-                            ctx.have_new_question = true;
-                        }
-                        Some(QuestionContent) => {
-                            ctx.cur_question[&ctx.last_tag] = ctx.cur_content.join("\n").into();
-                        }
-                        Some(CurrentScope) => {
-                            // match value to store data
-                            let scope_data = match ctx.cur_scope {
-                                DataScope::Global => &mut ctx.data,
-                                DataScope::QuestionPre => &mut ctx.cur_question_pre,
-                                DataScope::QuestionContent => &mut ctx.cur_question,
-                            };
-                            scope_data[&ctx.last_tag] = ctx.cur_content.join("\n").into();
-                        }
-                        _ => (), //None or Ignore
-                    };
-                    // clear content
-                    ctx.cur_content.clear();
-                }
-                None => {
-                    // accumulate content if line is not a keyword
-                    ctx.cur_content.push(line);
-                }
-            }
-        });
+    while let Some(line_r) = lines.next_line().await? {
+        let line = line_r.trim();
+        if line.is_empty() {
+            continue;
+        }
+        let line = line.to_string();
+        match patterns
+            .iter() // find keyword
+            .find(|&&pattern| line.starts_with(pattern) && line.ends_with(':'))
+        {
+            Some(pattern) => {
+                use KeywordType::*;
+                ctx.last_keyword_type = ctx.cur_keyword_type;
+                ctx.last_tag = ctx.cur_tag.clone();
+                ctx.cur_keyword_type = Some(pattern.parse().unwrap());
+                ctx.cur_tag = pattern.replace(' ', "").replace(':', "");
+                // remember question id
+                if let Some(QuestionStart) = ctx.cur_keyword_type {
+                    ctx.cur_question_pre["id"] = line.replace(':', "").as_str().into();
+                };
+                // apply accumulated content when new keyword found
+                match ctx.last_keyword_type {
+                    Some(Global) => {
+                        ctx.cur_scope = DataScope::Global;
+                        ctx.data[&ctx.last_tag] = ctx.cur_content.join("\n").into()
+                    }
+                    Some(QuestionPre) => {
+                        ctx.cur_scope = DataScope::QuestionPre;
+                        ctx.cur_question_pre[&ctx.last_tag] = ctx.cur_content.join("\n").into();
+                    }
+                    Some(QuestionStart) => {
+                        ctx.cur_scope = DataScope::QuestionContent;
+                        // store prev question before reading new
+                        if ctx.have_new_question {
+                            ctx.questions.push_if_valid(ctx.cur_question.clone());
+                        }
+                        // prepare to read new question data with cur_question_pre values
+                        ctx.cur_question = ctx.cur_question_pre.clone();
+                        ctx.cur_question[&ctx.last_tag] = ctx.cur_content.join("\n").into();
+                        ctx.have_new_question = true;
+                    }
+                    Some(QuestionContent) => {
+                        ctx.cur_question[&ctx.last_tag] = ctx.cur_content.join("\n").into();
+                    }
+                    Some(CurrentScope) => {
+                        // match value to store data
+                        let scope_data = match ctx.cur_scope {
+                            DataScope::Global => &mut ctx.data,
+                            DataScope::QuestionPre => &mut ctx.cur_question_pre,
+                            DataScope::QuestionContent => &mut ctx.cur_question,
+                        };
+                        scope_data[&ctx.last_tag] = ctx.cur_content.join("\n").into();
+                    }
+                    _ => (), //None or Ignore
+                };
+                // clear content
+                ctx.cur_content.clear();
+            }
+            None => {
+                // accumulate content if line is not a keyword
+                ctx.cur_content.push(line);
+            }
+        }
+    }
     // finish reading last question
     if ctx.have_new_question && !ctx.cur_content.is_empty() {
@@ -219,106 +221,61 @@ fn parse_file(file: impl io::Read) -> Result<json::JsonValue, Box<dyn std::error
     Ok(ctx.data.clone())
 }
-// split vector to a vector of [num] slices
-trait SplitTo<T> {
-    fn split_to(&self, num: usize) -> Vec<&[T]>;
-}
+async fn process_file(index: usize, name: String) -> Result<(), Box<dyn std::error::Error>> {
+    let archive = ZipFileReader::new(String::from(BASE_FILENAME)).await?;
+    let entry_reader = archive.entry_reader(index).await?;
+    // make output filename
+    let mut outfilename = PathBuf::from(OUTPUT_PATH);
+    outfilename.push(name);
+    outfilename.set_extension("json");
+    // save json to file
+    let new_data = parse_file(entry_reader).await?;
+    let data_str = new_data.pretty(2);
+    let mut outfile = fs::File::create(outfilename).await?;
+    outfile.write_all(data_str.as_bytes()).await?;
+    Ok(())
+}
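Note the design choice in process_file: each call opens its own ZipFileReader rather than sharing one across tasks, so a spawned task needs nothing but the entry index and name. A hypothetical call site (index and file name invented for illustration):

// parse entry 0 of the archive into json/example.json
process_file(0, String::from("example.txt")).await?;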
-impl<T> SplitTo<T> for Vec<T> {
-    fn split_to(&self, num: usize) -> Vec<&[T]> {
-        let part_len = self.len() / num;
-        let add_len = self.len() % num;
-        let mut result = Vec::<&[T]>::with_capacity(num);
-        if 0 == part_len {
-            result.push(self);
-            return result;
-        }
-        for i in 0..num {
-            let size = if (num - 1) == i {
-                part_len + add_len
-            } else {
-                part_len
-            };
-            let start = part_len * i;
-            result.push(&self[start..(start + size)]);
-        }
-        result
-    }
-}
-fn process_files(files: &&[PathBuf]) {
-    if files.is_empty() {
-        return;
-    }
-    let start_file = files[0].to_str().unwrap();
-    println!("-> start from \"{}\" ({} files)", start_file, files.len());
-    let zip_file = fs::File::open(BASE_FILENAME).unwrap();
-    let zip_reader = io::BufReader::new(zip_file);
-    let mut archive = zip::ZipArchive::new(zip_reader).unwrap();
-    files.iter().for_each(|name| {
-        let name_str = name.to_str().unwrap();
-        // parse txt file
-        let file = archive.by_name(name_str).unwrap();
-        let data = parse_file(file).unwrap();
-        // make output filename
-        let mut outfilename = PathBuf::from(OUTPUT_PATH);
-        outfilename.push(name);
-        outfilename.set_extension("json");
-        // save json to file
-        let mut outfile = fs::File::create(outfilename).unwrap();
-        data.write_pretty(&mut outfile, 1).unwrap();
-    });
-    println!("<- done {} files (from \"{}\")", files.len(), start_file);
-}
-fn main() -> Result<(), Box<dyn std::error::Error>> {
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
     // open archive just to list files
-    let zip_file = fs::File::open(BASE_FILENAME)?;
-    let zip_reader = io::BufReader::new(zip_file);
-    let mut archive = zip::ZipArchive::new(zip_reader)?;
+    let archive = ZipFileReader::new(String::from(BASE_FILENAME)).await?;
-    let source_files: Vec<PathBuf> = (0..archive.len())
-        .map(|i| archive.by_index(i).unwrap().mangled_name())
-        .filter(|name| {
+    let source_files: Vec<(usize, String)> = archive
+        .entries()
+        .iter()
+        .enumerate()
+        .filter(|item| !item.1.dir())
+        .filter(|item| {
             // skip files without "txt" extension
-            match name.extension() {
-                Some(ext) => match ext.to_str() {
-                    Some(ext_str) => ext_str.eq_ignore_ascii_case("txt"),
-                    _ => false, // extension is not valid unicode or not txt
-                },
-                _ => false, // no extension in filename
-            }
+            item.1.name().ends_with(".txt")
         })
+        .map(|item| (item.0, item.1.name().to_string()))
         .collect();
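One behavioral nuance: the new ends_with(".txt") test is case-sensitive, while the removed match honored TXT/Txt via eq_ignore_ascii_case. If parity matters, a helper along these lines (illustrative, not part of the commit) restores the old behavior:

use std::path::Path;

// case-insensitive extension check, mirroring the removed std::path logic
fn is_txt(name: &str) -> bool {
    Path::new(name)
        .extension()
        .and_then(|ext| ext.to_str())
        .map_or(false, |ext| ext.eq_ignore_ascii_case("txt"))
}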
     drop(archive);
     // check output directory
-    let out_dir: PathBuf = OUTPUT_PATH.into();
-    if out_dir.is_file() {
-        return Err("output directory is file!".into());
-    } else if !out_dir.exists() {
-        fs::create_dir_all(out_dir)?;
-    }
+    match fs::metadata(OUTPUT_PATH).await {
+        Err(_) => fs::create_dir_all(OUTPUT_PATH).await?,
+        Ok(x) if x.is_file() => return Err("output directory is file!".into()),
+        _ => (),
+    };
-    println!(
-        "processing {} files with {} threads...",
-        source_files.len(),
-        rayon::current_num_threads()
-    );
+    println!("processing {} files ...", source_files.len());
-    // split vector and process its parts in parallel
-    source_files
-        .split_to(rayon::current_num_threads())
-        .par_iter()
-        .for_each(process_files);
+    let mut handles = vec![];
+    for index in source_files {
+        let i = index.clone();
+        let handle = tokio::spawn(async move {
+            process_file(i.0, i.1).await.unwrap();
+        });
+        handles.push(handle);
+    }
+    future::join_all(handles).await;
     println!("done");
     Ok(())
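The unwrap() inside each task turns a per-file failure into a task panic, which join_all then surfaces only as an ignored JoinError. A sketch of a variant that inspects both layers (the error is converted to a String because Box<dyn Error> is not Send and so cannot cross tokio::spawn):

let handle = tokio::spawn(async move {
    process_file(i.0, i.1).await.map_err(|e| e.to_string())
});
// ... push handles as above, then:
for result in future::join_all(handles).await {
    match result {
        Ok(Ok(())) => {}                             // file processed
        Ok(Err(e)) => eprintln!("parse error: {e}"), // error from process_file
        Err(e) => eprintln!("task panicked: {e}"),   // JoinError from the runtime
    }
}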