From f7f2d91bac3612b7ea27a9d77ae2e74d1dd120fd Mon Sep 17 00:00:00 2001 From: Dmitry Date: Wed, 21 Sep 2022 13:52:44 +0300 Subject: [PATCH] multi-threading --- src/main.rs | 199 ++++++++++++++++++++++++++-------------------------- 1 file changed, 100 insertions(+), 99 deletions(-) diff --git a/src/main.rs b/src/main.rs index 03e08b7..b239c14 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,17 +1,15 @@ extern crate encoding; -extern crate textstream; extern crate zip; use clap::{Parser, ValueEnum}; use encoding::label::encoding_from_whatwg_label; -use encoding::Encoding; +use encoding::EncodingRef; use encoding::{DecoderTrap, EncoderTrap}; use regex::Regex; -use std::io::{Read, Seek, Write}; -use std::path::PathBuf; -use std::{fs, io}; -use textstream::TextReader; -use zip::write::FileOptions; +use std::fs; +use std::io::{Read, Write}; +use std::sync::mpsc; +use std::thread; use zip::ZipWriter; /// transcode txt files in zip archieve @@ -31,6 +29,10 @@ struct Cli { #[clap(arg_enum, short, long, default_value = "zstd")] compression: OutputFileCompression, + /// output compression level + #[clap(arg_enum, short='l', long, value_parser = clap::value_parser!(i32).range(1..=9), default_value = "5")] + compression_level: i32, + /// filename filter (regex) #[clap(short, long, default_value = r#".*\.txt$"#)] regex: String, @@ -68,110 +70,109 @@ impl From for zip::CompressionMethod { } } -struct OutFileDescr<'a> { +#[derive(Debug)] +struct FileData { name: String, - options: &'a FileOptions, - encoding: &'a dyn Encoding, - trap: EncoderTrap, + data: Vec, } -fn read_file(file: impl Read, enc: &dyn Encoding) -> Result> { - let buf = io::BufReader::new(file); - let mut reader = TextReader::new(buf, enc, DecoderTrap::Ignore); - let mut result = String::new(); - reader.read_to_end(&mut result).or(Err("decode error"))?; - - Ok(result) +#[derive(Debug)] +struct FileText { + name: String, + text: String, } -fn write_file( - arc: &mut ZipWriter, - f: OutFileDescr, - data: String, -) -> Result<(), Box> { - arc.start_file(f.name, *f.options)?; - let data = f.encoding.encode(data.as_str(), f.trap)?; - arc.write_all(&data)?; - Ok(()) -} +fn reader_task(tx: mpsc::Sender, input_filename: String, regex: Regex) { + let zip_file = fs::File::open(input_filename).unwrap(); + let mut archive = zip::ZipArchive::new(zip_file).unwrap(); -fn process_files( - archive: &mut zip::ZipArchive, - output_filename: String, - enc_input: &(dyn Encoding + Send + Sync), - enc_output: &(dyn Encoding + Send + Sync), - compression: zip::CompressionMethod, - files: &[PathBuf], -) -> Result<(), Box> { - if files.is_empty() { - return Ok(()); - } - - let options = zip::write::FileOptions::default() - .compression_method(compression) - .compression_level(Some(9)); - let file_def = OutFileDescr { - name: String::new(), - options: &options, - encoding: enc_output, - trap: EncoderTrap::Ignore, - }; - - let mut outfile = fs::File::create(output_filename)?; - let mut zip_writer = ZipWriter::new(&mut outfile); - - for name in files { - let name_str = name.to_str().ok_or("name to str err")?; - - // read string from file in input zip - let file = archive.by_name(name_str)?; - let data = read_file(file, enc_input)?; - - // write string to file in output zip - let out_file = OutFileDescr { - name: name_str.to_string(), - ..file_def - }; - write_file(&mut zip_writer, out_file, data)?; - } - - zip_writer.finish()?; - - Ok(()) -} - -fn main() -> Result<(), Box> { - let args = Cli::parse(); - - let regex = Regex::new(&args.regex).unwrap(); - let encoding_input = encoding_from_whatwg_label(&args.from).expect("input encoding"); - let encoding_output = encoding_from_whatwg_label(&args.to).expect("output encoding"); - let compression: zip::CompressionMethod = args.compression.into(); - - // open archive just to list files - let zip_file = fs::File::open(args.src)?; - let zip_reader = io::BufReader::new(zip_file); - let mut archive = zip::ZipArchive::new(zip_reader)?; - - let mut source_files: Vec = (0..archive.len()) - .map(|i| archive.by_index(i).unwrap().mangled_name()) - .filter(|name| regex.is_match(name.to_str().unwrap())) + let mut source_files: Vec = archive + .file_names() + .filter(|name| regex.is_match(name)) + .map(|s| s.to_string()) .collect(); println!("processing {} files...", source_files.len()); source_files.sort(); - let source_files = source_files; - process_files( - &mut archive, - args.dst, - encoding_input, - encoding_output, - compression, - &source_files, - )?; + for name in source_files { + let mut file = archive.by_name(name.as_str()).unwrap(); + let mut data = Vec::with_capacity(file.size().try_into().unwrap()); + file.read_to_end(&mut data).unwrap(); + drop(file); - println!("done"); + tx.send(FileData { name, data }).unwrap(); + } + + println!("read done ✅"); +} + +fn decoder_task(rx: mpsc::Receiver, tx: mpsc::Sender, encoding: EncodingRef) { + while let Ok(FileData { name, data }) = rx.recv() { + let text = encoding.decode(&data, DecoderTrap::Ignore).unwrap(); + tx.send(FileText { name, text }).unwrap(); + } + println!("decode done ✅"); +} + +fn encoder_task(rx: mpsc::Receiver, tx: mpsc::Sender, encoding: EncodingRef) { + while let Ok(FileText { name, text }) = rx.recv() { + let data = encoding.encode(text.as_str(), EncoderTrap::Ignore).unwrap(); + tx.send(FileData { name, data }).unwrap(); + } + println!("encode done ✅"); +} + +fn writer_task( + rx: mpsc::Receiver, + output_filename: String, + compression: zip::CompressionMethod, + compression_level: i32, +) { + let options = zip::write::FileOptions::default() + .compression_method(compression) + .compression_level(Some(compression_level)); + + let mut outfile = fs::File::create(output_filename).expect("output file"); + let mut zip_writer = ZipWriter::new(&mut outfile); + + while let Ok(FileData { name, data }) = rx.recv() { + zip_writer.start_file(name, options).unwrap(); + zip_writer.write_all(&data).unwrap(); + } + zip_writer.finish().unwrap(); + println!("write done ✅"); +} + +fn main() -> Result<(), Box> { + let args = Cli::parse(); + + let regex = Regex::new(&args.regex).expect("regex"); + let encoding_input = encoding_from_whatwg_label(&args.from).expect("input encoding"); + let encoding_output = encoding_from_whatwg_label(&args.to).expect("output encoding"); + let compression: zip::CompressionMethod = args.compression.into(); + let compression_level = args.compression_level; + let input_filename = args.src; + let output_filename = args.dst; + + let (reader_tx, reader_rx) = mpsc::channel::(); + let (decoder_tx, decoder_rx) = mpsc::channel::(); + let (encoder_tx, encoder_rx) = mpsc::channel::(); + + let handles = vec![ + thread::spawn(move || reader_task(reader_tx, input_filename, regex)), + thread::spawn(move || decoder_task(reader_rx, decoder_tx, encoding_input)), + thread::spawn(move || encoder_task(decoder_rx, encoder_tx, encoding_output)), + thread::spawn(move || { + writer_task(encoder_rx, output_filename, compression, compression_level) + }), + ]; + + for handle in handles { + handle.join().expect("thread paniced"); + } + + println!("all done ✅"); Ok(()) }