diff --git a/lib/src/async_db.rs b/lib/src/async_db.rs index 154cf27..3cb652e 100644 --- a/lib/src/async_db.rs +++ b/lib/src/async_db.rs @@ -1,4 +1,5 @@ use std::marker::PhantomData; +use std::vec; use std::{path::Path, sync::Arc}; use async_compression::tokio::bufread::ZstdDecoder; @@ -253,7 +254,8 @@ where 0 == self.len() } - pub async fn get(&self, index: usize) -> Result { + /// get item at index, reuse data buffer + pub async fn get_with_buf(&self, index: usize, data_buf: &mut Vec) -> Result { if index >= self.len() { return Err("index out of range".into()); } @@ -286,15 +288,21 @@ where // read & unpack item data let mut decoder = ZstdDecoder::new(self.mmap.range_reader(data_pos, data_len).str_err()?); - let mut data = Vec::::new(); - decoder.read_to_end(&mut data).await.str_err()?; + decoder.read_to_end(data_buf).await.str_err()?; // decode item - let item: (T, usize) = bincode::decode_from_slice(&data, BINCODE_CFG).str_err()?; + let item: (T, usize) = bincode::decode_from_slice(data_buf, BINCODE_CFG).str_err()?; + data_buf.clear(); Ok(item.0) } + /// get item at index + pub async fn get(&self, index: usize) -> Result { + let mut data_buf: Vec = vec![]; + self.get_with_buf(index, &mut data_buf).await + } + pub fn stream(&self) -> ReaderStream<'_, T> { ReaderStream::new(self) } @@ -438,6 +446,35 @@ mod test { } } + #[tokio::test] + async fn test_write_read_get_with_buf() { + let dir = tempdir().expect("tempdir"); + let tmpfile = dir.path().join("test.tmp"); + let opts = WriterOpts { + data_buf_size: 10 * 1024 * 1024, + out_buf_size: 10 * 1024 * 1024, + ..Default::default() + }; + let mut writer: Writer = Writer::new(&tmpfile, opts).await.expect("new writer"); + + let items_iter = gen_data(5); + let items: Vec = items_iter.collect(); + + let src = futures::stream::iter(items.clone()); + pin_mut!(src); + writer.load(src).await.expect("load"); + writer.finish().await.expect("finish write"); + + let reader: Reader = Reader::new(&tmpfile, 2048).await.expect("new reader"); + assert_eq!(items.len(), reader.len()); + + for (idx, item) in items.iter().enumerate() { + let mut data_buf: Vec = vec![]; + let ritem = reader.get_with_buf(idx, &mut data_buf).await.expect("get"); + assert_eq!(*item, ritem); + } + } + #[tokio::test] async fn test_write_read_stream() { let dir = tempdir().expect("tempdir");