async_db: add BufReader, BufReaderStream

for using as mutable reader
This commit is contained in:
Dmitry Belyaev 2023-08-16 11:44:03 +03:00
parent ae96fb3bf8
commit feb2303db9
Signed by: b4tman
GPG Key ID: 41A00BF15EA7E5F3

View File

@ -1,4 +1,5 @@
use std::marker::PhantomData; use std::marker::PhantomData;
use std::ops::Deref;
use std::vec; use std::vec;
use std::{path::Path, sync::Arc}; use std::{path::Path, sync::Arc};
@ -224,7 +225,7 @@ impl<T> Reader<T>
where where
T: bincode::Decode, T: bincode::Decode,
{ {
pub async fn new<P: AsRef<Path>>(path: P, _buf_size: usize) -> Result<Self, String> { pub async fn new<P: AsRef<Path>>(path: P) -> Result<Self, String> {
let mmap = AsyncOptions::new() let mmap = AsyncOptions::new()
.read(true) .read(true)
.open_mmap_file(path) .open_mmap_file(path)
@ -372,6 +373,145 @@ where
} }
} }
pub struct BufReader<T>
where
T: bincode::Decode,
{
inner: Reader<T>,
buf: Vec<u8>,
}
impl<T> BufReader<T>
where
T: bincode::Decode,
{
pub async fn new<P: AsRef<Path>>(path: P, buf_size: usize) -> Result<Self, String> {
match Reader::<T>::new(path).await {
Ok(inner) => Ok(Self {
inner,
buf: Vec::with_capacity(buf_size),
}),
Err(e) => Err(e),
}
}
pub async fn get(&mut self, index: usize) -> Result<T, String> {
self.inner.get_with_buf(index, &mut self.buf).await
}
pub fn into_inner(self) -> Reader<T> {
self.inner
}
pub fn stream(self) -> BufReaderStream<T> {
BufReaderStream::new(self)
}
}
impl<T> From<Reader<T>> for BufReader<T>
where
T: bincode::Decode,
{
fn from(inner: Reader<T>) -> Self {
Self {
inner,
buf: Vec::new(),
}
}
}
impl<T> From<BufReader<T>> for Reader<T>
where
T: bincode::Decode,
{
fn from(value: BufReader<T>) -> Self {
value.into_inner()
}
}
impl<T> Deref for BufReader<T>
where
T: bincode::Decode,
{
type Target = Reader<T>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
pub struct BufReaderStream<T>
where
T: bincode::Decode,
{
reader: BufReader<T>,
index: Option<usize>,
}
impl<T> BufReaderStream<T>
where
T: bincode::Decode,
{
fn new(reader: BufReader<T>) -> Self {
BufReaderStream {
reader,
index: None,
}
}
async fn get_next(&mut self) -> Result<T, String> {
match self.index {
None => Err("index is None".into()),
Some(index) => {
let res = self.reader.get(index).await;
self.index = Some(index + 1);
res
}
}
}
}
impl<T> Stream for BufReaderStream<T>
where
T: bincode::Decode,
{
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
if self.index.is_none() && !self.reader.is_empty() {
self.index = Some(0);
}
if self.index.unwrap() == self.reader.len() {
return Poll::Ready(None);
}
// FIXME: mayby work only if reader.get().poll() return Ready immediately
let future = self.get_next();
pin_mut!(future);
match Pin::new(&mut future).poll(cx) {
Poll::Ready(Ok(item)) => Poll::Ready(Some(item)),
Poll::Ready(Err(_)) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
let len = self.reader.len();
if self.index.is_none() {
return (len, Some(len));
}
let index = self.index.unwrap();
let rem = if len > index + 1 {
len - (index + 1)
} else {
0
};
(rem, Some(rem))
}
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;
@ -438,7 +578,7 @@ mod test {
writer.load(src).await.expect("load"); writer.load(src).await.expect("load");
writer.finish().await.expect("finish write"); writer.finish().await.expect("finish write");
let reader: Reader<TestData> = Reader::new(&tmpfile, 2048).await.expect("new reader"); let reader: Reader<TestData> = Reader::new(&tmpfile).await.expect("new reader");
assert_eq!(items.len(), reader.len()); assert_eq!(items.len(), reader.len());
for (idx, item) in items.iter().enumerate() { for (idx, item) in items.iter().enumerate() {
@ -466,7 +606,7 @@ mod test {
src.forward(writer.sink()).await.expect("forward"); src.forward(writer.sink()).await.expect("forward");
writer.finish().await.expect("finish write"); writer.finish().await.expect("finish write");
let reader: Reader<TestData> = Reader::new(&tmpfile, 2048).await.expect("new reader"); let reader: Reader<TestData> = Reader::new(&tmpfile).await.expect("new reader");
assert_eq!(items.len(), reader.len()); assert_eq!(items.len(), reader.len());
for (idx, item) in items.iter().enumerate() { for (idx, item) in items.iter().enumerate() {
@ -494,7 +634,7 @@ mod test {
writer.load(src).await.expect("load"); writer.load(src).await.expect("load");
writer.finish().await.expect("finish write"); writer.finish().await.expect("finish write");
let reader: Reader<TestData> = Reader::new(&tmpfile, 2048).await.expect("new reader"); let reader: Reader<TestData> = Reader::new(&tmpfile).await.expect("new reader");
assert_eq!(items.len(), reader.len()); assert_eq!(items.len(), reader.len());
for (idx, item) in items.iter().enumerate() { for (idx, item) in items.iter().enumerate() {
@ -523,7 +663,7 @@ mod test {
writer.load(src).await.expect("load"); writer.load(src).await.expect("load");
writer.finish().await.expect("finish write"); writer.finish().await.expect("finish write");
let reader: Reader<TestData> = Reader::new(&tmpfile, 2048).await.expect("new reader"); let reader: Reader<TestData> = Reader::new(&tmpfile).await.expect("new reader");
assert_eq!(items.len(), reader.len()); assert_eq!(items.len(), reader.len());
let dst_stream = reader.stream(); let dst_stream = reader.stream();
@ -560,7 +700,7 @@ mod test {
writer.load(src).await.expect("load"); writer.load(src).await.expect("load");
writer.finish().await.expect("finish write"); writer.finish().await.expect("finish write");
let reader: Reader<TestData> = Reader::new(&tmpfile, 2048).await.expect("new reader"); let reader: Reader<TestData> = Reader::new(&tmpfile).await.expect("new reader");
assert_eq!(items.len(), reader.len()); assert_eq!(items.len(), reader.len());
let reader = Arc::new(reader); let reader = Arc::new(reader);