sync db: postcard
This commit is contained in:
parent
1d4005abdb
commit
7e6b513179
@ -6,13 +6,13 @@ use std::{
|
|||||||
sync::Arc,
|
sync::Arc,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use serde::{de::DeserializeOwned, Serialize};
|
||||||
|
|
||||||
use memmap::{Mmap, MmapOptions};
|
use memmap::{Mmap, MmapOptions};
|
||||||
|
|
||||||
type LSize = u32;
|
type LSize = u32;
|
||||||
const LEN_SIZE: usize = std::mem::size_of::<LSize>();
|
const LEN_SIZE: usize = std::mem::size_of::<LSize>();
|
||||||
const BINCODE_CFG: bincode::config::Configuration = bincode::config::standard();
|
|
||||||
|
|
||||||
use crate::util::BincodeVecWriter;
|
|
||||||
use crate::util::ErrorToString;
|
use crate::util::ErrorToString;
|
||||||
|
|
||||||
pub struct WriterOpts {
|
pub struct WriterOpts {
|
||||||
@ -28,19 +28,19 @@ impl Default for WriterOpts {
|
|||||||
compress_lvl: 1,
|
compress_lvl: 1,
|
||||||
data_buf_size: 500 * 1024 * 1024,
|
data_buf_size: 500 * 1024 * 1024,
|
||||||
out_buf_size: 200 * 1024 * 1024,
|
out_buf_size: 200 * 1024 * 1024,
|
||||||
current_buf_size: 100 * 1024,
|
current_buf_size: 20 * 1024,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Writer<T>
|
pub struct Writer<T>
|
||||||
where
|
where
|
||||||
T: bincode::Encode,
|
T: Serialize,
|
||||||
{
|
{
|
||||||
out: io::BufWriter<fs::File>,
|
out: io::BufWriter<fs::File>,
|
||||||
data_buf: Cursor<Vec<u8>>,
|
data_buf: Cursor<Vec<u8>>,
|
||||||
cur_buf_raw: Cursor<Vec<u8>>,
|
cur_buf_raw: Cursor<Vec<u8>>,
|
||||||
cur_buf_item: BincodeVecWriter,
|
cur_buf_item: Vec<u8>,
|
||||||
table: Vec<LSize>,
|
table: Vec<LSize>,
|
||||||
compress_lvl: i32,
|
compress_lvl: i32,
|
||||||
_t: PhantomData<Arc<T>>,
|
_t: PhantomData<Arc<T>>,
|
||||||
@ -48,7 +48,7 @@ where
|
|||||||
|
|
||||||
impl<T> Writer<T>
|
impl<T> Writer<T>
|
||||||
where
|
where
|
||||||
T: bincode::Encode,
|
T: Serialize,
|
||||||
{
|
{
|
||||||
pub fn new<P: AsRef<Path>>(path: P, opts: WriterOpts) -> Result<Self, String> {
|
pub fn new<P: AsRef<Path>>(path: P, opts: WriterOpts) -> Result<Self, String> {
|
||||||
let out = fs::File::create(path).str_err()?;
|
let out = fs::File::create(path).str_err()?;
|
||||||
@ -58,8 +58,7 @@ where
|
|||||||
|
|
||||||
let cur_buf_raw: Vec<u8> = Vec::with_capacity(opts.current_buf_size);
|
let cur_buf_raw: Vec<u8> = Vec::with_capacity(opts.current_buf_size);
|
||||||
let cur_buf_raw = Cursor::new(cur_buf_raw);
|
let cur_buf_raw = Cursor::new(cur_buf_raw);
|
||||||
let cur_buf_item: Vec<u8> = Vec::with_capacity(opts.current_buf_size);
|
let cur_buf_item: Vec<u8> = vec![0; opts.current_buf_size];
|
||||||
let cur_buf_item = BincodeVecWriter::new(cur_buf_item);
|
|
||||||
|
|
||||||
let compress_lvl = opts.compress_lvl;
|
let compress_lvl = opts.compress_lvl;
|
||||||
|
|
||||||
@ -83,19 +82,20 @@ where
|
|||||||
pub fn push_by_ref(&mut self, item: &T) -> Result<(), String> {
|
pub fn push_by_ref(&mut self, item: &T) -> Result<(), String> {
|
||||||
let pos: LSize = self.data_buf.position() as LSize;
|
let pos: LSize = self.data_buf.position() as LSize;
|
||||||
|
|
||||||
bincode::encode_into_writer(item, &mut self.cur_buf_item, BINCODE_CFG).str_err()?;
|
let cur_item_data = postcard::to_slice(item, self.cur_buf_item.as_mut_slice()).str_err()?;
|
||||||
|
|
||||||
let mut zencoder = zstd::stream::raw::Encoder::new(self.compress_lvl).str_err()?;
|
let mut zencoder = zstd::stream::raw::Encoder::new(self.compress_lvl).str_err()?;
|
||||||
zencoder
|
zencoder
|
||||||
.set_pledged_src_size(Some(self.cur_buf_item.len() as u64))
|
.set_pledged_src_size(Some(cur_item_data.len() as u64))
|
||||||
.str_err()?;
|
.str_err()?;
|
||||||
|
|
||||||
self.cur_buf_raw.set_position(0);
|
self.cur_buf_raw.set_position(0);
|
||||||
let mut cur_buf_z = zstd::stream::zio::Writer::new(&mut self.cur_buf_raw, zencoder);
|
let mut cur_buf_z = zstd::stream::zio::Writer::new(&mut self.cur_buf_raw, zencoder);
|
||||||
cur_buf_z.write_all(&self.cur_buf_item).str_err()?;
|
cur_buf_z.write_all(cur_item_data).str_err()?;
|
||||||
cur_buf_z.finish().str_err()?;
|
cur_buf_z.finish().str_err()?;
|
||||||
cur_buf_z.flush().str_err()?;
|
cur_buf_z.flush().str_err()?;
|
||||||
self.cur_buf_item.clear();
|
|
||||||
|
cur_item_data.fill(0);
|
||||||
|
|
||||||
self.table.push(pos);
|
self.table.push(pos);
|
||||||
let (cur_buf_raw, _) = cur_buf_z.into_inner();
|
let (cur_buf_raw, _) = cur_buf_z.into_inner();
|
||||||
@ -150,7 +150,7 @@ where
|
|||||||
|
|
||||||
pub struct Reader<T>
|
pub struct Reader<T>
|
||||||
where
|
where
|
||||||
T: bincode::Decode,
|
T: DeserializeOwned,
|
||||||
{
|
{
|
||||||
mmap: Mmap,
|
mmap: Mmap,
|
||||||
count: usize,
|
count: usize,
|
||||||
@ -160,7 +160,7 @@ where
|
|||||||
|
|
||||||
impl<T> Reader<T>
|
impl<T> Reader<T>
|
||||||
where
|
where
|
||||||
T: bincode::Decode,
|
T: DeserializeOwned,
|
||||||
{
|
{
|
||||||
pub fn new<P: AsRef<Path>>(path: P, _buf_size: usize) -> Result<Self, String> {
|
pub fn new<P: AsRef<Path>>(path: P, _buf_size: usize) -> Result<Self, String> {
|
||||||
let file = fs::File::open(path).str_err()?;
|
let file = fs::File::open(path).str_err()?;
|
||||||
@ -215,9 +215,9 @@ where
|
|||||||
let data = zstd::decode_all(reader).str_err()?;
|
let data = zstd::decode_all(reader).str_err()?;
|
||||||
|
|
||||||
// decode item
|
// decode item
|
||||||
let item: (T, usize) = bincode::decode_from_slice(&data, BINCODE_CFG).str_err()?;
|
let (item, _): (T, _) = postcard::take_from_bytes(&data).str_err()?;
|
||||||
|
|
||||||
Ok(item.0)
|
Ok(item)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn iter(&self) -> ReaderIter<'_, T> {
|
pub fn iter(&self) -> ReaderIter<'_, T> {
|
||||||
@ -227,7 +227,7 @@ where
|
|||||||
|
|
||||||
pub struct ReaderIter<'a, T>
|
pub struct ReaderIter<'a, T>
|
||||||
where
|
where
|
||||||
T: bincode::Decode,
|
T: DeserializeOwned,
|
||||||
{
|
{
|
||||||
reader: &'a Reader<T>,
|
reader: &'a Reader<T>,
|
||||||
index: Option<usize>,
|
index: Option<usize>,
|
||||||
@ -235,7 +235,7 @@ where
|
|||||||
|
|
||||||
impl<'a, T> ReaderIter<'a, T>
|
impl<'a, T> ReaderIter<'a, T>
|
||||||
where
|
where
|
||||||
T: bincode::Decode,
|
T: DeserializeOwned,
|
||||||
{
|
{
|
||||||
fn new(reader: &'a Reader<T>) -> Self {
|
fn new(reader: &'a Reader<T>) -> Self {
|
||||||
ReaderIter {
|
ReaderIter {
|
||||||
@ -247,7 +247,7 @@ where
|
|||||||
|
|
||||||
impl<'a, T> Iterator for ReaderIter<'a, T>
|
impl<'a, T> Iterator for ReaderIter<'a, T>
|
||||||
where
|
where
|
||||||
T: bincode::Decode,
|
T: DeserializeOwned,
|
||||||
{
|
{
|
||||||
type Item = T;
|
type Item = T;
|
||||||
|
|
||||||
@ -300,7 +300,7 @@ where
|
|||||||
|
|
||||||
impl<'a, T> ExactSizeIterator for ReaderIter<'a, T>
|
impl<'a, T> ExactSizeIterator for ReaderIter<'a, T>
|
||||||
where
|
where
|
||||||
T: bincode::Decode,
|
T: DeserializeOwned,
|
||||||
{
|
{
|
||||||
fn len(&self) -> usize {
|
fn len(&self) -> usize {
|
||||||
self.reader.len()
|
self.reader.len()
|
||||||
@ -309,7 +309,7 @@ where
|
|||||||
|
|
||||||
pub struct ReaderIntoIter<T>
|
pub struct ReaderIntoIter<T>
|
||||||
where
|
where
|
||||||
T: bincode::Decode,
|
T: DeserializeOwned,
|
||||||
{
|
{
|
||||||
reader: Reader<T>,
|
reader: Reader<T>,
|
||||||
index: Option<usize>,
|
index: Option<usize>,
|
||||||
@ -317,7 +317,7 @@ where
|
|||||||
|
|
||||||
impl<T> ReaderIntoIter<T>
|
impl<T> ReaderIntoIter<T>
|
||||||
where
|
where
|
||||||
T: bincode::Decode,
|
T: DeserializeOwned,
|
||||||
{
|
{
|
||||||
fn new(reader: Reader<T>) -> Self {
|
fn new(reader: Reader<T>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
@ -329,7 +329,7 @@ where
|
|||||||
|
|
||||||
impl<T> Iterator for ReaderIntoIter<T>
|
impl<T> Iterator for ReaderIntoIter<T>
|
||||||
where
|
where
|
||||||
T: bincode::Decode,
|
T: DeserializeOwned,
|
||||||
{
|
{
|
||||||
type Item = T;
|
type Item = T;
|
||||||
|
|
||||||
@ -382,7 +382,7 @@ where
|
|||||||
|
|
||||||
impl<T> ExactSizeIterator for ReaderIntoIter<T>
|
impl<T> ExactSizeIterator for ReaderIntoIter<T>
|
||||||
where
|
where
|
||||||
T: bincode::Decode,
|
T: DeserializeOwned,
|
||||||
{
|
{
|
||||||
fn len(&self) -> usize {
|
fn len(&self) -> usize {
|
||||||
self.reader.len()
|
self.reader.len()
|
||||||
@ -391,7 +391,7 @@ where
|
|||||||
|
|
||||||
impl<T> IntoIterator for Reader<T>
|
impl<T> IntoIterator for Reader<T>
|
||||||
where
|
where
|
||||||
T: bincode::Decode,
|
T: DeserializeOwned,
|
||||||
{
|
{
|
||||||
type Item = T;
|
type Item = T;
|
||||||
type IntoIter = ReaderIntoIter<Self::Item>;
|
type IntoIter = ReaderIntoIter<Self::Item>;
|
||||||
@ -404,9 +404,10 @@ where
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use serde::Deserialize;
|
||||||
use tempfile::tempdir;
|
use tempfile::tempdir;
|
||||||
|
|
||||||
#[derive(bincode::Encode, bincode::Decode, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
|
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
|
||||||
struct TestData {
|
struct TestData {
|
||||||
num: u64,
|
num: u64,
|
||||||
test: String,
|
test: String,
|
||||||
@ -427,7 +428,7 @@ mod test {
|
|||||||
compress_lvl: 1,
|
compress_lvl: 1,
|
||||||
data_buf_size: 10 * 1024 * 1024,
|
data_buf_size: 10 * 1024 * 1024,
|
||||||
out_buf_size: 10 * 1024 * 1024,
|
out_buf_size: 10 * 1024 * 1024,
|
||||||
current_buf_size: 4096,
|
current_buf_size: 20 * 1024,
|
||||||
};
|
};
|
||||||
let mut writer: Writer<TestData> = Writer::new(&tmpfile, opts).expect("new writer");
|
let mut writer: Writer<TestData> = Writer::new(&tmpfile, opts).expect("new writer");
|
||||||
|
|
||||||
@ -454,7 +455,7 @@ mod test {
|
|||||||
compress_lvl: 1,
|
compress_lvl: 1,
|
||||||
data_buf_size: 10 * 1024 * 1024,
|
data_buf_size: 10 * 1024 * 1024,
|
||||||
out_buf_size: 10 * 1024 * 1024,
|
out_buf_size: 10 * 1024 * 1024,
|
||||||
current_buf_size: 4096,
|
current_buf_size: 20 * 1024,
|
||||||
};
|
};
|
||||||
let mut writer: Writer<TestData> = Writer::new(&tmpfile, opts).expect("new writer");
|
let mut writer: Writer<TestData> = Writer::new(&tmpfile, opts).expect("new writer");
|
||||||
|
|
||||||
@ -480,7 +481,7 @@ mod test {
|
|||||||
compress_lvl: 1,
|
compress_lvl: 1,
|
||||||
data_buf_size: 10 * 1024 * 1024,
|
data_buf_size: 10 * 1024 * 1024,
|
||||||
out_buf_size: 10 * 1024 * 1024,
|
out_buf_size: 10 * 1024 * 1024,
|
||||||
current_buf_size: 4096,
|
current_buf_size: 20 * 1024,
|
||||||
};
|
};
|
||||||
let mut writer: Writer<TestData> = Writer::new(&tmpfile, opts).expect("new writer");
|
let mut writer: Writer<TestData> = Writer::new(&tmpfile, opts).expect("new writer");
|
||||||
|
|
||||||
@ -509,7 +510,7 @@ mod test {
|
|||||||
compress_lvl: 1,
|
compress_lvl: 1,
|
||||||
data_buf_size: 10 * 1024 * 1024,
|
data_buf_size: 10 * 1024 * 1024,
|
||||||
out_buf_size: 10 * 1024 * 1024,
|
out_buf_size: 10 * 1024 * 1024,
|
||||||
current_buf_size: 4096,
|
current_buf_size: 20 * 1024,
|
||||||
};
|
};
|
||||||
let mut writer: Writer<TestData> = Writer::new(&tmpfile, opts).expect("new writer");
|
let mut writer: Writer<TestData> = Writer::new(&tmpfile, opts).expect("new writer");
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user