rewrite SourceQuestionsZipReaderAsync stream

(poll_next not work)
This commit is contained in:
Dmitry Belyaev 2023-08-07 22:02:41 +03:00
parent 851058245d
commit 80dda8d821
Signed by: b4tman
GPG Key ID: 41A00BF15EA7E5F3

View File

@ -218,10 +218,11 @@ pub use reader_sync::{ReadSourceQuestionsBatches, SourceQuestionsZipReader};
pub mod reader_async { pub mod reader_async {
use crate::util::ErrorToString; use crate::util::ErrorToString;
use async_stream::stream;
use async_zip::tokio::read::seek::ZipFileReader; use async_zip::tokio::read::seek::ZipFileReader;
use futures_core::stream::Stream; use futures_core::stream::Stream;
use futures_core::Future; use futures_core::Future;
use futures_util::{pin_mut, AsyncReadExt}; use futures_util::{pin_mut, AsyncReadExt, StreamExt};
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
@ -248,24 +249,49 @@ pub mod reader_async {
index: None, index: None,
} }
} }
async fn parse_zip_entry( pub async fn get_next(
&mut self, &mut self,
) -> Result<(String, Result<SourceQuestionsBatch, serde_json::Error>), String> ) -> Option<Result<(String, Result<SourceQuestionsBatch, serde_json::Error>), String>>
where where
R: AsyncRead + AsyncSeek + Unpin, R: AsyncRead + AsyncSeek + Unpin,
{ {
let mut reader = self let len = self.zipfile.file().entries().len();
if self.index.is_none() && len > 0 {
self.index = Some(0);
}
let index = &mut self.index;
if index.unwrap() == len {
return None;
}
let reader = self
.zipfile .zipfile
.reader_with_entry(self.index.unwrap()) .reader_with_entry(self.index.unwrap())
.await .await;
.str_err()?; if let Err(error) = reader {
let filename = reader.entry().filename().clone().into_string().str_err()?; println!("{:#?}", error); // DEBUG
return Some(Err(error.to_string()))
}
let mut reader = reader.unwrap();
let filename = reader.entry().filename().clone().into_string().unwrap();
let mut data: Vec<u8> = Vec::new(); let mut data: Vec<u8> = Vec::new();
reader.read_to_end(&mut data).await.str_err()?; let readed = reader.read_to_end(&mut data).await;
if let Err(error) = readed {
return Some(Err(error.to_string()))
}
let parsed: Result<SourceQuestionsBatch, _> = serde_json::from_slice(&data); let parsed: Result<SourceQuestionsBatch, _> = serde_json::from_slice(&data);
self.index = Some(self.index.unwrap() + 1); self.index = Some(self.index.unwrap() + 1);
Ok((filename, parsed)) Some(Ok((filename, parsed)))
}
pub fn stream(&mut self) -> impl Stream<Item = (String, Result<SourceQuestionsBatch, serde_json::Error>)> + '_ {
stream! {
while let Some(Ok(item)) = self.get_next().await {
yield item
}
}
} }
} }
@ -276,21 +302,11 @@ pub mod reader_async {
type Item = (String, Result<SourceQuestionsBatch, serde_json::Error>); type Item = (String, Result<SourceQuestionsBatch, serde_json::Error>);
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let len = self.zipfile.file().entries().len(); let future = self.stream();
if self.index.is_none() && len > 0 {
self.index = Some(0);
}
let index = &mut self.index;
if index.unwrap() == len {
return Poll::Ready(None);
}
let future = self.parse_zip_entry();
pin_mut!(future); pin_mut!(future);
match Pin::new(&mut future).poll(cx) { match Pin::new(&mut future.next()).poll(cx) {
Poll::Ready(Ok(item)) => Poll::Ready(Some(item)), Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
Poll::Ready(Err(_)) => Poll::Ready(None), Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending, Poll::Pending => Poll::Pending,
} }
} }