add async reader
This commit is contained in:
		@@ -1,21 +1,18 @@
 | 
			
		||||
use std::{marker::PhantomData, path::Path, sync::Arc};
 | 
			
		||||
 | 
			
		||||
use async_stream::stream;
 | 
			
		||||
use tokio::pin;
 | 
			
		||||
 | 
			
		||||
use async_compression::tokio::bufread::ZstdDecoder;
 | 
			
		||||
use async_compression::tokio::write::ZstdEncoder;
 | 
			
		||||
use async_compression::Level;
 | 
			
		||||
use futures::stream::{self, StreamExt};
 | 
			
		||||
use futures::stream::StreamExt;
 | 
			
		||||
use futures_core::stream::Stream;
 | 
			
		||||
use futures_util::pin_mut;
 | 
			
		||||
 | 
			
		||||
use tokio::{
 | 
			
		||||
    fs,
 | 
			
		||||
    io::{self, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
 | 
			
		||||
    io::{self, AsyncReadExt, AsyncWriteExt},
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
use fmmap::tokio::{AsyncMmapFile, AsyncOptions};
 | 
			
		||||
use fmmap::tokio::{AsyncMmapFile, AsyncMmapFileExt, AsyncOptions};
 | 
			
		||||
 | 
			
		||||
type LSize = u32;
 | 
			
		||||
const LEN_SIZE: usize = std::mem::size_of::<LSize>();
 | 
			
		||||
@@ -137,9 +134,97 @@ where
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub struct Reader<T>
 | 
			
		||||
where
 | 
			
		||||
    T: bincode::Decode,
 | 
			
		||||
{
 | 
			
		||||
    mmap: AsyncMmapFile,
 | 
			
		||||
    count: usize,
 | 
			
		||||
    first_pos: LSize,
 | 
			
		||||
    _t: Option<Arc<T>>, // PhantomData replacement
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<T> Reader<T>
 | 
			
		||||
where
 | 
			
		||||
    T: bincode::Decode,
 | 
			
		||||
{
 | 
			
		||||
    pub async fn new<P: AsRef<Path>>(path: P, _buf_size: usize) -> Result<Self, String> {
 | 
			
		||||
        let mmap = AsyncOptions::new()
 | 
			
		||||
            .read(true)
 | 
			
		||||
            .open_mmap_file(path)
 | 
			
		||||
            .await
 | 
			
		||||
            .str_err()?;
 | 
			
		||||
        mmap.try_lock_shared().str_err()?;
 | 
			
		||||
 | 
			
		||||
        // read first pos and records count
 | 
			
		||||
        let first_data: [u8; LEN_SIZE] = mmap.bytes(0, LEN_SIZE).str_err()?.try_into().str_err()?;
 | 
			
		||||
        let first_pos = LSize::from_le_bytes(first_data);
 | 
			
		||||
        let tab_len = (first_pos as usize) / LEN_SIZE;
 | 
			
		||||
        let count = tab_len - 1;
 | 
			
		||||
 | 
			
		||||
        Ok(Self {
 | 
			
		||||
            mmap,
 | 
			
		||||
            count,
 | 
			
		||||
            first_pos,
 | 
			
		||||
            _t: None,
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn len(&self) -> usize {
 | 
			
		||||
        self.count
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn is_empty(&self) -> bool {
 | 
			
		||||
        0 == self.len()
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn get(&self, index: usize) -> Result<T, String> {
 | 
			
		||||
        if index >= self.len() {
 | 
			
		||||
            return Err("index out of range".into());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let next_pos: usize = (index + 1) * LEN_SIZE;
 | 
			
		||||
 | 
			
		||||
        // read item data pos
 | 
			
		||||
        let data_pos = if 0 == index {
 | 
			
		||||
            self.first_pos
 | 
			
		||||
        } else {
 | 
			
		||||
            let tab_pos: usize = index * LEN_SIZE;
 | 
			
		||||
            let pos_curr_data: [u8; LEN_SIZE] = self
 | 
			
		||||
                .mmap
 | 
			
		||||
                .bytes(tab_pos, LEN_SIZE)
 | 
			
		||||
                .str_err()?
 | 
			
		||||
                .try_into()
 | 
			
		||||
                .str_err()?;
 | 
			
		||||
            LSize::from_le_bytes(pos_curr_data)
 | 
			
		||||
        } as usize;
 | 
			
		||||
 | 
			
		||||
        // read next item pos
 | 
			
		||||
        let pos_next_data: [u8; LEN_SIZE] = self
 | 
			
		||||
            .mmap
 | 
			
		||||
            .bytes(next_pos, LEN_SIZE)
 | 
			
		||||
            .str_err()?
 | 
			
		||||
            .try_into()
 | 
			
		||||
            .str_err()?;
 | 
			
		||||
        let data_pos_next = LSize::from_le_bytes(pos_next_data) as usize;
 | 
			
		||||
        let data_len = data_pos_next - data_pos;
 | 
			
		||||
 | 
			
		||||
        // read & unpack item data
 | 
			
		||||
        let mut decoder = ZstdDecoder::new(self.mmap.range_reader(data_pos, data_len).str_err()?);
 | 
			
		||||
        let mut data = Vec::<u8>::new();
 | 
			
		||||
        decoder.read_to_end(&mut data).await.str_err()?;
 | 
			
		||||
 | 
			
		||||
        // decode item
 | 
			
		||||
        let item: (T, usize) = bincode::decode_from_slice(&data, BINCODE_CFG).str_err()?;
 | 
			
		||||
 | 
			
		||||
        Ok(item.0)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[cfg(test)]
 | 
			
		||||
mod test {
 | 
			
		||||
    use super::*;
 | 
			
		||||
    use async_stream::stream;
 | 
			
		||||
    use tempfile::tempdir;
 | 
			
		||||
 | 
			
		||||
    #[derive(bincode::Encode, bincode::Decode, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
 | 
			
		||||
@@ -185,4 +270,32 @@ mod test {
 | 
			
		||||
        writer.load(src).await.expect("load");
 | 
			
		||||
        writer.finish().await.expect("finish write");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[tokio::test]
 | 
			
		||||
    async fn test_write_read() {
 | 
			
		||||
        let dir = tempdir().expect("tempdir");
 | 
			
		||||
        let tmpfile = dir.path().join("test.tmp");
 | 
			
		||||
        let opts = WriterOpts {
 | 
			
		||||
            compress_lvl: Level::Default,
 | 
			
		||||
            data_buf_size: 10 * 1024 * 1024,
 | 
			
		||||
            out_buf_size: 10 * 1024 * 1024,
 | 
			
		||||
        };
 | 
			
		||||
        let mut writer: Writer<TestData> = Writer::new(&tmpfile, opts).await.expect("new writer");
 | 
			
		||||
 | 
			
		||||
        let items_iter = gen_data(5);
 | 
			
		||||
        let items: Vec<TestData> = items_iter.collect();
 | 
			
		||||
 | 
			
		||||
        let src = stream_iter(items.clone().into_iter());
 | 
			
		||||
        pin_mut!(src);
 | 
			
		||||
        writer.load(src).await.expect("load");
 | 
			
		||||
        writer.finish().await.expect("finish write");
 | 
			
		||||
 | 
			
		||||
        let reader: Reader<TestData> = Reader::new(&tmpfile, 2048).await.expect("new reader");
 | 
			
		||||
        assert_eq!(items.len(), reader.len());
 | 
			
		||||
 | 
			
		||||
        for (idx, item) in items.iter().enumerate() {
 | 
			
		||||
            let ritem = reader.get(idx).await.expect("get");
 | 
			
		||||
            assert_eq!(*item, ritem);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user