add async feature #1
@ -1,4 +1,5 @@
|
|||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
|
use std::vec;
|
||||||
use std::{path::Path, sync::Arc};
|
use std::{path::Path, sync::Arc};
|
||||||
|
|
||||||
use async_compression::tokio::bufread::ZstdDecoder;
|
use async_compression::tokio::bufread::ZstdDecoder;
|
||||||
@ -253,7 +254,8 @@ where
|
|||||||
0 == self.len()
|
0 == self.len()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get(&self, index: usize) -> Result<T, String> {
|
/// get item at index, reuse data buffer
|
||||||
|
pub async fn get_with_buf(&self, index: usize, data_buf: &mut Vec<u8>) -> Result<T, String> {
|
||||||
if index >= self.len() {
|
if index >= self.len() {
|
||||||
return Err("index out of range".into());
|
return Err("index out of range".into());
|
||||||
}
|
}
|
||||||
@ -286,15 +288,21 @@ where
|
|||||||
|
|
||||||
// read & unpack item data
|
// read & unpack item data
|
||||||
let mut decoder = ZstdDecoder::new(self.mmap.range_reader(data_pos, data_len).str_err()?);
|
let mut decoder = ZstdDecoder::new(self.mmap.range_reader(data_pos, data_len).str_err()?);
|
||||||
let mut data = Vec::<u8>::new();
|
decoder.read_to_end(data_buf).await.str_err()?;
|
||||||
decoder.read_to_end(&mut data).await.str_err()?;
|
|
||||||
|
|
||||||
// decode item
|
// decode item
|
||||||
let item: (T, usize) = bincode::decode_from_slice(&data, BINCODE_CFG).str_err()?;
|
let item: (T, usize) = bincode::decode_from_slice(data_buf, BINCODE_CFG).str_err()?;
|
||||||
|
|
||||||
|
data_buf.clear();
|
||||||
Ok(item.0)
|
Ok(item.0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// get item at index
|
||||||
|
pub async fn get(&self, index: usize) -> Result<T, String> {
|
||||||
|
let mut data_buf: Vec<u8> = vec![];
|
||||||
|
self.get_with_buf(index, &mut data_buf).await
|
||||||
|
}
|
||||||
|
|
||||||
pub fn stream(&self) -> ReaderStream<'_, T> {
|
pub fn stream(&self) -> ReaderStream<'_, T> {
|
||||||
ReaderStream::new(self)
|
ReaderStream::new(self)
|
||||||
}
|
}
|
||||||
@ -438,6 +446,35 @@ mod test {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_write_read_get_with_buf() {
|
||||||
|
let dir = tempdir().expect("tempdir");
|
||||||
|
let tmpfile = dir.path().join("test.tmp");
|
||||||
|
let opts = WriterOpts {
|
||||||
|
data_buf_size: 10 * 1024 * 1024,
|
||||||
|
out_buf_size: 10 * 1024 * 1024,
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
let mut writer: Writer<TestData> = Writer::new(&tmpfile, opts).await.expect("new writer");
|
||||||
|
|
||||||
|
let items_iter = gen_data(5);
|
||||||
|
let items: Vec<TestData> = items_iter.collect();
|
||||||
|
|
||||||
|
let src = futures::stream::iter(items.clone());
|
||||||
|
pin_mut!(src);
|
||||||
|
writer.load(src).await.expect("load");
|
||||||
|
writer.finish().await.expect("finish write");
|
||||||
|
|
||||||
|
let reader: Reader<TestData> = Reader::new(&tmpfile, 2048).await.expect("new reader");
|
||||||
|
assert_eq!(items.len(), reader.len());
|
||||||
|
|
||||||
|
for (idx, item) in items.iter().enumerate() {
|
||||||
|
let mut data_buf: Vec<u8> = vec![];
|
||||||
|
let ritem = reader.get_with_buf(idx, &mut data_buf).await.expect("get");
|
||||||
|
assert_eq!(*item, ritem);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_write_read_stream() {
|
async fn test_write_read_stream() {
|
||||||
let dir = tempdir().expect("tempdir");
|
let dir = tempdir().expect("tempdir");
|
||||||
|
Loading…
Reference in New Issue
Block a user