parallel processing using rayon

commit 0240bf2fdf
parent 390097b5b8
2019-07-26 23:04:41 +03:00
3 changed files with 217 additions and 20 deletions

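This commit parallelises the txt-to-JSON conversion with rayon: main() collects the *.txt entry names from baza.zip, splits that list into one slice per rayon worker thread with a small split_vec() helper, and hands each slice to process_files() inside par_iter().for_each(). The building block is rayon's parallel iterator; a minimal, self-contained sketch of that pattern (placeholder file names, not the project's parse_file()/process_files() code) looks like this:

extern crate rayon;
use rayon::prelude::*;

fn main() {
    // stand-ins for the txt entries of baza.zip
    let names: Vec<String> = (1..=8).map(|i| format!("q{}.txt", i)).collect();

    // for_each runs the closure on rayon's worker threads
    names.par_iter().for_each(|name| {
        println!("processing {}", name);
    });
}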

@@ -1,15 +1,19 @@
 extern crate encoding;
 extern crate json;
+extern crate rayon;
 extern crate textstream;
 extern crate zip;
 
 use encoding::all::KOI8_R;
 use encoding::DecoderTrap;
+use rayon::prelude::*;
 use std::fs;
 use std::io;
 use std::path::PathBuf;
 use textstream::TextReader;
 
+const BASE_FILENAME: &str = "./baza.zip";
+
 #[derive(Debug, Clone, Copy)]
 enum KeywordType {
     Ignore,
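The hunk above only wires rayon in: the rayon crate, its prelude, and the BASE_FILENAME constant used by the new functions. The parallelism itself relies on rayon's implicit global thread pool, whose size the new main() queries with rayon::current_num_threads(). For reference, that pool can be inspected, or pinned to a fixed size before first use, like this (standalone sketch, not part of this commit):

extern crate rayon;

fn main() {
    // optional: fix the global pool size; by default rayon uses one worker
    // per logical CPU (or honours the RAYON_NUM_THREADS environment variable)
    rayon::ThreadPoolBuilder::new()
        .num_threads(4)
        .build_global()
        .unwrap();

    // this is the value the new main() passes to split_vec()
    println!("rayon workers: {}", rayon::current_num_threads());
}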
@@ -192,33 +196,82 @@ fn parse_file(file: impl io::Read) -> Result<json::JsonValue, Box<std::error::Er
     Ok(ctx.data.clone())
 }
 
-fn main() -> Result<(), Box<std::error::Error>> {
-    let fname = std::path::Path::new("./baza.zip");
-    let zip_file = fs::File::open(&fname)?;
+// split a vector into a vector of slices
+#[allow(clippy::ptr_arg)]
+fn split_vec<'a, T>(src: &'a Vec<T>, num: usize) -> Vec<&'a [T]> {
+    let all_len = src.len();
+    let part_len = all_len / num;
+    let add_len = all_len % num;
+    let mut result = Vec::<&'a [T]>::new();
+    if 1 > part_len {
+        result.push(src.as_slice());
+        return result;
+    }
+    for i in 0..num {
+        let size = if (num - 1) == i {
+            part_len + add_len
+        } else {
+            part_len
+        };
+        let start = part_len * i;
+        result.push(&src[start..(start + size)]);
+    }
+    result
+}
+
+fn process_files(files: Vec<PathBuf>) {
+    let zip_file = fs::File::open(BASE_FILENAME).unwrap();
     let zip_reader = io::BufReader::new(zip_file);
-    let mut archive = zip::ZipArchive::new(zip_reader)?;
-    for i in 0..archive.len() {
-        let file = archive.by_index(i)?;
-        let name = file.sanitized_name();
-        // skip files without "txt" extension
-        match name.extension() {
-            Some(ext) => match ext.to_str() {
-                Some(ext_str) if ext_str.eq_ignore_ascii_case("txt") => (),
-                _ => continue, // extension is not valid unicode or not txt
-            },
-            _ => continue, // no extension in filename
-        }
-        println!("{}", name.as_path().display());
-        let data: json::JsonValue = parse_file(file)?;
+    let mut archive = zip::ZipArchive::new(zip_reader).unwrap();
+    files.iter().for_each(|name| {
+        let name_str = name.to_str().unwrap();
+        println!("{:}", name_str);
+        // parse txt file
+        let file = archive.by_name(name_str).unwrap();
+        let data = parse_file(file).unwrap();
         // make output filename
         let mut outfilename = PathBuf::from("./json");
         outfilename.push(name);
         outfilename.set_extension("json");
-        let mut outfile = fs::File::create(outfilename)?;
-        data.write_pretty(&mut outfile, 1)?;
-    }
+        // save json to file
+        let mut outfile = fs::File::create(outfilename).unwrap();
+        data.write_pretty(&mut outfile, 1).unwrap();
+    });
+}
+
+fn main() -> Result<(), Box<std::error::Error>> {
+    // open archive just to list files
+    let zip_file = fs::File::open(BASE_FILENAME)?;
+    let zip_reader = io::BufReader::new(zip_file);
+    let mut archive = zip::ZipArchive::new(zip_reader)?;
+    let source_files: Vec<PathBuf> = (0..archive.len())
+        .map(|i| archive.by_index(i).unwrap().sanitized_name())
+        .filter(|name| {
+            // skip files without "txt" extension
+            match name.extension() {
+                Some(ext) => match ext.to_str() {
+                    Some(ext_str) if ext_str.eq_ignore_ascii_case("txt") => true,
+                    _ => false, // extension is not valid unicode or not txt
+                },
+                _ => false, // no extension in filename
+            }
+        })
+        .collect();
+    drop(archive);
+    // split vector and iterate on its parts in parallel
+    split_vec(&source_files, rayon::current_num_threads())
+        .par_iter()
+        .for_each(|slice| {
+            let source_files_part = slice.to_vec();
+            process_files(source_files_part);
+        });
+
     Ok(())
 }
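split_vec() yields exactly num slices (or a single slice when there are fewer items than threads), with the division remainder folded into the last one. rayon also ships a built-in way to get a similar effect: ParallelSlice::par_chunks() splits a slice into fixed-length chunks and iterates over them in parallel, so the manual helper is not strictly required. A standalone sketch of that alternative (placeholder data, not what the commit does):

extern crate rayon;
use rayon::prelude::*;

fn main() {
    // stand-ins for the collected *.txt names
    let source_files: Vec<String> = (0..100).map(|i| format!("q{:03}.txt", i)).collect();

    // ceiling division, so there are at most current_num_threads() chunks
    let threads = rayon::current_num_threads();
    let chunk_size = ((source_files.len() + threads - 1) / threads).max(1);

    source_files.par_chunks(chunk_size).for_each(|slice| {
        // stands in for process_files(slice.to_vec())
        println!("chunk of {} files", slice.len());
    });
}

The trade-off is small: split_vec() fixes the number of slices to the thread count, while par_chunks() fixes the slice length. In the commit, each slice is handed to process_files(), which reopens baza.zip once per call, so the chunking strategy mainly affects how evenly the files are spread across threads.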