add test write/read + fix write
This commit is contained in:
parent
f51f2be18f
commit
c28508b147
93
src/db.rs
93
src/db.rs
@ -1,10 +1,9 @@
|
|||||||
use std::{
|
use std::{
|
||||||
fs,
|
fs,
|
||||||
io::{self, Cursor, Read, Seek, SeekFrom, Write},
|
io::{self, Cursor, Read, Seek, Write},
|
||||||
marker::PhantomData,
|
marker::PhantomData,
|
||||||
path::Path,
|
path::Path,
|
||||||
};
|
};
|
||||||
use zstd::stream::{raw::Operation, zio};
|
|
||||||
|
|
||||||
type LSize = u32;
|
type LSize = u32;
|
||||||
const LEN_SIZE: usize = std::mem::size_of::<LSize>();
|
const LEN_SIZE: usize = std::mem::size_of::<LSize>();
|
||||||
@ -38,44 +37,44 @@ impl Default for WriterOpts {
|
|||||||
compress_lvl: 1,
|
compress_lvl: 1,
|
||||||
data_buf_size: 500 * 1024 * 1024,
|
data_buf_size: 500 * 1024 * 1024,
|
||||||
out_buf_size: 200 * 1024 * 1024,
|
out_buf_size: 200 * 1024 * 1024,
|
||||||
current_buf_size: 20 * 1024,
|
current_buf_size: 20 * 1024 * 1024,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Writer<'a, T>
|
pub struct Writer<T>
|
||||||
where
|
where
|
||||||
T: bincode::Encode,
|
T: bincode::Encode,
|
||||||
{
|
{
|
||||||
out: io::BufWriter<fs::File>,
|
out: io::BufWriter<fs::File>,
|
||||||
data_buf: Cursor<Vec<u8>>,
|
data_buf: Cursor<Vec<u8>>,
|
||||||
cur_buf_z: zio::Writer<Cursor<Vec<u8>>, zstd::stream::raw::Encoder<'a>>,
|
|
||||||
table: Vec<LSize>,
|
table: Vec<LSize>,
|
||||||
|
compress_lvl: i32,
|
||||||
|
current_buf_size: usize,
|
||||||
_t: PhantomData<*const T>,
|
_t: PhantomData<*const T>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Writer<'_, T>
|
impl<T> Writer<T>
|
||||||
where
|
where
|
||||||
T: bincode::Encode,
|
T: bincode::Encode + std::fmt::Debug,
|
||||||
{
|
{
|
||||||
pub fn new<P: AsRef<Path>>(path: P, opts: WriterOpts) -> Result<Self, String> {
|
pub fn new<P: AsRef<Path>>(path: P, opts: WriterOpts) -> Result<Self, String> {
|
||||||
let out = fs::File::create(path).str_err()?;
|
let out = fs::File::create(path).str_err()?;
|
||||||
let out = io::BufWriter::with_capacity(opts.out_buf_size, out);
|
let out = io::BufWriter::with_capacity(opts.out_buf_size, out);
|
||||||
let data_buf: Vec<u8> = Vec::with_capacity(opts.data_buf_size);
|
let data_buf: Vec<u8> = Vec::with_capacity(opts.data_buf_size);
|
||||||
let cur_buf_raw: Vec<u8> = Vec::with_capacity(opts.current_buf_size);
|
|
||||||
let data_buf = Cursor::new(data_buf);
|
let data_buf = Cursor::new(data_buf);
|
||||||
let cur_buf_raw = Cursor::new(cur_buf_raw);
|
|
||||||
|
|
||||||
let zencoder = zstd::stream::raw::Encoder::new(opts.compress_lvl).str_err()?;
|
|
||||||
let cur_buf_z = zstd::stream::zio::Writer::new(cur_buf_raw, zencoder);
|
|
||||||
|
|
||||||
|
let compress_lvl = opts.compress_lvl;
|
||||||
|
let current_buf_size = opts.current_buf_size;
|
||||||
|
|
||||||
let table: Vec<LSize> = vec![];
|
let table: Vec<LSize> = vec![];
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
out,
|
out,
|
||||||
data_buf,
|
data_buf,
|
||||||
cur_buf_z,
|
|
||||||
table,
|
table,
|
||||||
|
compress_lvl,
|
||||||
|
current_buf_size,
|
||||||
_t: PhantomData,
|
_t: PhantomData,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -84,22 +83,26 @@ where
|
|||||||
let pos: LSize = self.data_buf.position() as LSize;
|
let pos: LSize = self.data_buf.position() as LSize;
|
||||||
|
|
||||||
let item_data = bincode::encode_to_vec(item, BINCODE_CFG).str_err()?;
|
let item_data = bincode::encode_to_vec(item, BINCODE_CFG).str_err()?;
|
||||||
|
|
||||||
let zencoder = self.cur_buf_z.operation_mut();
|
let cur_buf_raw: Vec<u8> = Vec::with_capacity(self.current_buf_size);
|
||||||
zencoder.reinit().str_err()?;
|
let cur_buf_raw = Cursor::new(cur_buf_raw);
|
||||||
|
let mut zencoder = zstd::stream::raw::Encoder::new(self.compress_lvl).str_err()?;
|
||||||
zencoder
|
zencoder
|
||||||
.set_pledged_src_size(item_data.len() as u64)
|
.set_pledged_src_size(item_data.len() as u64)
|
||||||
.str_err()?;
|
.str_err()?;
|
||||||
|
|
||||||
self.cur_buf_z.write_all(&item_data).str_err()?;
|
let mut cur_buf_z = zstd::stream::zio::Writer::new(cur_buf_raw, zencoder);
|
||||||
self.cur_buf_z.finish().str_err()?;
|
cur_buf_z.write_all(&item_data).str_err()?;
|
||||||
self.cur_buf_z.flush().str_err()?;
|
cur_buf_z.finish().str_err()?;
|
||||||
|
cur_buf_z.flush().str_err()?;
|
||||||
|
|
||||||
self.table.push(pos);
|
self.table.push(pos);
|
||||||
let cur_buf_raw = self.cur_buf_z.writer_mut();
|
let (mut cur_buf_raw, _) = cur_buf_z.into_inner();
|
||||||
let size = cur_buf_raw.position();
|
let size = cur_buf_raw.position();
|
||||||
|
|
||||||
cur_buf_raw.set_position(0);
|
cur_buf_raw.set_position(0);
|
||||||
io::copy(&mut cur_buf_raw.take(size), &mut self.data_buf).str_err()?;
|
let mut chunk_reader = cur_buf_raw.take(size);
|
||||||
|
io::copy(&mut chunk_reader, &mut self.data_buf).str_err()?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -186,7 +189,9 @@ where
|
|||||||
} else {
|
} else {
|
||||||
let tab_pos: u64 = (index * LEN_SIZE).try_into().str_err()?;
|
let tab_pos: u64 = (index * LEN_SIZE).try_into().str_err()?;
|
||||||
let mut pos_curr_data: [u8; LEN_SIZE] = [0; LEN_SIZE];
|
let mut pos_curr_data: [u8; LEN_SIZE] = [0; LEN_SIZE];
|
||||||
self.input.seek(SeekFrom::Start(tab_pos)).str_err()?;
|
let cur_pos = self.input.stream_position().str_err()? as i64;
|
||||||
|
self.input.seek_relative((tab_pos as i64) - cur_pos).str_err()?;
|
||||||
|
|
||||||
self.input.read_exact(&mut pos_curr_data).str_err()?;
|
self.input.read_exact(&mut pos_curr_data).str_err()?;
|
||||||
LSize::from_le_bytes(pos_curr_data)
|
LSize::from_le_bytes(pos_curr_data)
|
||||||
};
|
};
|
||||||
@ -199,8 +204,9 @@ where
|
|||||||
let data_len = data_pos_next - data_pos;
|
let data_len = data_pos_next - data_pos;
|
||||||
|
|
||||||
// read & unpack item data
|
// read & unpack item data
|
||||||
|
let cur_pos = self.input.stream_position().str_err()? as i64;
|
||||||
self.input
|
self.input
|
||||||
.seek(SeekFrom::Start(data_pos as u64))
|
.seek_relative((data_pos as i64) - cur_pos)
|
||||||
.str_err()?;
|
.str_err()?;
|
||||||
let reader = self.input.by_ref().take(data_len as u64);
|
let reader = self.input.by_ref().take(data_len as u64);
|
||||||
let data = zstd::decode_all(reader).str_err()?;
|
let data = zstd::decode_all(reader).str_err()?;
|
||||||
@ -211,3 +217,44 @@ where
|
|||||||
Ok(item.0)
|
Ok(item.0)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod test {
|
||||||
|
use rand::RngCore;
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[derive(bincode::Encode, bincode::Decode, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
|
||||||
|
struct TestData {
|
||||||
|
num: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn gen_data(count: usize) -> impl Iterator<Item = TestData> {
|
||||||
|
(0..count).into_iter().map(|i| TestData{ num: i as u64})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_write_read() {
|
||||||
|
let mut rng = rand::thread_rng();
|
||||||
|
let tempfile = std::env::temp_dir().with_file_name(format!("test-{}.tmp", rng.next_u32()));
|
||||||
|
let opts = WriterOpts { compress_lvl: 1, data_buf_size: 10 * 1024 * 1024, out_buf_size: 10 * 1024 * 1024, current_buf_size: 4096 };
|
||||||
|
let mut writer: Writer<TestData> = Writer::new(&tempfile, opts).expect("new writer");
|
||||||
|
|
||||||
|
let items_iter = gen_data(5);
|
||||||
|
let items: Vec<TestData> = items_iter.collect();
|
||||||
|
|
||||||
|
writer.load(&mut items.clone().into_iter()).expect("load");
|
||||||
|
writer.finish().expect("finish write");
|
||||||
|
|
||||||
|
let mut reader: Reader<TestData> = Reader::new(&tempfile, 2048).expect("new reader");
|
||||||
|
assert_eq!(items.len(), reader.len());
|
||||||
|
|
||||||
|
for (idx,item) in items.iter().enumerate() {
|
||||||
|
let ritem = reader.get(idx).expect("get");
|
||||||
|
assert_eq!(*item, ritem);
|
||||||
|
}
|
||||||
|
|
||||||
|
let r = fs::remove_file(tempfile);
|
||||||
|
drop(r);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user