chgk_ledb/lib/src/async_db.rs

788 lines
22 KiB
Rust

use std::marker::PhantomData;
use std::ops::Deref;
use std::vec;
use std::{path::Path, sync::Arc};
use async_compression::tokio::bufread::ZstdDecoder;
use async_compression::tokio::bufread::ZstdEncoder;
use async_compression::Level;
use futures::sink::Sink;
use futures::stream::StreamExt;
use futures_core::stream::Stream;
use futures_core::Future;
use futures_util::pin_mut;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::{
fs,
io::{self, AsyncReadExt, AsyncWriteExt},
};
use fmmap::tokio::{AsyncMmapFile, AsyncMmapFileExt, AsyncOptions};
/// Integer type of every entry in the on-disk offset table.
type LSize = u32;
/// Width in bytes of one offset-table entry.
const LEN_SIZE: usize = LSize::BITS as usize / 8;
/// bincode configuration shared by item encoding (`Writer`) and decoding (`Reader`);
/// both sides must use the same configuration for stored items to round-trip.
const BINCODE_CFG: bincode::config::Configuration = bincode::config::standard();
use crate::util::BincodeVecWriter;
use crate::util::ErrorToString;
/// Tuning options for `Writer::new`.
pub struct WriterOpts {
    /// zstd compression level applied to every item.
    pub compress_lvl: Level,
    /// Initial capacity (bytes) of the in-memory buffer collecting compressed items.
    pub data_buf_size: usize,
    /// Capacity (bytes) of the buffered writer wrapping the output file.
    pub out_buf_size: usize,
    /// Initial capacity (bytes) of the scratch buffer holding one encoded item.
    pub current_buf_size: usize,
}

impl Default for WriterOpts {
    /// Default zstd level, 500 MiB data buffer, 200 MiB output buffer, 100 KiB item buffer.
    fn default() -> Self {
        const KIB: usize = 1024;
        const MIB: usize = 1024 * KIB;
        WriterOpts {
            compress_lvl: Level::Default,
            data_buf_size: 500 * MIB,
            out_buf_size: 200 * MIB,
            current_buf_size: 100 * KIB,
        }
    }
}
pub struct Writer<T>
where
T: bincode::Encode,
{
out: io::BufWriter<fs::File>,
data_buf: Vec<u8>,
cur_buf_item: BincodeVecWriter,
table: Vec<LSize>,
compress_lvl: Level,
_t: PhantomData<Arc<T>>,
}
impl<T> Writer<T>
where
T: bincode::Encode,
{
pub async fn new<P: AsRef<Path>>(path: P, opts: WriterOpts) -> Result<Self, String> {
let out = fs::File::create(path).await.str_err()?;
let out = io::BufWriter::with_capacity(opts.out_buf_size, out);
let data_buf: Vec<u8> = Vec::with_capacity(opts.data_buf_size);
let cur_buf_item: Vec<u8> = Vec::with_capacity(opts.current_buf_size);
let cur_buf_item = BincodeVecWriter::new(cur_buf_item);
let compress_lvl = opts.compress_lvl;
let table: Vec<LSize> = vec![];
Ok(Self {
out,
data_buf,
cur_buf_item,
table,
compress_lvl,
_t: PhantomData,
})
}
pub async fn push(&mut self, item: T) -> Result<(), String> {
self.push_by_ref(&item).await
}
pub async fn push_by_ref(&mut self, item: &T) -> Result<(), String> {
let pos: LSize = self.data_buf.len() as LSize;
bincode::encode_into_writer(item, &mut self.cur_buf_item, BINCODE_CFG).str_err()?;
let mut zencoder = ZstdEncoder::with_quality(&self.cur_buf_item[..], self.compress_lvl);
io::copy(&mut zencoder, &mut self.data_buf)
.await
.str_err()?;
self.cur_buf_item.clear();
self.table.push(pos);
// FIXME
// this will break WriterSink::poll_ready (will wait forever), but not Writer::load
// tokio::time::sleep(std::time::Duration::from_secs(1)).await;
Ok(())
}
pub async fn load<S>(&mut self, source: S) -> Result<(), String>
where
S: Stream<Item = T> + std::marker::Unpin,
{
let hint = source.size_hint();
let hint = std::cmp::max(hint.0, hint.1.unwrap_or(0));
if hint > 0 {
self.table.reserve(hint);
}
pin_mut!(source);
while let Some(item) = source.next().await {
self.push(item).await?;
}
Ok(())
}
pub async fn finish(mut self) -> Result<(), String> {
// finish tab
let pos: LSize = self.data_buf.len() as LSize;
self.table.push(pos);
// write tab
let tab_size = (self.table.len() * LEN_SIZE) as LSize;
for pos in self.table {
let pos_data = (pos + tab_size).to_le_bytes();
self.out.write_all(&pos_data).await.str_err()?;
}
// copy data
self.out.write_all(&self.data_buf[..]).await.str_err()?;
self.out.flush().await.str_err()?;
Ok(())
}
pub fn sink(&mut self) -> WriterSink<'_, T> {
WriterSink {
writer: self,
item: None,
}
}
}
use pin_project::pin_project;
#[pin_project]
/// FIXME: not really async
/// only work when ..push.poll() returns Ready immediately
pub struct WriterSink<'a, T>
where
T: bincode::Encode,
{
#[pin]
writer: &'a mut Writer<T>,
item: Option<T>,
}
impl<'a, T> Sink<T> for WriterSink<'a, T>
where
T: bincode::Encode,
{
type Error = String;
fn poll_ready(
self: std::pin::Pin<&mut Self>,
ctx: &mut std::task::Context<'_>,
) -> Poll<Result<(), String>> {
let mut this = self.project();
if this.item.is_none() {
return Poll::Ready(Ok(()));
}
let item = this.item.take().unwrap();
let push_fut = this.writer.push(item); // FIXME:: how to save this future???
pin_mut!(push_fut);
push_fut.poll(ctx)
}
fn start_send(self: std::pin::Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
let this = self.project();
*this.item = Some(item);
Ok(())
}
fn poll_flush(
self: std::pin::Pin<&mut Self>,
ctx: &mut std::task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
self.poll_ready(ctx)
}
fn poll_close(
mut self: std::pin::Pin<&mut Self>,
ctx: &mut std::task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
futures::ready!(self.as_mut().poll_ready(ctx))?;
Poll::Ready(Ok(()))
}
}
/// Random-access reader for files produced by `Writer`.
///
/// The file is memory-mapped. Offset-table entries are read from the mapping
/// on every lookup.
pub struct Reader<T>
where
    T: bincode::Decode,
{
    mmap: AsyncMmapFile,
    /// Number of stored items (table entries minus the end-of-data entry).
    count: usize,
    /// First table entry: absolute offset of item 0, equal to the table size in bytes.
    first_pos: LSize,
    _t: PhantomData<Arc<T>>,
}
impl<T> Reader<T>
where
    T: bincode::Decode,
{
    /// Maps the database at `path` for reading and reads its item count.
    ///
    /// # Errors
    /// Fails if the file cannot be opened or mapped, if `try_lock_shared`
    /// fails, or if the offset-table header is malformed.
    pub async fn new<P: AsRef<Path>>(path: P) -> Result<Self, String> {
        let mmap = AsyncOptions::new()
            .read(true)
            .open_mmap_file(path)
            .await
            .str_err()?;
        mmap.try_lock_shared().str_err()?;
        // The first entry is the offset of item 0, which directly follows the
        // table, so it is also the table size in bytes.
        let first_data: [u8; LEN_SIZE] = mmap.bytes(0, LEN_SIZE).str_err()?.try_into().str_err()?;
        let first_pos = LSize::from_le_bytes(first_data);
        let tab_bytes = first_pos as usize;
        // A valid table consists of whole entries and always contains at least
        // the end-of-data entry; anything else would underflow `count`.
        if tab_bytes < LEN_SIZE || tab_bytes % LEN_SIZE != 0 {
            return Err("invalid offset table size".into());
        }
        let count = tab_bytes / LEN_SIZE - 1;
        Ok(Self {
            mmap,
            count,
            first_pos,
            _t: PhantomData,
        })
    }
    /// Number of items stored in the file.
    pub fn len(&self) -> usize {
        self.count
    }
    /// `true` if the file stores no items.
    pub fn is_empty(&self) -> bool {
        0 == self.len()
    }
    /// Reads offset-table entry number `slot` as an absolute file offset.
    fn read_pos(&self, slot: usize) -> Result<usize, String> {
        let raw: [u8; LEN_SIZE] = self
            .mmap
            .bytes(slot * LEN_SIZE, LEN_SIZE)
            .str_err()?
            .try_into()
            .str_err()?;
        Ok(LSize::from_le_bytes(raw) as usize)
    }
    /// Decompresses `data_len` bytes at `data_pos` into `data_buf` and decodes one item.
    async fn read_item(
        &self,
        data_pos: usize,
        data_len: usize,
        data_buf: &mut Vec<u8>,
    ) -> Result<T, String> {
        let mut decoder = ZstdDecoder::new(self.mmap.range_reader(data_pos, data_len).str_err()?);
        decoder.read_to_end(data_buf).await.str_err()?;
        let (item, _): (T, usize) = bincode::decode_from_slice(data_buf, BINCODE_CFG).str_err()?;
        Ok(item)
    }
    /// Gets the item at `index`, using `data_buf` as decompression scratch space.
    ///
    /// `data_buf` is emptied both before and after use (its capacity is kept),
    /// so leftovers from an earlier failed or cancelled call cannot leak into
    /// this item.
    ///
    /// # Errors
    /// Fails if `index` is out of range, the offset table is corrupt, or the
    /// item cannot be decompressed or decoded.
    pub async fn get_with_buf(&self, index: usize, data_buf: &mut Vec<u8>) -> Result<T, String> {
        if index >= self.len() {
            return Err("index out of range".into());
        }
        let data_pos = if 0 == index {
            self.first_pos as usize
        } else {
            self.read_pos(index)?
        };
        let data_pos_next = self.read_pos(index + 1)?;
        let data_len = data_pos_next
            .checked_sub(data_pos)
            .ok_or_else(|| "corrupted offset table".to_string())?;
        data_buf.clear();
        let result = self.read_item(data_pos, data_len, data_buf).await;
        data_buf.clear();
        result
    }
    /// Gets the item at `index`, using a fresh scratch buffer.
    pub async fn get(&self, index: usize) -> Result<T, String> {
        let mut data_buf: Vec<u8> = vec![];
        self.get_with_buf(index, &mut data_buf).await
    }
    /// Streams all items in index order.
    pub fn stream(&self) -> ReaderStream<'_, T> {
        ReaderStream::new(self)
    }
}
/// [`Stream`] over all items of a borrowed [`Reader`], in index order.
///
/// The stream ends at the last item or at the first read error. After it has
/// ended it keeps returning `None`.
pub struct ReaderStream<'a, T>
where
    T: bincode::Decode,
{
    reader: &'a Reader<T>,
    /// Index of the next item to yield; `None` until the first poll.
    index: Option<usize>,
}
impl<'a, T> ReaderStream<'a, T>
where
    T: bincode::Decode,
{
    fn new(reader: &'a Reader<T>) -> Self {
        ReaderStream {
            reader,
            index: None,
        }
    }
}
impl<'a, T> Stream for ReaderStream<'a, T>
where
    T: bincode::Decode,
{
    type Item = T;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        let reader = self.reader;
        let len = reader.len();
        // Treating "not started" as index 0 also covers the empty reader,
        // which previously panicked on `index.unwrap()`.
        let index = self.index.unwrap_or(0);
        if index >= len {
            return Poll::Ready(None);
        }
        // The lookup future is recreated on each poll. Nothing is advanced
        // until it is Ready, so a Pending poll is simply retried.
        let future = reader.get(index);
        pin_mut!(future);
        match future.poll(cx) {
            Poll::Ready(Ok(item)) => {
                self.index = Some(index + 1);
                Poll::Ready(Some(item))
            }
            Poll::Ready(Err(_)) => {
                // Fuse: later polls keep returning None.
                self.index = Some(len);
                Poll::Ready(None)
            }
            Poll::Pending => Poll::Pending,
        }
    }
    fn size_hint(&self) -> (usize, Option<usize>) {
        // `index` is the next item to yield, so exactly `len - index` remain.
        let remaining = self.reader.len().saturating_sub(self.index.unwrap_or(0));
        (remaining, Some(remaining))
    }
}
/// [`Reader`] wrapper that owns a reusable scratch buffer, so repeated `get`
/// calls do not allocate a fresh buffer each time.
pub struct BufReader<T>
where
    T: bincode::Decode,
{
    inner: Reader<T>,
    /// Scratch buffer passed to `Reader::get_with_buf`.
    buf: Vec<u8>,
}
impl<T> BufReader<T>
where
    T: bincode::Decode,
{
    /// Opens `path` like `Reader::new` and preallocates `buf_size` bytes of scratch space.
    pub async fn new<P: AsRef<Path>>(path: P, buf_size: usize) -> Result<Self, String> {
        let inner = Reader::<T>::new(path).await?;
        let buf = Vec::with_capacity(buf_size);
        Ok(Self { inner, buf })
    }
    /// Gets the item at `index`, reusing the internal buffer.
    pub async fn get(&mut self, index: usize) -> Result<T, String> {
        let Self { inner, buf } = self;
        inner.get_with_buf(index, buf).await
    }
    /// Unwraps the underlying [`Reader`], dropping the buffer.
    pub fn into_inner(self) -> Reader<T> {
        let Self { inner, .. } = self;
        inner
    }
    /// Consumes the reader and streams all items in index order.
    pub fn stream(self) -> BufReaderStream<T> {
        BufReaderStream::new(self)
    }
}
impl<T> From<Reader<T>> for BufReader<T>
where
    T: bincode::Decode,
{
    /// Wraps `inner` with an initially unallocated buffer.
    fn from(inner: Reader<T>) -> Self {
        let buf = Vec::new();
        Self { inner, buf }
    }
}
impl<T> From<BufReader<T>> for Reader<T>
where
    T: bincode::Decode,
{
    fn from(value: BufReader<T>) -> Self {
        BufReader::into_inner(value)
    }
}
impl<T> Deref for BufReader<T>
where
    T: bincode::Decode,
{
    type Target = Reader<T>;
    /// Exposes read-only `Reader` methods such as `len` and `is_empty`.
    fn deref(&self) -> &Reader<T> {
        let Self { inner, .. } = self;
        inner
    }
}
pub struct BufReaderStream<T>
where
T: bincode::Decode,
{
reader: BufReader<T>,
index: Option<usize>,
}
impl<T> BufReaderStream<T>
where
T: bincode::Decode,
{
fn new(reader: BufReader<T>) -> Self {
BufReaderStream {
reader,
index: None,
}
}
async fn get_next(&mut self) -> Result<T, String> {
match self.index {
None => Err("index is None".into()),
Some(index) => {
let res = self.reader.get(index).await;
self.index = Some(index + 1);
res
}
}
}
}
impl<T> Stream for BufReaderStream<T>
where
T: bincode::Decode,
{
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
if self.index.is_none() && !self.reader.is_empty() {
self.index = Some(0);
}
if self.index.unwrap() == self.reader.len() {
return Poll::Ready(None);
}
// FIXME: mayby work only if reader.get().poll() return Ready immediately
let future = self.get_next();
pin_mut!(future);
match Pin::new(&mut future).poll(cx) {
Poll::Ready(Ok(item)) => Poll::Ready(Some(item)),
Poll::Ready(Err(_)) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
let len = self.reader.len();
if self.index.is_none() {
return (len, Some(len));
}
let index = self.index.unwrap();
let rem = if len > index + 1 {
len - (index + 1)
} else {
0
};
(rem, Some(rem))
}
}
#[cfg(test)]
mod test {
    use super::*;
    use core::fmt::Debug;
    use tempfile::tempdir;
    #[derive(bincode::Encode, bincode::Decode, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
    struct TestData {
        num: u64,
        test: String,
        vnum: Vec<u64>,
        vstr: Vec<String>,
    }
    fn gen_data(count: usize) -> impl Iterator<Item = TestData> {
        (0..count).map(|i| TestData {
            num: i as u64,
            test: "test".repeat(i),
            vnum: (0..i * 120).map(|x| (x ^ 0x345FE34) as u64).collect(),
            vstr: (0..i * 111).map(|x| "test".repeat(x)).collect(),
        })
    }
    async fn assert_data_eq((x, y): (&TestData, TestData)) {
        assert_eq!(*x, y);
    }
    /// Writer options for tests: 10 MiB buffers instead of the much larger defaults.
    fn test_opts() -> WriterOpts {
        WriterOpts {
            data_buf_size: 10 * 1024 * 1024,
            out_buf_size: 10 * 1024 * 1024,
            ..Default::default()
        }
    }
    /// Writes `items` to a new database at `path` via `Writer::load` + `Writer::finish`.
    async fn write_items(path: &Path, items: &[TestData]) {
        let mut writer: Writer<TestData> = Writer::new(path, test_opts()).await.expect("new writer");
        let src = futures::stream::iter(items.to_vec());
        pin_mut!(src);
        writer.load(src).await.expect("load");
        writer.finish().await.expect("finish write");
    }
    #[tokio::test]
    async fn test_write() {
        let dir = tempdir().expect("tempdir");
        let tmpfile = dir.path().join("test.tmp");
        let items: Vec<TestData> = gen_data(5).collect();
        write_items(&tmpfile, &items).await;
    }
    #[tokio::test]
    async fn test_write_read() {
        let dir = tempdir().expect("tempdir");
        let tmpfile = dir.path().join("test.tmp");
        let items: Vec<TestData> = gen_data(5).collect();
        write_items(&tmpfile, &items).await;
        let reader: Reader<TestData> = Reader::new(&tmpfile).await.expect("new reader");
        assert_eq!(items.len(), reader.len());
        for (idx, item) in items.iter().enumerate() {
            let ritem = reader.get(idx).await.expect("get");
            assert_eq!(*item, ritem);
        }
    }
    #[tokio::test]
    async fn test_write_sink_read() {
        let dir = tempdir().expect("tempdir");
        let tmpfile = dir.path().join("test.tmp");
        let items: Vec<TestData> = gen_data(5).collect();
        let mut writer: Writer<TestData> =
            Writer::new(&tmpfile, test_opts()).await.expect("new writer");
        let src = futures::stream::iter(items.clone()).map(Ok);
        pin_mut!(src);
        src.forward(writer.sink()).await.expect("forward");
        writer.finish().await.expect("finish write");
        let reader: Reader<TestData> = Reader::new(&tmpfile).await.expect("new reader");
        assert_eq!(items.len(), reader.len());
        for (idx, item) in items.iter().enumerate() {
            let ritem = reader.get(idx).await.expect("get");
            assert_eq!(*item, ritem);
        }
    }
    #[tokio::test]
    async fn test_write_read_get_with_buf() {
        let dir = tempdir().expect("tempdir");
        let tmpfile = dir.path().join("test.tmp");
        let items: Vec<TestData> = gen_data(5).collect();
        write_items(&tmpfile, &items).await;
        let reader: Reader<TestData> = Reader::new(&tmpfile).await.expect("new reader");
        assert_eq!(items.len(), reader.len());
        for (idx, item) in items.iter().enumerate() {
            let mut data_buf: Vec<u8> = vec![];
            let ritem = reader.get_with_buf(idx, &mut data_buf).await.expect("get");
            assert_eq!(*item, ritem);
        }
    }
    #[tokio::test]
    async fn test_write_read_stream() {
        let dir = tempdir().expect("tempdir");
        let tmpfile = dir.path().join("test.tmp");
        let items: Vec<TestData> = gen_data(5).collect();
        write_items(&tmpfile, &items).await;
        let reader: Reader<TestData> = Reader::new(&tmpfile).await.expect("new reader");
        assert_eq!(items.len(), reader.len());
        let dst_stream = reader.stream();
        let src_stream = futures::stream::iter(items.iter());
        let mut count = 0;
        src_stream
            .zip(dst_stream)
            .map(|x| {
                count += 1;
                x
            })
            .for_each(assert_data_eq)
            .await;
        assert_eq!(count, items.len())
    }
    /// sharing Reader instance between tasks
    #[tokio::test]
    async fn test_share_reader() {
        let dir = tempdir().expect("tempdir");
        let tmpfile = dir.path().join("test.tmp");
        let items: Vec<TestData> = gen_data(5).collect();
        write_items(&tmpfile, &items).await;
        let reader: Reader<TestData> = Reader::new(&tmpfile).await.expect("new reader");
        assert_eq!(items.len(), reader.len());
        let reader = Arc::new(reader);
        let mut handles = Vec::new();
        for _ in 0..=3 {
            let cur_items = items.clone();
            let cur_reader = Arc::clone(&reader);
            handles.push(tokio::spawn(async move {
                let dst_stream = cur_reader.stream();
                let src_stream = futures::stream::iter(cur_items.iter());
                let mut count = 0;
                src_stream
                    .zip(dst_stream)
                    .map(|x| {
                        count += 1;
                        x
                    })
                    .for_each(assert_data_eq)
                    .await;
                assert_eq!(count, cur_items.len());
            }));
        }
        // Await every task so a failed assertion (a panic inside a spawned
        // task) fails this test instead of being silently discarded.
        for handle in handles {
            handle.await.expect("reader task panicked");
        }
    }
    #[tokio::test]
    async fn test_write_bufread() {
        let dir = tempdir().expect("tempdir");
        let tmpfile = dir.path().join("test.tmp");
        let items: Vec<TestData> = gen_data(5).collect();
        write_items(&tmpfile, &items).await;
        let mut reader = BufReader::<TestData>::new(&tmpfile, 4096)
            .await
            .expect("new buf reader");
        assert_eq!(items.len(), reader.len());
        for (idx, item) in items.iter().enumerate() {
            let ritem = reader.get(idx).await.expect("get");
            assert_eq!(*item, ritem);
        }
    }
    #[tokio::test]
    async fn test_write_bufread_stream() {
        let dir = tempdir().expect("tempdir");
        let tmpfile = dir.path().join("test.tmp");
        let items: Vec<TestData> = gen_data(5).collect();
        write_items(&tmpfile, &items).await;
        let reader = BufReader::<TestData>::new(&tmpfile, 4096)
            .await
            .expect("new buf reader");
        assert_eq!(items.len(), reader.len());
        let dst_stream = reader.stream();
        let src_stream = futures::stream::iter(items.iter());
        let mut count = 0;
        src_stream
            .zip(dst_stream)
            .map(|x| {
                count += 1;
                x
            })
            .for_each(assert_data_eq)
            .await;
        assert_eq!(count, items.len())
    }
}