rewrite async converter
(poll_next removed // not work)
This commit is contained in:
parent
80dda8d821
commit
467ebfcc67
@ -143,11 +143,12 @@ pub use convert::QuestionsConverter;
|
|||||||
|
|
||||||
#[cfg(feature = "convert_async")]
|
#[cfg(feature = "convert_async")]
|
||||||
pub mod convert_async {
|
pub mod convert_async {
|
||||||
use async_stream::stream;
|
use futures::stream;
|
||||||
use futures_core::stream::Stream;
|
use futures_core::stream::Stream;
|
||||||
use futures_core::Future;
|
use futures_core::Future;
|
||||||
use futures_util::{pin_mut, StreamExt};
|
use futures_util::{pin_mut, StreamExt};
|
||||||
|
|
||||||
|
use std::future;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
@ -220,39 +221,19 @@ pub mod convert_async {
|
|||||||
T: Stream<Item = (String, Result<SourceQuestionsBatch, serde_json::Error>)>
|
T: Stream<Item = (String, Result<SourceQuestionsBatch, serde_json::Error>)>
|
||||||
+ std::marker::Unpin,
|
+ std::marker::Unpin,
|
||||||
{
|
{
|
||||||
fn convert(&mut self) -> impl Stream<Item = Question> + '_ {
|
pub fn convert(self) -> impl Stream<Item = Question> {
|
||||||
stream! {
|
self.inner.filter_map(|(name,r)| async move {
|
||||||
while let Some((filename, Ok(batch))) = self.inner.next().await {
|
if r.is_err() {
|
||||||
let mut batch = batch;
|
None
|
||||||
batch.filename = filename;
|
} else {
|
||||||
let questions: Vec<Question> = batch.into();
|
Some((name, r.unwrap()))
|
||||||
for question in questions {
|
|
||||||
yield question
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}).flat_map(|(filename, batch)| stream::iter({
|
||||||
}
|
let mut batch = batch;
|
||||||
}
|
batch.filename = filename;
|
||||||
|
let questions: Vec<Question> = batch.into();
|
||||||
impl<T> Stream for QuestionsConverterAsync<T>
|
questions
|
||||||
where
|
}))
|
||||||
T: Stream<Item = (String, Result<SourceQuestionsBatch, serde_json::Error>)>
|
|
||||||
+ std::marker::Unpin,
|
|
||||||
{
|
|
||||||
type Item = Question;
|
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
|
||||||
let convert = self.convert();
|
|
||||||
pin_mut!(convert);
|
|
||||||
match Pin::new(&mut convert.next()).poll(cx) {
|
|
||||||
Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
|
|
||||||
Poll::Ready(None) => Poll::Ready(None),
|
|
||||||
Poll::Pending => Poll::Pending,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
|
||||||
self.inner.size_hint()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user