From 8ad88490e72bb2749e2d68952edf9aa1a350a6a1 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Sat, 8 Oct 2022 02:02:27 +0300 Subject: [PATCH] optimize zstd writer --- bench.txt | 2 +- src/main.rs | 58 +++++++++++++++++++++++++++++++++++------------------ 2 files changed, 40 insertions(+), 20 deletions(-) diff --git a/bench.txt b/bench.txt index d654114..af84e66 100644 --- a/bench.txt +++ b/bench.txt @@ -1,5 +1,5 @@ 866 MB db/data.mdb -233 MB test.bin +149 MB test.bin 95 MB json.zip --- diff --git a/src/main.rs b/src/main.rs index 120c1a4..2604ada 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,10 +9,12 @@ extern crate zip; use clap::{Parser, Subcommand}; use rand::seq::IteratorRandom; -use std::io::{Read, Seek, Write}; +use std::io::{self, Cursor, Read, Seek, Write}; use std::path::PathBuf; use std::time::Instant; -use std::{fs, io, sync::mpsc, thread}; +use std::{fs, sync::mpsc, thread}; + +use zstd::stream::raw::Operation; use ledb::{Options, Storage}; @@ -240,7 +242,7 @@ fn read_from_db2(id: u32) -> Option { let cfg = bincode::config::standard(); let input = fs::File::open("test.bin").expect("open input"); - let mut input = std::io::BufReader::with_capacity(4 * 1024, input); + let mut input = io::BufReader::with_capacity(4 * 1024, input); let mut first_data: [u8; LEN_SIZE] = [0; LEN_SIZE]; input.read_exact(&mut first_data).expect("read first"); @@ -320,38 +322,56 @@ fn write_db2() { println!("all done"); } fn db_writer2_task(rx: mpsc::Receiver) { - const COMP_DATA_LEVEL: i32 = 2; + const COMP_DATA_LEVEL: i32 = 1; let cfg = bincode::config::standard(); let mut table: Vec = vec![]; - let buf_data: Vec = Vec::with_capacity(500 * 1024 * 1024); - use std::io::Cursor; - let mut buf = Cursor::new(buf_data); - let mut pos: LSize = 0; + let zencoder = zstd::stream::raw::Encoder::new(COMP_DATA_LEVEL).expect("new zstd encoder"); + + let data_buf: Vec = Vec::with_capacity(500 * 1024 * 1024); + let mut data_buf = Cursor::new(data_buf); + + let cur_buf_raw: Vec = Vec::with_capacity(100 * 1024); + let cur_buf_raw = Cursor::new(cur_buf_raw); + let mut cur_buf_z = zstd::stream::zio::Writer::new(cur_buf_raw, zencoder); let mut num = 1; + let mut pos: LSize = 0; rx.into_iter().for_each(|mut q| { q.num = num; - let data = bincode::encode_to_vec(q, cfg).expect("bincode q encode"); - let data = zstd::encode_all(Cursor::new(data), COMP_DATA_LEVEL).expect("zstd q encode"); - let len = buf.write(&data).expect("write question"); - table.push(pos); + let q_data = bincode::encode_to_vec(q, cfg).expect("bincode q encode"); + let zencoder = cur_buf_z.operation_mut(); + zencoder.reinit().expect("zstd encoder reinit"); + zencoder + .set_pledged_src_size(q_data.len() as u64) + .expect("zstd op set len"); + + cur_buf_z.write_all(&q_data).expect("write question"); + cur_buf_z.finish().expect("zstd q finish"); + cur_buf_z.flush().expect("zbuf flush"); + + table.push(pos); + let cur_buf_raw = cur_buf_z.writer_mut(); + cur_buf_raw.seek(io::SeekFrom::Start(0)).expect("seek zbuf"); + io::copy(cur_buf_raw, &mut data_buf).expect("copy current compressed"); + pos = data_buf.stream_position().expect("data buf pos") as LSize; //println!("write [{}]: {}", num, pos); - pos += len as LSize; num += 1; }); table.push(pos); + println!("write [{}]: {}", num, pos); + drop(cur_buf_z); println!( "zbuf done, tab len {}, buf size {}", table.len(), - buf.position() + data_buf.position() ); - buf.set_position(0); + data_buf.set_position(0); let tab_data = vec![0u8; table.len() * LEN_SIZE]; let mut tab_cursor = Cursor::new(tab_data); @@ -365,13 +385,13 @@ fn db_writer2_task(rx: mpsc::Receiver) { tab_cursor.set_position(0); let out = fs::File::create("test.bin").expect("out create"); - let mut out = std::io::BufWriter::with_capacity(500 * 1024 * 1024, out); - std::io::copy(&mut tab_cursor, &mut out).expect("write tab"); + let mut out = io::BufWriter::with_capacity(500 * 1024 * 1024, out); + io::copy(&mut tab_cursor, &mut out).expect("write tab"); println!("header write done, pos: {}", out.stream_position().unwrap()); - std::io::copy(&mut buf, &mut out).expect("copy z buf"); - drop(buf); + io::copy(&mut data_buf, &mut out).expect("copy z buf"); + drop(data_buf); out.flush().expect("out flush"); println!("write done");