diff --git a/lib/src/source.rs b/lib/src/source.rs index a62b251..9217b15 100644 --- a/lib/src/source.rs +++ b/lib/src/source.rs @@ -218,10 +218,11 @@ pub use reader_sync::{ReadSourceQuestionsBatches, SourceQuestionsZipReader}; pub mod reader_async { use crate::util::ErrorToString; + use async_stream::stream; use async_zip::tokio::read::seek::ZipFileReader; use futures_core::stream::Stream; use futures_core::Future; - use futures_util::{pin_mut, AsyncReadExt}; + use futures_util::{pin_mut, AsyncReadExt, StreamExt}; use std::pin::Pin; use std::task::{Context, Poll}; @@ -248,24 +249,49 @@ pub mod reader_async { index: None, } } - async fn parse_zip_entry( + pub async fn get_next( &mut self, - ) -> Result<(String, Result), String> + ) -> Option), String>> where R: AsyncRead + AsyncSeek + Unpin, { - let mut reader = self + let len = self.zipfile.file().entries().len(); + if self.index.is_none() && len > 0 { + self.index = Some(0); + } + + let index = &mut self.index; + if index.unwrap() == len { + return None; + } + + let reader = self .zipfile .reader_with_entry(self.index.unwrap()) - .await - .str_err()?; - let filename = reader.entry().filename().clone().into_string().str_err()?; + .await; + if let Err(error) = reader { + println!("{:#?}", error); // DEBUG + return Some(Err(error.to_string())) + } + let mut reader = reader.unwrap(); + + let filename = reader.entry().filename().clone().into_string().unwrap(); let mut data: Vec = Vec::new(); - reader.read_to_end(&mut data).await.str_err()?; + let readed = reader.read_to_end(&mut data).await; + if let Err(error) = readed { + return Some(Err(error.to_string())) + } let parsed: Result = serde_json::from_slice(&data); self.index = Some(self.index.unwrap() + 1); - Ok((filename, parsed)) + Some(Ok((filename, parsed))) + } + pub fn stream(&mut self) -> impl Stream)> + '_ { + stream! { + while let Some(Ok(item)) = self.get_next().await { + yield item + } + } } } @@ -276,21 +302,11 @@ pub mod reader_async { type Item = (String, Result); fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let len = self.zipfile.file().entries().len(); - if self.index.is_none() && len > 0 { - self.index = Some(0); - } - - let index = &mut self.index; - if index.unwrap() == len { - return Poll::Ready(None); - } - - let future = self.parse_zip_entry(); + let future = self.stream(); pin_mut!(future); - match Pin::new(&mut future).poll(cx) { - Poll::Ready(Ok(item)) => Poll::Ready(Some(item)), - Poll::Ready(Err(_)) => Poll::Ready(None), + match Pin::new(&mut future.next()).poll(cx) { + Poll::Ready(Some(item)) => Poll::Ready(Some(item)), + Poll::Ready(None) => Poll::Ready(None), Poll::Pending => Poll::Pending, } }