diff --git a/lib/src/async_db.rs b/lib/src/async_db.rs
index 3883160..17ee043 100644
--- a/lib/src/async_db.rs
+++ b/lib/src/async_db.rs
@@ -1,21 +1,18 @@
 use std::{marker::PhantomData, path::Path, sync::Arc};
 
-use async_stream::stream;
-use tokio::pin;
-
 use async_compression::tokio::bufread::ZstdDecoder;
 use async_compression::tokio::write::ZstdEncoder;
 use async_compression::Level;
 
-use futures::stream::{self, StreamExt};
+use futures::stream::StreamExt;
 use futures_core::stream::Stream;
 use futures_util::pin_mut;
 use tokio::{
     fs,
-    io::{self, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
+    io::{self, AsyncReadExt, AsyncWriteExt},
 };
 
-use fmmap::tokio::{AsyncMmapFile, AsyncOptions};
+use fmmap::tokio::{AsyncMmapFile, AsyncMmapFileExt, AsyncOptions};
 
 type LSize = u32;
 const LEN_SIZE: usize = std::mem::size_of::<LSize>();
@@ -137,9 +134,97 @@ where
     }
 }
 
+pub struct Reader<T>
+where
+    T: bincode::Decode,
+{
+    mmap: AsyncMmapFile,
+    count: usize,
+    first_pos: LSize,
+    _t: Option<Arc<T>>, // PhantomData replacement
+}
+
+impl<T> Reader<T>
+where
+    T: bincode::Decode,
+{
+    pub async fn new<P: AsRef<Path>>(path: P, _buf_size: usize) -> Result<Self> {
+        let mmap = AsyncOptions::new()
+            .read(true)
+            .open_mmap_file(path)
+            .await
+            .str_err()?;
+        mmap.try_lock_shared().str_err()?;
+
+        // read first pos and records count
+        let first_data: [u8; LEN_SIZE] = mmap.bytes(0, LEN_SIZE).str_err()?.try_into().str_err()?;
+        let first_pos = LSize::from_le_bytes(first_data);
+        let tab_len = (first_pos as usize) / LEN_SIZE;
+        let count = tab_len - 1;
+
+        Ok(Self {
+            mmap,
+            count,
+            first_pos,
+            _t: None,
+        })
+    }
+
+    pub fn len(&self) -> usize {
+        self.count
+    }
+
+    pub fn is_empty(&self) -> bool {
+        0 == self.len()
+    }
+
+    pub async fn get(&self, index: usize) -> Result<T> {
+        if index >= self.len() {
+            return Err("index out of range".into());
+        }
+
+        let next_pos: usize = (index + 1) * LEN_SIZE;
+
+        // read item data pos
+        let data_pos = if 0 == index {
+            self.first_pos
+        } else {
+            let tab_pos: usize = index * LEN_SIZE;
+            let pos_curr_data: [u8; LEN_SIZE] = self
+                .mmap
+                .bytes(tab_pos, LEN_SIZE)
+                .str_err()?
+                .try_into()
+                .str_err()?;
+            LSize::from_le_bytes(pos_curr_data)
+        } as usize;
+
+        // read next item pos
+        let pos_next_data: [u8; LEN_SIZE] = self
+            .mmap
+            .bytes(next_pos, LEN_SIZE)
+            .str_err()?
+            .try_into()
+            .str_err()?;
+        let data_pos_next = LSize::from_le_bytes(pos_next_data) as usize;
+        let data_len = data_pos_next - data_pos;
+
+        // read & unpack item data
+        let mut decoder = ZstdDecoder::new(self.mmap.range_reader(data_pos, data_len).str_err()?);
+        let mut data = Vec::<u8>::new();
+        decoder.read_to_end(&mut data).await.str_err()?;
+
+        // decode item
+        let item: (T, usize) = bincode::decode_from_slice(&data, BINCODE_CFG).str_err()?;
+
+        Ok(item.0)
+    }
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
+    use async_stream::stream;
     use tempfile::tempdir;
 
     #[derive(bincode::Encode, bincode::Decode, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
@@ -185,4 +270,32 @@ mod test {
         writer.load(src).await.expect("load");
         writer.finish().await.expect("finish write");
     }
+
+    #[tokio::test]
+    async fn test_write_read() {
+        let dir = tempdir().expect("tempdir");
+        let tmpfile = dir.path().join("test.tmp");
+        let opts = WriterOpts {
+            compress_lvl: Level::Default,
+            data_buf_size: 10 * 1024 * 1024,
+            out_buf_size: 10 * 1024 * 1024,
+        };
+        let mut writer: Writer<TestData> = Writer::new(&tmpfile, opts).await.expect("new writer");
+
+        let items_iter = gen_data(5);
+        let items: Vec<TestData> = items_iter.collect();
+
+        let src = stream_iter(items.clone().into_iter());
+        pin_mut!(src);
+        writer.load(src).await.expect("load");
+        writer.finish().await.expect("finish write");
+
+        let reader: Reader<TestData> = Reader::new(&tmpfile, 2048).await.expect("new reader");
+        assert_eq!(items.len(), reader.len());
+
+        for (idx, item) in items.iter().enumerate() {
+            let ritem = reader.get(idx).await.expect("get");
+            assert_eq!(*item, ritem);
+        }
+    }
 }