multi-threading
This commit is contained in:
parent
2ee48f207a
commit
f7f2d91bac
199
src/main.rs
199
src/main.rs
@ -1,17 +1,15 @@
|
||||
extern crate encoding;
|
||||
extern crate textstream;
|
||||
extern crate zip;
|
||||
|
||||
use clap::{Parser, ValueEnum};
|
||||
use encoding::label::encoding_from_whatwg_label;
|
||||
use encoding::Encoding;
|
||||
use encoding::EncodingRef;
|
||||
use encoding::{DecoderTrap, EncoderTrap};
|
||||
use regex::Regex;
|
||||
use std::io::{Read, Seek, Write};
|
||||
use std::path::PathBuf;
|
||||
use std::{fs, io};
|
||||
use textstream::TextReader;
|
||||
use zip::write::FileOptions;
|
||||
use std::fs;
|
||||
use std::io::{Read, Write};
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
use zip::ZipWriter;
|
||||
|
||||
/// transcode txt files in zip archieve
|
||||
@ -31,6 +29,10 @@ struct Cli {
|
||||
#[clap(arg_enum, short, long, default_value = "zstd")]
|
||||
compression: OutputFileCompression,
|
||||
|
||||
/// output compression level
|
||||
#[clap(arg_enum, short='l', long, value_parser = clap::value_parser!(i32).range(1..=9), default_value = "5")]
|
||||
compression_level: i32,
|
||||
|
||||
/// filename filter (regex)
|
||||
#[clap(short, long, default_value = r#".*\.txt$"#)]
|
||||
regex: String,
|
||||
@ -68,110 +70,109 @@ impl From<OutputFileCompression> for zip::CompressionMethod {
|
||||
}
|
||||
}
|
||||
|
||||
struct OutFileDescr<'a> {
|
||||
#[derive(Debug)]
|
||||
struct FileData {
|
||||
name: String,
|
||||
options: &'a FileOptions,
|
||||
encoding: &'a dyn Encoding,
|
||||
trap: EncoderTrap,
|
||||
data: Vec<u8>,
|
||||
}
|
||||
|
||||
fn read_file(file: impl Read, enc: &dyn Encoding) -> Result<String, Box<dyn std::error::Error>> {
|
||||
let buf = io::BufReader::new(file);
|
||||
let mut reader = TextReader::new(buf, enc, DecoderTrap::Ignore);
|
||||
let mut result = String::new();
|
||||
reader.read_to_end(&mut result).or(Err("decode error"))?;
|
||||
|
||||
Ok(result)
|
||||
#[derive(Debug)]
|
||||
struct FileText {
|
||||
name: String,
|
||||
text: String,
|
||||
}
|
||||
|
||||
fn write_file<T: Seek + Write>(
|
||||
arc: &mut ZipWriter<T>,
|
||||
f: OutFileDescr,
|
||||
data: String,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
arc.start_file(f.name, *f.options)?;
|
||||
let data = f.encoding.encode(data.as_str(), f.trap)?;
|
||||
arc.write_all(&data)?;
|
||||
Ok(())
|
||||
}
|
||||
fn reader_task(tx: mpsc::Sender<FileData>, input_filename: String, regex: Regex) {
|
||||
let zip_file = fs::File::open(input_filename).unwrap();
|
||||
let mut archive = zip::ZipArchive::new(zip_file).unwrap();
|
||||
|
||||
fn process_files<R: Read + Seek>(
|
||||
archive: &mut zip::ZipArchive<R>,
|
||||
output_filename: String,
|
||||
enc_input: &(dyn Encoding + Send + Sync),
|
||||
enc_output: &(dyn Encoding + Send + Sync),
|
||||
compression: zip::CompressionMethod,
|
||||
files: &[PathBuf],
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
if files.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let options = zip::write::FileOptions::default()
|
||||
.compression_method(compression)
|
||||
.compression_level(Some(9));
|
||||
let file_def = OutFileDescr {
|
||||
name: String::new(),
|
||||
options: &options,
|
||||
encoding: enc_output,
|
||||
trap: EncoderTrap::Ignore,
|
||||
};
|
||||
|
||||
let mut outfile = fs::File::create(output_filename)?;
|
||||
let mut zip_writer = ZipWriter::new(&mut outfile);
|
||||
|
||||
for name in files {
|
||||
let name_str = name.to_str().ok_or("name to str err")?;
|
||||
|
||||
// read string from file in input zip
|
||||
let file = archive.by_name(name_str)?;
|
||||
let data = read_file(file, enc_input)?;
|
||||
|
||||
// write string to file in output zip
|
||||
let out_file = OutFileDescr {
|
||||
name: name_str.to_string(),
|
||||
..file_def
|
||||
};
|
||||
write_file(&mut zip_writer, out_file, data)?;
|
||||
}
|
||||
|
||||
zip_writer.finish()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let args = Cli::parse();
|
||||
|
||||
let regex = Regex::new(&args.regex).unwrap();
|
||||
let encoding_input = encoding_from_whatwg_label(&args.from).expect("input encoding");
|
||||
let encoding_output = encoding_from_whatwg_label(&args.to).expect("output encoding");
|
||||
let compression: zip::CompressionMethod = args.compression.into();
|
||||
|
||||
// open archive just to list files
|
||||
let zip_file = fs::File::open(args.src)?;
|
||||
let zip_reader = io::BufReader::new(zip_file);
|
||||
let mut archive = zip::ZipArchive::new(zip_reader)?;
|
||||
|
||||
let mut source_files: Vec<PathBuf> = (0..archive.len())
|
||||
.map(|i| archive.by_index(i).unwrap().mangled_name())
|
||||
.filter(|name| regex.is_match(name.to_str().unwrap()))
|
||||
let mut source_files: Vec<String> = archive
|
||||
.file_names()
|
||||
.filter(|name| regex.is_match(name))
|
||||
.map(|s| s.to_string())
|
||||
.collect();
|
||||
|
||||
println!("processing {} files...", source_files.len());
|
||||
|
||||
source_files.sort();
|
||||
let source_files = source_files;
|
||||
|
||||
process_files(
|
||||
&mut archive,
|
||||
args.dst,
|
||||
encoding_input,
|
||||
encoding_output,
|
||||
compression,
|
||||
&source_files,
|
||||
)?;
|
||||
for name in source_files {
|
||||
let mut file = archive.by_name(name.as_str()).unwrap();
|
||||
let mut data = Vec::with_capacity(file.size().try_into().unwrap());
|
||||
file.read_to_end(&mut data).unwrap();
|
||||
drop(file);
|
||||
|
||||
println!("done");
|
||||
tx.send(FileData { name, data }).unwrap();
|
||||
}
|
||||
|
||||
println!("read done ✅");
|
||||
}
|
||||
|
||||
fn decoder_task(rx: mpsc::Receiver<FileData>, tx: mpsc::Sender<FileText>, encoding: EncodingRef) {
|
||||
while let Ok(FileData { name, data }) = rx.recv() {
|
||||
let text = encoding.decode(&data, DecoderTrap::Ignore).unwrap();
|
||||
tx.send(FileText { name, text }).unwrap();
|
||||
}
|
||||
println!("decode done ✅");
|
||||
}
|
||||
|
||||
fn encoder_task(rx: mpsc::Receiver<FileText>, tx: mpsc::Sender<FileData>, encoding: EncodingRef) {
|
||||
while let Ok(FileText { name, text }) = rx.recv() {
|
||||
let data = encoding.encode(text.as_str(), EncoderTrap::Ignore).unwrap();
|
||||
tx.send(FileData { name, data }).unwrap();
|
||||
}
|
||||
println!("encode done ✅");
|
||||
}
|
||||
|
||||
fn writer_task(
|
||||
rx: mpsc::Receiver<FileData>,
|
||||
output_filename: String,
|
||||
compression: zip::CompressionMethod,
|
||||
compression_level: i32,
|
||||
) {
|
||||
let options = zip::write::FileOptions::default()
|
||||
.compression_method(compression)
|
||||
.compression_level(Some(compression_level));
|
||||
|
||||
let mut outfile = fs::File::create(output_filename).expect("output file");
|
||||
let mut zip_writer = ZipWriter::new(&mut outfile);
|
||||
|
||||
while let Ok(FileData { name, data }) = rx.recv() {
|
||||
zip_writer.start_file(name, options).unwrap();
|
||||
zip_writer.write_all(&data).unwrap();
|
||||
}
|
||||
zip_writer.finish().unwrap();
|
||||
println!("write done ✅");
|
||||
}
|
||||
|
||||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let args = Cli::parse();
|
||||
|
||||
let regex = Regex::new(&args.regex).expect("regex");
|
||||
let encoding_input = encoding_from_whatwg_label(&args.from).expect("input encoding");
|
||||
let encoding_output = encoding_from_whatwg_label(&args.to).expect("output encoding");
|
||||
let compression: zip::CompressionMethod = args.compression.into();
|
||||
let compression_level = args.compression_level;
|
||||
let input_filename = args.src;
|
||||
let output_filename = args.dst;
|
||||
|
||||
let (reader_tx, reader_rx) = mpsc::channel::<FileData>();
|
||||
let (decoder_tx, decoder_rx) = mpsc::channel::<FileText>();
|
||||
let (encoder_tx, encoder_rx) = mpsc::channel::<FileData>();
|
||||
|
||||
let handles = vec![
|
||||
thread::spawn(move || reader_task(reader_tx, input_filename, regex)),
|
||||
thread::spawn(move || decoder_task(reader_rx, decoder_tx, encoding_input)),
|
||||
thread::spawn(move || encoder_task(decoder_rx, encoder_tx, encoding_output)),
|
||||
thread::spawn(move || {
|
||||
writer_task(encoder_rx, output_filename, compression, compression_level)
|
||||
}),
|
||||
];
|
||||
|
||||
for handle in handles {
|
||||
handle.join().expect("thread paniced");
|
||||
}
|
||||
|
||||
println!("all done ✅");
|
||||
Ok(())
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user