From e074108560957cc746f309ed1d02a2724ffacd58 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Mon, 14 Aug 2023 15:53:17 +0300 Subject: [PATCH] use BincodeVecWriter in async_db for reusing current item buf --- lib/src/async_db.rs | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/lib/src/async_db.rs b/lib/src/async_db.rs index ca9c09b..154cf27 100644 --- a/lib/src/async_db.rs +++ b/lib/src/async_db.rs @@ -24,12 +24,14 @@ type LSize = u32; const LEN_SIZE: usize = std::mem::size_of::(); const BINCODE_CFG: bincode::config::Configuration = bincode::config::standard(); +use crate::util::BincodeVecWriter; use crate::util::ErrorToString; pub struct WriterOpts { pub compress_lvl: Level, pub data_buf_size: usize, pub out_buf_size: usize, + pub current_buf_size: usize, } impl Default for WriterOpts { @@ -38,6 +40,7 @@ impl Default for WriterOpts { compress_lvl: Level::Default, data_buf_size: 500 * 1024 * 1024, out_buf_size: 200 * 1024 * 1024, + current_buf_size: 100 * 1024, } } } @@ -48,6 +51,7 @@ where { out: io::BufWriter, data_buf: Vec, + cur_buf_item: BincodeVecWriter, table: Vec, compress_lvl: Level, _t: PhantomData>, @@ -61,6 +65,8 @@ where let out = fs::File::create(path).await.str_err()?; let out = io::BufWriter::with_capacity(opts.out_buf_size, out); let data_buf: Vec = Vec::with_capacity(opts.data_buf_size); + let cur_buf_item: Vec = Vec::with_capacity(opts.current_buf_size); + let cur_buf_item = BincodeVecWriter::new(cur_buf_item); let compress_lvl = opts.compress_lvl; @@ -69,6 +75,7 @@ where Ok(Self { out, data_buf, + cur_buf_item, table, compress_lvl, _t: PhantomData, @@ -78,12 +85,13 @@ where pub async fn push(&mut self, item: T) -> Result<(), String> { let pos: LSize = self.data_buf.len() as LSize; - let item_data = bincode::encode_to_vec(item, BINCODE_CFG).str_err()?; + bincode::encode_into_writer(item, &mut self.cur_buf_item, BINCODE_CFG).str_err()?; - let mut zencoder = ZstdEncoder::with_quality(&item_data[..], self.compress_lvl); + let mut zencoder = ZstdEncoder::with_quality(&self.cur_buf_item[..], self.compress_lvl); io::copy(&mut zencoder, &mut self.data_buf) .await .str_err()?; + self.cur_buf_item.clear(); self.table.push(pos); @@ -387,9 +395,9 @@ mod test { let dir = tempdir().expect("tempdir"); let tmpfile = dir.path().join("test.tmp"); let opts = WriterOpts { - compress_lvl: Level::Default, data_buf_size: 10 * 1024 * 1024, out_buf_size: 10 * 1024 * 1024, + ..Default::default() }; let mut writer: Writer = Writer::new(&tmpfile, opts).await.expect("new writer"); @@ -407,9 +415,9 @@ mod test { let dir = tempdir().expect("tempdir"); let tmpfile = dir.path().join("test.tmp"); let opts = WriterOpts { - compress_lvl: Level::Default, data_buf_size: 10 * 1024 * 1024, out_buf_size: 10 * 1024 * 1024, + ..Default::default() }; let mut writer: Writer = Writer::new(&tmpfile, opts).await.expect("new writer"); @@ -435,9 +443,9 @@ mod test { let dir = tempdir().expect("tempdir"); let tmpfile = dir.path().join("test.tmp"); let opts = WriterOpts { - compress_lvl: Level::Default, data_buf_size: 10 * 1024 * 1024, out_buf_size: 10 * 1024 * 1024, + ..Default::default() }; let mut writer: Writer = Writer::new(&tmpfile, opts).await.expect("new writer"); @@ -472,9 +480,9 @@ mod test { let dir = tempdir().expect("tempdir"); let tmpfile = dir.path().join("test.tmp"); let opts = WriterOpts { - compress_lvl: Level::Default, data_buf_size: 10 * 1024 * 1024, out_buf_size: 10 * 1024 * 1024, + ..Default::default() }; let mut writer: Writer = Writer::new(&tmpfile, opts).await.expect("new writer");