From 3b24a1be2f78e40ffdaec7f3c28a908c529bf212 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Sun, 9 Oct 2022 16:43:17 +0300 Subject: [PATCH] separate new db reader+writer from main --- src/db.rs | 200 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 120 ++++--------------------------- 2 files changed, 215 insertions(+), 105 deletions(-) create mode 100644 src/db.rs diff --git a/src/db.rs b/src/db.rs new file mode 100644 index 0000000..17ddf25 --- /dev/null +++ b/src/db.rs @@ -0,0 +1,200 @@ +use std::{ + fs, + io::{self, Cursor, Read, Seek, SeekFrom, Write}, + marker::PhantomData, + path::Path, +}; +use zstd::stream::{raw::Operation, zio}; + +type LSize = u32; +const LEN_SIZE: usize = std::mem::size_of::(); +const BINCODE_CFG: bincode::config::Configuration = bincode::config::standard(); + +trait ErrorToString { + type Output; + fn str_err(self) -> std::result::Result; +} + +impl ErrorToString for std::result::Result +where + E: std::error::Error, +{ + type Output = T; + fn str_err(self) -> std::result::Result { + self.map_err(|e| e.to_string()) + } +} + +pub struct WriterOpts { + pub compress_lvl: i32, + pub data_buf_size: usize, + pub out_buf_size: usize, + pub current_buf_size: usize, +} + +impl Default for WriterOpts { + fn default() -> Self { + Self { + compress_lvl: 1, + data_buf_size: 500 * 1024 * 1024, + out_buf_size: 200 * 1024 * 1024, + current_buf_size: 20 * 1024, + } + } +} + +pub struct Writer<'a, T> +where + T: bincode::Encode, +{ + out: io::BufWriter, + data_buf: Cursor>, + cur_buf_z: zio::Writer>, zstd::stream::raw::Encoder<'a>>, + table: Vec, + _t: PhantomData<*const T>, +} + +impl Writer<'_, T> +where + T: bincode::Encode, +{ + pub fn new>(path: P, opts: WriterOpts) -> Result { + let out = fs::File::create(path).str_err()?; + let out = io::BufWriter::with_capacity(opts.out_buf_size, out); + let data_buf: Vec = Vec::with_capacity(opts.data_buf_size); + let cur_buf_raw: Vec = Vec::with_capacity(opts.current_buf_size); + let data_buf = Cursor::new(data_buf); + let cur_buf_raw = Cursor::new(cur_buf_raw); + + let zencoder = zstd::stream::raw::Encoder::new(opts.compress_lvl).str_err()?; + let cur_buf_z = zstd::stream::zio::Writer::new(cur_buf_raw, zencoder); + + let table: Vec = vec![]; + + Ok(Self { + out, + data_buf, + cur_buf_z, + table, + _t: PhantomData, + }) + } + pub fn push(&mut self, item: T) -> Result<(), String> { + let pos: LSize = self.data_buf.position() as LSize; + + let item_data = bincode::encode_to_vec(item, BINCODE_CFG).str_err()?; + + let zencoder = self.cur_buf_z.operation_mut(); + zencoder.reinit().str_err()?; + zencoder + .set_pledged_src_size(item_data.len() as u64) + .str_err()?; + + self.cur_buf_z.write_all(&item_data).str_err()?; + self.cur_buf_z.finish().str_err()?; + self.cur_buf_z.flush().str_err()?; + + self.table.push(pos); + let cur_buf_raw = self.cur_buf_z.writer_mut(); + let size = cur_buf_raw.position(); + cur_buf_raw.set_position(0); + io::copy(&mut cur_buf_raw.take(size), &mut self.data_buf).str_err()?; + + Ok(()) + } + pub fn finish(mut self) -> Result<(), String> { + // finish tab + let pos: LSize = self.data_buf.position() as LSize; + self.table.push(pos); + + // write tab + let tab_size = (self.table.len() * LEN_SIZE) as LSize; + for pos in self.table { + let pos_data = (pos + tab_size).to_le_bytes(); + self.out.write_all(&pos_data).str_err()?; + } + + // copy data + let data_size = self.data_buf.position(); + self.data_buf.set_position(0); + let mut data = self.data_buf.take(data_size); + io::copy(&mut data, &mut self.out).str_err()?; + + self.out.flush().str_err()?; + Ok(()) + } +} + +pub struct Reader +where + T: bincode::Decode, +{ + input: io::BufReader, + count: usize, + first_pos: LSize, + _t: PhantomData<*const T>, +} + +impl Reader +where + T: bincode::Decode, +{ + pub fn new>(path: P, buf_size: usize) -> Result { + let input = fs::File::open(path).str_err()?; + let mut input = io::BufReader::with_capacity(buf_size, input); + + // read first pos and records count + let mut first_data: [u8; LEN_SIZE] = [0; LEN_SIZE]; + input.read_exact(&mut first_data).str_err()?; + let first_pos = LSize::from_le_bytes(first_data); + let tab_len = (first_pos as usize) / LEN_SIZE; + let count = tab_len - 1; + + Ok(Self { + input, + count, + first_pos, + _t: PhantomData, + }) + } + + pub fn len(&self) -> usize { + self.count + } + + pub fn get(&mut self, index: usize) -> Result { + if index >= self.len() { + return Err("index out of range".into()); + } + + // read item data pos + let data_pos = if 0 == index { + self.first_pos + } else { + let tab_pos: u64 = (index * LEN_SIZE).try_into().str_err()?; + let mut pos_curr_data: [u8; LEN_SIZE] = [0; LEN_SIZE]; + self.input.seek(SeekFrom::Start(tab_pos)).str_err()?; + self.input.read_exact(&mut pos_curr_data).str_err()?; + LSize::from_le_bytes(pos_curr_data) + }; + + // read next item pos + let mut pos_next_data: [u8; LEN_SIZE] = [0; LEN_SIZE]; + self.input.read_exact(&mut pos_next_data).str_err()?; + let data_pos_next = LSize::from_le_bytes(pos_next_data); + // calc item data length + let data_len = data_pos_next - data_pos; + + // read & unpack item data + self.input + .seek(SeekFrom::Start(data_pos as u64)) + .str_err()?; + let reader = self.input.by_ref().take(data_len as u64); + let data = zstd::decode_all(reader).str_err()?; + + // decode item + let item: (T, usize) = bincode::decode_from_slice(&data, BINCODE_CFG).str_err()?; + + Ok(item.0) + } +} diff --git a/src/main.rs b/src/main.rs index 323b6de..5ba9a27 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,15 +9,14 @@ extern crate zip; use clap::{Parser, Subcommand}; use rand::seq::IteratorRandom; -use std::io::{self, Cursor, Read, Seek, Write}; +use std::io; use std::path::PathBuf; use std::time::Instant; use std::{fs, sync::mpsc, thread}; -use zstd::stream::raw::Operation; - use ledb::{Options, Storage}; +mod db; mod questions; mod source; @@ -25,6 +24,7 @@ use crate::questions::{Question, QuestionsConverter}; use crate::source::ReadSourceQuestionsBatches; const ZIP_FILENAME: &str = "json.zip"; +const NEW_DB_FILENAME: &str = "test.bin"; const DB_DIR: &str = "db"; #[derive(Subcommand, Debug)] @@ -235,67 +235,23 @@ fn main() { action(); } -type LSize = u32; -const LEN_SIZE: usize = std::mem::size_of::(); - fn read_from_db2(id: u32) -> Option { - const INPUT_FILENAME: &str = "test.bin"; - const INPUT_BUF_SIZE: usize = 4 * 1024; - - let cfg = bincode::config::standard(); - - let input = fs::File::open(INPUT_FILENAME).expect("open input"); - let mut input = io::BufReader::with_capacity(INPUT_BUF_SIZE, input); - - let mut first_data: [u8; LEN_SIZE] = [0; LEN_SIZE]; - input.read_exact(&mut first_data).expect("read first"); - let first_pos = LSize::from_le_bytes(first_data); - let tab_len = (first_pos as usize) / LEN_SIZE; - let records_count = tab_len - 1; + let mut reader: db::Reader = + db::Reader::new(NEW_DB_FILENAME, 2048).expect("new db reader"); let index = match id { 0 => { let mut rng = rand::thread_rng(); - (1..records_count).into_iter().choose(&mut rng).unwrap() + (1..reader.len()).into_iter().choose(&mut rng).unwrap() } _ => (id - 1) as usize, }; - if index >= records_count { + if index >= reader.len() { return None; } - let data_pos = if 0 == index { - first_pos - } else { - let mut pos_curr_data: [u8; LEN_SIZE] = [0; LEN_SIZE]; - input - .seek_relative(((index - 1) * LEN_SIZE).try_into().expect("index to i64")) - .expect("seek to tab pos"); - input - .read_exact(&mut pos_curr_data) - .expect("read current pos"); - LSize::from_le_bytes(pos_curr_data) - }; - - let mut pos_next_data: [u8; LEN_SIZE] = [0; LEN_SIZE]; - input.read_exact(&mut pos_next_data).expect("read next pos"); - - let data_pos_next = LSize::from_le_bytes(pos_next_data); - let data_len = data_pos_next - data_pos; - - let tab_pos = input.stream_position().expect("stream pos") as u32; - let advance_len = data_pos - tab_pos as LSize; - - input.seek_relative(advance_len as i64).expect("q seek"); - let reader = input.take(data_len as u64); - let data = zstd::decode_all(reader).expect("zstd decode data"); - - let question: (Question, usize) = - bincode::decode_from_slice(&data, cfg).expect("bincode decode q"); - let question = question.0; - - Some(question) + Some(reader.get(index).expect("get question")) } fn write_db2() { let (tx, rx) = mpsc::channel::(); @@ -308,68 +264,22 @@ fn write_db2() { println!("all done"); } fn db_writer2_task(rx: mpsc::Receiver) { - const COMP_DATA_LEVEL: i32 = 1; - const DATA_BUF_SIZE: usize = 500 * 1024 * 1024; - const OUT_BUF_SIZE: usize = 200 * 1024 * 1024; - const CURRENT_BUF_SIZE: usize = 100 * 1024; - const OUTPUT_FILENAME: &str = "test.bin"; - - let cfg = bincode::config::standard(); - let mut table: Vec = vec![]; - - let zencoder = zstd::stream::raw::Encoder::new(COMP_DATA_LEVEL).expect("new zstd encoder"); - - let data_buf: Vec = Vec::with_capacity(DATA_BUF_SIZE); - let cur_buf_raw: Vec = Vec::with_capacity(CURRENT_BUF_SIZE); - - let mut data_buf = Cursor::new(data_buf); - let cur_buf_raw = Cursor::new(cur_buf_raw); - let mut cur_buf_z = zstd::stream::zio::Writer::new(cur_buf_raw, zencoder); + let writer_opts = db::WriterOpts::default(); + let mut writer: db::Writer = + db::Writer::new(NEW_DB_FILENAME, writer_opts).expect("new db writer"); let mut num = 1; - let mut pos: LSize = 0; rx.into_iter().for_each(|mut q| { q.num = num; - let q_data = bincode::encode_to_vec(q, cfg).expect("bincode q encode"); - let zencoder = cur_buf_z.operation_mut(); - zencoder.reinit().expect("zstd encoder reinit"); - zencoder - .set_pledged_src_size(q_data.len() as u64) - .expect("zstd op set len"); - - cur_buf_z.write_all(&q_data).expect("write question"); - cur_buf_z.finish().expect("zstd q finish"); - cur_buf_z.flush().expect("zbuf flush"); - - table.push(pos); - let cur_buf_raw = cur_buf_z.writer_mut(); - cur_buf_raw.set_position(0); - io::copy(cur_buf_raw, &mut data_buf).expect("copy current compressed"); - pos = data_buf.stream_position().expect("data buf pos") as LSize; + writer + .push(q) + .unwrap_or_else(|e| panic!("db writer push, num={num}, {e:#?}")); num += 1; }); - table.push(pos); - data_buf.set_position(0); - drop(cur_buf_z); - - let tab_data = vec![0u8; table.len() * LEN_SIZE]; - let mut tab_cursor = Cursor::new(tab_data); - let tab_size = (table.len() * LEN_SIZE) as LSize; - for pos in table { - let pos_data = (pos + tab_size).to_le_bytes(); - tab_cursor.write_all(&pos_data).expect("write pos"); - } - tab_cursor.set_position(0); - - let out = fs::File::create(OUTPUT_FILENAME).expect("out create"); - let mut out = io::BufWriter::with_capacity(OUT_BUF_SIZE, out); - io::copy(&mut tab_cursor, &mut out).expect("write tab"); - io::copy(&mut data_buf, &mut out).expect("copy z buf"); - drop(data_buf); - out.flush().expect("out flush"); + writer.finish().expect("db writer finish"); println!("write done"); }