extern crate async_zip; extern crate encoding; extern crate tokio; use async_zip::read::fs::ZipFileReader; use async_zip::write::{EntryOptions, ZipFileWriter}; use async_zip::Compression; use clap::{Parser, ValueEnum}; use encoding::label::encoding_from_whatwg_label; use encoding::{all::UTF_8, Encoding, EncodingRef}; use encoding::{DecoderTrap, EncoderTrap}; use regex::Regex; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter}; use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; use tokio::{fs, task}; /// transcode txt files in zip archive #[derive(Parser, Debug)] #[clap(author, version, about, long_about = None)] #[clap(propagate_version = true)] struct Cli { /// input encoding #[clap(short, long, default_value = "koi8-r")] from: String, /// output encoding #[clap(short, long, default_value = "utf8")] to: String, /// output compression method #[clap(arg_enum, short, long, default_value = "zstd")] compression: OutputFileCompression, /// filename filter (regex) #[clap(short, long, default_value = r#".*\.txt$"#)] regex: String, /// input zip filename #[clap(value_parser, default_value = "baza.zip")] src: String, /// output zip filename #[clap(value_parser, default_value = "baza_utf8.zip")] dst: String, } /// output file compression method #[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)] enum OutputFileCompression { /// Store the file as is Store, /// Compress the file using Deflate Deflate, /// Compress the file using BZIP2 Bzip2, /// Compress the file using LZMA Lzma, /// Compress the file using ZStandard Zstd, /// Compress the file using XZ Xz, } impl From for Compression { fn from(compression: OutputFileCompression) -> Self { match compression { OutputFileCompression::Store => Self::Stored, OutputFileCompression::Deflate => Self::Deflate, OutputFileCompression::Bzip2 => Self::Bz, OutputFileCompression::Lzma => Self::Lzma, OutputFileCompression::Zstd => Self::Zstd, OutputFileCompression::Xz => Self::Xz, } } } #[derive(Debug)] struct FileData { name: String, data: Vec, } async fn reader_task(tx: UnboundedSender, input_filename: String, regex: Regex) { let archive = ZipFileReader::new(input_filename).await.unwrap(); let mut source_files: Vec<(usize, String, u32)> = archive .entries() .iter() .enumerate() .filter(|(_, entry)| !entry.dir()) .filter(|(_, entry)| regex.is_match(entry.name())) .map(|(index, entry)| { ( index, entry.name().to_string(), entry.uncompressed_size().unwrap(), ) }) .collect(); source_files.sort_by(|(_, name_a, _), (_, name_b, _)| name_a.partial_cmp(name_b).unwrap()); let mut count: usize = 0; for (index, name, uncompressed_size) in source_files { let mut entry_reader = archive.entry_reader(index).await.unwrap(); let mut data = Vec::with_capacity(uncompressed_size.try_into().unwrap()); entry_reader.read_to_end(&mut data).await.unwrap(); drop(entry_reader); tx.send(FileData { name, data }).unwrap(); count += 1; } println!("read {count} files done ✅"); } async fn transcoder_task( mut rx: UnboundedReceiver, tx: UnboundedSender, encoding_from: EncodingRef, encoding_to: EncodingRef, ) { let is_encodings_same = encoding_from.name() == encoding_to.name(); let is_src_encoding_native = UTF_8.name() == encoding_from.name(); let is_dst_encoding_native = UTF_8.name() == encoding_to.name(); while let Some(FileData { name, data }) = rx.recv().await { let new_data = if is_encodings_same { data } else { task::block_in_place(move || { let text = if is_src_encoding_native { String::from_utf8(data).unwrap() } else { encoding_from.decode(&data, DecoderTrap::Ignore).unwrap() }; if is_dst_encoding_native { text.into_bytes() } else { encoding_to .encode(text.as_str(), EncoderTrap::Ignore) .unwrap() } }) }; tx.send(FileData { name, data: new_data, }) .unwrap(); } println!("transcode done ✅"); } async fn writer_task( mut rx: UnboundedReceiver, output_filename: String, compression: Compression, ) { let outfile = fs::File::create(output_filename) .await .expect("output file"); let mut buf = BufWriter::with_capacity(100 * 1024 * 1024, outfile); let mut writer = ZipFileWriter::new(&mut buf); while let Some(FileData { name, data }) = rx.recv().await { let opts = EntryOptions::new(name, compression); writer.write_entry_whole(opts, &data).await.unwrap(); } writer.close().await.unwrap(); buf.flush().await.unwrap(); println!("write done ✅"); } #[tokio::main] async fn main() -> Result<(), Box> { let args = Cli::parse(); let regex = Regex::new(&args.regex).expect("regex"); let encoding_from = encoding_from_whatwg_label(&args.from).expect("input encoding"); let encoding_to = encoding_from_whatwg_label(&args.to).expect("output encoding"); let compression: Compression = args.compression.into(); let input_filename = args.src; let output_filename = args.dst; let (reader_tx, reader_rx) = mpsc::unbounded_channel::(); let (transcoder_tx, transcoder_rx) = mpsc::unbounded_channel::(); tokio::try_join!( tokio::spawn(reader_task(reader_tx, input_filename, regex)), tokio::spawn(transcoder_task( reader_rx, transcoder_tx, encoding_from, encoding_to )), tokio::spawn(writer_task(transcoder_rx, output_filename, compression)) )?; println!("all done ✅"); Ok(()) }