async_db: add WriterSink
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Dmitry Belyaev 2023-08-13 18:16:49 +03:00
parent 3e3572ce12
commit 31c02ae8ef
2 changed files with 83 additions and 12 deletions

View File

@ -177,16 +177,12 @@ async fn db_writer_task(rx: UnboundedReceiver<Question>) {
let writer_opts = WriterOpts::default();
let mut writer: async_db::Writer<Question> =
async_db::Writer::new(NEW_DB_FILENAME, writer_opts)
.await
.expect("new db writer");
let mut stream: UnboundedReceiverStream<_> = rx.into();
writer
.load(&mut stream)
.await
.unwrap_or_else(|e| panic!("db writer load, {e:#?}"));
let stream: UnboundedReceiverStream<_> = rx.into();
let stream = stream.map(Ok);
stream.forward(writer.sink()).await.expect("forward");
writer.finish().await.expect("db writer finish");
println!("write done");

View File

@ -4,6 +4,7 @@ use std::{path::Path, sync::Arc};
use async_compression::tokio::bufread::ZstdDecoder;
use async_compression::tokio::bufread::ZstdEncoder;
use async_compression::Level;
use futures::sink::Sink;
use futures::stream::StreamExt;
use futures_core::stream::Stream;
use futures_core::Future;
@ -124,6 +125,80 @@ where
self.out.flush().await.str_err()?;
Ok(())
}
pub fn sink(&mut self) -> WriterSink<'_, T> {
WriterSink {
writer: self,
item: None,
}
}
}
use pin_project::pin_project;
#[pin_project]
pub struct WriterSink<'a, T>
where
T: bincode::Encode,
{
#[pin]
writer: &'a mut Writer<T>,
item: Option<T>,
}
impl<'a, T> Sink<T> for WriterSink<'a, T>
where
T: bincode::Encode,
{
type Error = String;
fn poll_ready(
self: std::pin::Pin<&mut Self>,
ctx: &mut std::task::Context<'_>,
) -> Poll<Result<(), String>> {
let mut this = self.project();
if this.item.is_none() {
return Poll::Ready(Ok(()));
}
let item = this.item.take().unwrap();
let fut = this.writer.push(item);
pin_mut!(fut);
match fut.poll(ctx) {
Poll::Ready(Ok(_)) => {
*this.item = None;
Poll::Ready(Ok(()))
}
Poll::Ready(Err(e)) => {
*this.item = None;
Poll::Ready(Err(e))
}
Poll::Pending => Poll::Pending,
}
}
fn start_send(self: std::pin::Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
let this = self.project();
*this.item = Some(item);
Ok(())
}
fn poll_flush(
self: std::pin::Pin<&mut Self>,
ctx: &mut std::task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
self.poll_ready(ctx)
}
fn poll_close(
mut self: std::pin::Pin<&mut Self>,
ctx: &mut std::task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
futures::ready!(self.as_mut().poll_ready(ctx))?;
Poll::Ready(Ok(()))
}
}
pub struct Reader<T>
@ -369,9 +444,9 @@ mod test {
let items_iter = gen_data(5);
let items: Vec<TestData> = items_iter.collect();
let src = futures::stream::iter(items.clone());
let src = futures::stream::iter(items.clone()).map(Ok);
pin_mut!(src);
writer.load(src).await.expect("load");
src.forward(writer.sink()).await.expect("forward");
writer.finish().await.expect("finish write");
let reader: Reader<TestData> = Reader::new(&tmpfile, 2048).await.expect("new reader");
@ -406,9 +481,9 @@ mod test {
let items_iter = gen_data(5);
let items: Vec<TestData> = items_iter.collect();
let src = futures::stream::iter(items.clone());
let src = futures::stream::iter(items.clone()).map(Ok);
pin_mut!(src);
writer.load(src).await.expect("load");
src.forward(writer.sink()).await.expect("forward");
writer.finish().await.expect("finish write");
let reader: Reader<TestData> = Reader::new(&tmpfile, 2048).await.expect("new reader");