optimize zstd writer
This commit is contained in:
parent
73e8b34af4
commit
8ad88490e7
@ -1,5 +1,5 @@
|
|||||||
866 MB db/data.mdb
|
866 MB db/data.mdb
|
||||||
233 MB test.bin
|
149 MB test.bin
|
||||||
95 MB json.zip
|
95 MB json.zip
|
||||||
---
|
---
|
||||||
|
|
||||||
|
58
src/main.rs
58
src/main.rs
@ -9,10 +9,12 @@ extern crate zip;
|
|||||||
use clap::{Parser, Subcommand};
|
use clap::{Parser, Subcommand};
|
||||||
use rand::seq::IteratorRandom;
|
use rand::seq::IteratorRandom;
|
||||||
|
|
||||||
use std::io::{Read, Seek, Write};
|
use std::io::{self, Cursor, Read, Seek, Write};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
use std::{fs, io, sync::mpsc, thread};
|
use std::{fs, sync::mpsc, thread};
|
||||||
|
|
||||||
|
use zstd::stream::raw::Operation;
|
||||||
|
|
||||||
use ledb::{Options, Storage};
|
use ledb::{Options, Storage};
|
||||||
|
|
||||||
@ -240,7 +242,7 @@ fn read_from_db2(id: u32) -> Option<Question> {
|
|||||||
let cfg = bincode::config::standard();
|
let cfg = bincode::config::standard();
|
||||||
|
|
||||||
let input = fs::File::open("test.bin").expect("open input");
|
let input = fs::File::open("test.bin").expect("open input");
|
||||||
let mut input = std::io::BufReader::with_capacity(4 * 1024, input);
|
let mut input = io::BufReader::with_capacity(4 * 1024, input);
|
||||||
|
|
||||||
let mut first_data: [u8; LEN_SIZE] = [0; LEN_SIZE];
|
let mut first_data: [u8; LEN_SIZE] = [0; LEN_SIZE];
|
||||||
input.read_exact(&mut first_data).expect("read first");
|
input.read_exact(&mut first_data).expect("read first");
|
||||||
@ -320,38 +322,56 @@ fn write_db2() {
|
|||||||
println!("all done");
|
println!("all done");
|
||||||
}
|
}
|
||||||
fn db_writer2_task(rx: mpsc::Receiver<Question>) {
|
fn db_writer2_task(rx: mpsc::Receiver<Question>) {
|
||||||
const COMP_DATA_LEVEL: i32 = 2;
|
const COMP_DATA_LEVEL: i32 = 1;
|
||||||
|
|
||||||
let cfg = bincode::config::standard();
|
let cfg = bincode::config::standard();
|
||||||
let mut table: Vec<LSize> = vec![];
|
let mut table: Vec<LSize> = vec![];
|
||||||
|
|
||||||
let buf_data: Vec<u8> = Vec::with_capacity(500 * 1024 * 1024);
|
let zencoder = zstd::stream::raw::Encoder::new(COMP_DATA_LEVEL).expect("new zstd encoder");
|
||||||
use std::io::Cursor;
|
|
||||||
let mut buf = Cursor::new(buf_data);
|
let data_buf: Vec<u8> = Vec::with_capacity(500 * 1024 * 1024);
|
||||||
let mut pos: LSize = 0;
|
let mut data_buf = Cursor::new(data_buf);
|
||||||
|
|
||||||
|
let cur_buf_raw: Vec<u8> = Vec::with_capacity(100 * 1024);
|
||||||
|
let cur_buf_raw = Cursor::new(cur_buf_raw);
|
||||||
|
let mut cur_buf_z = zstd::stream::zio::Writer::new(cur_buf_raw, zencoder);
|
||||||
let mut num = 1;
|
let mut num = 1;
|
||||||
|
let mut pos: LSize = 0;
|
||||||
rx.into_iter().for_each(|mut q| {
|
rx.into_iter().for_each(|mut q| {
|
||||||
q.num = num;
|
q.num = num;
|
||||||
let data = bincode::encode_to_vec(q, cfg).expect("bincode q encode");
|
let q_data = bincode::encode_to_vec(q, cfg).expect("bincode q encode");
|
||||||
let data = zstd::encode_all(Cursor::new(data), COMP_DATA_LEVEL).expect("zstd q encode");
|
|
||||||
let len = buf.write(&data).expect("write question");
|
|
||||||
table.push(pos);
|
|
||||||
|
|
||||||
|
let zencoder = cur_buf_z.operation_mut();
|
||||||
|
zencoder.reinit().expect("zstd encoder reinit");
|
||||||
|
zencoder
|
||||||
|
.set_pledged_src_size(q_data.len() as u64)
|
||||||
|
.expect("zstd op set len");
|
||||||
|
|
||||||
|
cur_buf_z.write_all(&q_data).expect("write question");
|
||||||
|
cur_buf_z.finish().expect("zstd q finish");
|
||||||
|
cur_buf_z.flush().expect("zbuf flush");
|
||||||
|
|
||||||
|
table.push(pos);
|
||||||
|
let cur_buf_raw = cur_buf_z.writer_mut();
|
||||||
|
cur_buf_raw.seek(io::SeekFrom::Start(0)).expect("seek zbuf");
|
||||||
|
io::copy(cur_buf_raw, &mut data_buf).expect("copy current compressed");
|
||||||
|
pos = data_buf.stream_position().expect("data buf pos") as LSize;
|
||||||
//println!("write [{}]: {}", num, pos);
|
//println!("write [{}]: {}", num, pos);
|
||||||
|
|
||||||
pos += len as LSize;
|
|
||||||
num += 1;
|
num += 1;
|
||||||
});
|
});
|
||||||
table.push(pos);
|
table.push(pos);
|
||||||
|
|
||||||
println!("write [{}]: {}", num, pos);
|
println!("write [{}]: {}", num, pos);
|
||||||
|
drop(cur_buf_z);
|
||||||
|
|
||||||
println!(
|
println!(
|
||||||
"zbuf done, tab len {}, buf size {}",
|
"zbuf done, tab len {}, buf size {}",
|
||||||
table.len(),
|
table.len(),
|
||||||
buf.position()
|
data_buf.position()
|
||||||
);
|
);
|
||||||
|
|
||||||
buf.set_position(0);
|
data_buf.set_position(0);
|
||||||
|
|
||||||
let tab_data = vec![0u8; table.len() * LEN_SIZE];
|
let tab_data = vec![0u8; table.len() * LEN_SIZE];
|
||||||
let mut tab_cursor = Cursor::new(tab_data);
|
let mut tab_cursor = Cursor::new(tab_data);
|
||||||
@ -365,13 +385,13 @@ fn db_writer2_task(rx: mpsc::Receiver<Question>) {
|
|||||||
tab_cursor.set_position(0);
|
tab_cursor.set_position(0);
|
||||||
|
|
||||||
let out = fs::File::create("test.bin").expect("out create");
|
let out = fs::File::create("test.bin").expect("out create");
|
||||||
let mut out = std::io::BufWriter::with_capacity(500 * 1024 * 1024, out);
|
let mut out = io::BufWriter::with_capacity(500 * 1024 * 1024, out);
|
||||||
std::io::copy(&mut tab_cursor, &mut out).expect("write tab");
|
io::copy(&mut tab_cursor, &mut out).expect("write tab");
|
||||||
|
|
||||||
println!("header write done, pos: {}", out.stream_position().unwrap());
|
println!("header write done, pos: {}", out.stream_position().unwrap());
|
||||||
|
|
||||||
std::io::copy(&mut buf, &mut out).expect("copy z buf");
|
io::copy(&mut data_buf, &mut out).expect("copy z buf");
|
||||||
drop(buf);
|
drop(data_buf);
|
||||||
out.flush().expect("out flush");
|
out.flush().expect("out flush");
|
||||||
|
|
||||||
println!("write done");
|
println!("write done");
|
||||||
|
Loading…
Reference in New Issue
Block a user