diff --git a/src/main.rs b/src/main.rs index 2604ada..323b6de 100644 --- a/src/main.rs +++ b/src/main.rs @@ -239,10 +239,13 @@ type LSize = u32; const LEN_SIZE: usize = std::mem::size_of::(); fn read_from_db2(id: u32) -> Option { + const INPUT_FILENAME: &str = "test.bin"; + const INPUT_BUF_SIZE: usize = 4 * 1024; + let cfg = bincode::config::standard(); - let input = fs::File::open("test.bin").expect("open input"); - let mut input = io::BufReader::with_capacity(4 * 1024, input); + let input = fs::File::open(INPUT_FILENAME).expect("open input"); + let mut input = io::BufReader::with_capacity(INPUT_BUF_SIZE, input); let mut first_data: [u8; LEN_SIZE] = [0; LEN_SIZE]; input.read_exact(&mut first_data).expect("read first"); @@ -250,13 +253,6 @@ fn read_from_db2(id: u32) -> Option { let tab_len = (first_pos as usize) / LEN_SIZE; let records_count = tab_len - 1; - // println!( - // "read tab_len done: {}, pos {}, first_pos {}", - // tab_len, - // input.stream_position().unwrap(), - // first_pos - // ); - let index = match id { 0 => { let mut rng = rand::thread_rng(); @@ -265,8 +261,9 @@ fn read_from_db2(id: u32) -> Option { _ => (id - 1) as usize, }; - // println!("index {}", index); - assert!(index < records_count); + if index >= records_count { + return None; + } let data_pos = if 0 == index { first_pos @@ -288,27 +285,16 @@ fn read_from_db2(id: u32) -> Option { let data_len = data_pos_next - data_pos; let tab_pos = input.stream_position().expect("stream pos") as u32; - - // println!( - // "pos {} | next {} | len {} | tab_pos {}", - // data_pos, data_pos_next, data_len, tab_pos - // ); let advance_len = data_pos - tab_pos as LSize; - // println!("advance_len {}", advance_len); - input.seek_relative(advance_len as i64).expect("q seek"); let reader = input.take(data_len as u64); let data = zstd::decode_all(reader).expect("zstd decode data"); - // println!("zstd decoded len {}", data.len()); - let question: (Question, usize) = bincode::decode_from_slice(&data, cfg).expect("bincode decode q"); let question = question.0; - // println!("read done"); - Some(question) } fn write_db2() { @@ -323,18 +309,23 @@ fn write_db2() { } fn db_writer2_task(rx: mpsc::Receiver) { const COMP_DATA_LEVEL: i32 = 1; + const DATA_BUF_SIZE: usize = 500 * 1024 * 1024; + const OUT_BUF_SIZE: usize = 200 * 1024 * 1024; + const CURRENT_BUF_SIZE: usize = 100 * 1024; + const OUTPUT_FILENAME: &str = "test.bin"; let cfg = bincode::config::standard(); let mut table: Vec = vec![]; let zencoder = zstd::stream::raw::Encoder::new(COMP_DATA_LEVEL).expect("new zstd encoder"); - let data_buf: Vec = Vec::with_capacity(500 * 1024 * 1024); - let mut data_buf = Cursor::new(data_buf); + let data_buf: Vec = Vec::with_capacity(DATA_BUF_SIZE); + let cur_buf_raw: Vec = Vec::with_capacity(CURRENT_BUF_SIZE); - let cur_buf_raw: Vec = Vec::with_capacity(100 * 1024); + let mut data_buf = Cursor::new(data_buf); let cur_buf_raw = Cursor::new(cur_buf_raw); let mut cur_buf_z = zstd::stream::zio::Writer::new(cur_buf_raw, zencoder); + let mut num = 1; let mut pos: LSize = 0; rx.into_iter().for_each(|mut q| { @@ -353,25 +344,16 @@ fn db_writer2_task(rx: mpsc::Receiver) { table.push(pos); let cur_buf_raw = cur_buf_z.writer_mut(); - cur_buf_raw.seek(io::SeekFrom::Start(0)).expect("seek zbuf"); + cur_buf_raw.set_position(0); io::copy(cur_buf_raw, &mut data_buf).expect("copy current compressed"); pos = data_buf.stream_position().expect("data buf pos") as LSize; - //println!("write [{}]: {}", num, pos); num += 1; }); table.push(pos); - println!("write [{}]: {}", num, pos); - drop(cur_buf_z); - - println!( - "zbuf done, tab len {}, buf size {}", - table.len(), - data_buf.position() - ); - data_buf.set_position(0); + drop(cur_buf_z); let tab_data = vec![0u8; table.len() * LEN_SIZE]; let mut tab_cursor = Cursor::new(tab_data); @@ -380,16 +362,11 @@ fn db_writer2_task(rx: mpsc::Receiver) { let pos_data = (pos + tab_size).to_le_bytes(); tab_cursor.write_all(&pos_data).expect("write pos"); } - - println!("tab buf done, len = {}", tab_cursor.position()); tab_cursor.set_position(0); - let out = fs::File::create("test.bin").expect("out create"); - let mut out = io::BufWriter::with_capacity(500 * 1024 * 1024, out); + let out = fs::File::create(OUTPUT_FILENAME).expect("out create"); + let mut out = io::BufWriter::with_capacity(OUT_BUF_SIZE, out); io::copy(&mut tab_cursor, &mut out).expect("write tab"); - - println!("header write done, pos: {}", out.stream_position().unwrap()); - io::copy(&mut data_buf, &mut out).expect("copy z buf"); drop(data_buf); out.flush().expect("out flush");