From 467ebfcc67d5f33c1d78a9a5f9d38f7e52c0a5ad Mon Sep 17 00:00:00 2001 From: Dmitry Date: Mon, 7 Aug 2023 22:04:29 +0300 Subject: [PATCH] rewrite async converter (poll_next removed // not work) --- lib/src/questions.rs | 47 +++++++++++++------------------------------- 1 file changed, 14 insertions(+), 33 deletions(-) diff --git a/lib/src/questions.rs b/lib/src/questions.rs index 7085ac8..409f018 100644 --- a/lib/src/questions.rs +++ b/lib/src/questions.rs @@ -143,11 +143,12 @@ pub use convert::QuestionsConverter; #[cfg(feature = "convert_async")] pub mod convert_async { - use async_stream::stream; + use futures::stream; use futures_core::stream::Stream; use futures_core::Future; use futures_util::{pin_mut, StreamExt}; + use std::future; use std::pin::Pin; use std::task::{Context, Poll}; @@ -220,39 +221,19 @@ pub mod convert_async { T: Stream)> + std::marker::Unpin, { - fn convert(&mut self) -> impl Stream + '_ { - stream! { - while let Some((filename, Ok(batch))) = self.inner.next().await { - let mut batch = batch; - batch.filename = filename; - let questions: Vec = batch.into(); - for question in questions { - yield question - } + pub fn convert(self) -> impl Stream { + self.inner.filter_map(|(name,r)| async move { + if r.is_err() { + None + } else { + Some((name, r.unwrap())) } - } - } - } - - impl Stream for QuestionsConverterAsync - where - T: Stream)> - + std::marker::Unpin, - { - type Item = Question; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let convert = self.convert(); - pin_mut!(convert); - match Pin::new(&mut convert.next()).poll(cx) { - Poll::Ready(Some(item)) => Poll::Ready(Some(item)), - Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => Poll::Pending, - } - } - - fn size_hint(&self) -> (usize, Option) { - self.inner.size_hint() + }).flat_map(|(filename, batch)| stream::iter({ + let mut batch = batch; + batch.filename = filename; + let questions: Vec = batch.into(); + questions + })) } } }