From 7cbf5439b7e51c876774ba0e7a463f4c4e0c8f92 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Tue, 8 Aug 2023 14:20:40 +0300 Subject: [PATCH] rewrite SourceQuestionsZipReaderAsync "impl Stream for SourceQuestionsZipReaderAsync" dropped --- lib/src/source.rs | 99 ++++++++++++++++++++--------------------------- 1 file changed, 41 insertions(+), 58 deletions(-) diff --git a/lib/src/source.rs b/lib/src/source.rs index 9217b15..81d02dc 100644 --- a/lib/src/source.rs +++ b/lib/src/source.rs @@ -216,16 +216,10 @@ pub use reader_sync::{ReadSourceQuestionsBatches, SourceQuestionsZipReader}; #[cfg(feature = "source_async")] pub mod reader_async { - use crate::util::ErrorToString; - use async_stream::stream; use async_zip::tokio::read::seek::ZipFileReader; use futures_core::stream::Stream; - use futures_core::Future; - use futures_util::{pin_mut, AsyncReadExt, StreamExt}; - - use std::pin::Pin; - use std::task::{Context, Poll}; + use futures_util::AsyncReadExt; use tokio::io::{AsyncRead, AsyncSeek}; @@ -249,29 +243,30 @@ pub mod reader_async { index: None, } } - pub async fn get_next( + + pub fn len(&self) -> usize { + self.zipfile.file().entries().len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub async fn get( &mut self, - ) -> Option), String>> + index: usize, + ) -> Result<(String, Result), String> where R: AsyncRead + AsyncSeek + Unpin, { - let len = self.zipfile.file().entries().len(); - if self.index.is_none() && len > 0 { - self.index = Some(0); + let len = self.len(); + if index >= len { + return Err(format!("get index={index}, when len={len}")); } - let index = &mut self.index; - if index.unwrap() == len { - return None; - } - - let reader = self - .zipfile - .reader_with_entry(self.index.unwrap()) - .await; + let reader = self.zipfile.reader_with_entry(index).await; if let Err(error) = reader { - println!("{:#?}", error); // DEBUG - return Some(Err(error.to_string())) + return Err(format!("reader_with_entry: {error:?}")); } let mut reader = reader.unwrap(); @@ -279,14 +274,34 @@ pub mod reader_async { let mut data: Vec = Vec::new(); let readed = reader.read_to_end(&mut data).await; if let Err(error) = readed { - return Some(Err(error.to_string())) + return Err(format!("read_to_end: {error:?}")); } let parsed: Result = serde_json::from_slice(&data); + Ok((filename, parsed)) + } + pub async fn get_next( + &mut self, + ) -> Option), String>> + where + R: AsyncRead + AsyncSeek + Unpin, + { + if self.index.is_none() && !self.is_empty() { + self.index = Some(0); + } + + if self.index.unwrap() >= self.len() { + return None; + } + + let item = self.get(self.index.unwrap()).await; self.index = Some(self.index.unwrap() + 1); - Some(Ok((filename, parsed))) + Some(item) } - pub fn stream(&mut self) -> impl Stream)> + '_ { + pub fn stream( + &mut self, + ) -> impl Stream)> + '_ + { stream! { while let Some(Ok(item)) = self.get_next().await { yield item @@ -295,38 +310,6 @@ pub mod reader_async { } } - impl Stream for SourceQuestionsZipReaderAsync - where - R: AsyncRead + AsyncSeek + Unpin, - { - type Item = (String, Result); - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let future = self.stream(); - pin_mut!(future); - match Pin::new(&mut future.next()).poll(cx) { - Poll::Ready(Some(item)) => Poll::Ready(Some(item)), - Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => Poll::Pending, - } - } - - fn size_hint(&self) -> (usize, Option) { - let len = self.zipfile.file().entries().len(); - if self.index.is_none() { - return (len, Some(len)); - } - - let index = self.index.unwrap(); - let rem = if len > index + 1 { - len - (index + 1) - } else { - 0 - }; - (rem, Some(rem)) - } - } - pub trait ReadSourceQuestionsBatchesAsync where R: AsyncRead + AsyncSeek + Unpin,