add async feature #1
@ -216,16 +216,10 @@ pub use reader_sync::{ReadSourceQuestionsBatches, SourceQuestionsZipReader};
|
|||||||
|
|
||||||
#[cfg(feature = "source_async")]
|
#[cfg(feature = "source_async")]
|
||||||
pub mod reader_async {
|
pub mod reader_async {
|
||||||
use crate::util::ErrorToString;
|
|
||||||
|
|
||||||
use async_stream::stream;
|
use async_stream::stream;
|
||||||
use async_zip::tokio::read::seek::ZipFileReader;
|
use async_zip::tokio::read::seek::ZipFileReader;
|
||||||
use futures_core::stream::Stream;
|
use futures_core::stream::Stream;
|
||||||
use futures_core::Future;
|
use futures_util::AsyncReadExt;
|
||||||
use futures_util::{pin_mut, AsyncReadExt, StreamExt};
|
|
||||||
|
|
||||||
use std::pin::Pin;
|
|
||||||
use std::task::{Context, Poll};
|
|
||||||
|
|
||||||
use tokio::io::{AsyncRead, AsyncSeek};
|
use tokio::io::{AsyncRead, AsyncSeek};
|
||||||
|
|
||||||
@ -249,29 +243,30 @@ pub mod reader_async {
|
|||||||
index: None,
|
index: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub async fn get_next(
|
|
||||||
|
pub fn len(&self) -> usize {
|
||||||
|
self.zipfile.file().entries().len()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_empty(&self) -> bool {
|
||||||
|
self.len() == 0
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get(
|
||||||
&mut self,
|
&mut self,
|
||||||
) -> Option<Result<(String, Result<SourceQuestionsBatch, serde_json::Error>), String>>
|
index: usize,
|
||||||
|
) -> Result<(String, Result<SourceQuestionsBatch, serde_json::Error>), String>
|
||||||
where
|
where
|
||||||
R: AsyncRead + AsyncSeek + Unpin,
|
R: AsyncRead + AsyncSeek + Unpin,
|
||||||
{
|
{
|
||||||
let len = self.zipfile.file().entries().len();
|
let len = self.len();
|
||||||
if self.index.is_none() && len > 0 {
|
if index >= len {
|
||||||
self.index = Some(0);
|
return Err(format!("get index={index}, when len={len}"));
|
||||||
}
|
}
|
||||||
|
|
||||||
let index = &mut self.index;
|
let reader = self.zipfile.reader_with_entry(index).await;
|
||||||
if index.unwrap() == len {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
|
|
||||||
let reader = self
|
|
||||||
.zipfile
|
|
||||||
.reader_with_entry(self.index.unwrap())
|
|
||||||
.await;
|
|
||||||
if let Err(error) = reader {
|
if let Err(error) = reader {
|
||||||
println!("{:#?}", error); // DEBUG
|
return Err(format!("reader_with_entry: {error:?}"));
|
||||||
return Some(Err(error.to_string()))
|
|
||||||
}
|
}
|
||||||
let mut reader = reader.unwrap();
|
let mut reader = reader.unwrap();
|
||||||
|
|
||||||
@ -279,14 +274,34 @@ pub mod reader_async {
|
|||||||
let mut data: Vec<u8> = Vec::new();
|
let mut data: Vec<u8> = Vec::new();
|
||||||
let readed = reader.read_to_end(&mut data).await;
|
let readed = reader.read_to_end(&mut data).await;
|
||||||
if let Err(error) = readed {
|
if let Err(error) = readed {
|
||||||
return Some(Err(error.to_string()))
|
return Err(format!("read_to_end: {error:?}"));
|
||||||
}
|
}
|
||||||
let parsed: Result<SourceQuestionsBatch, _> = serde_json::from_slice(&data);
|
let parsed: Result<SourceQuestionsBatch, _> = serde_json::from_slice(&data);
|
||||||
|
Ok((filename, parsed))
|
||||||
|
}
|
||||||
|
pub async fn get_next(
|
||||||
|
&mut self,
|
||||||
|
) -> Option<Result<(String, Result<SourceQuestionsBatch, serde_json::Error>), String>>
|
||||||
|
where
|
||||||
|
R: AsyncRead + AsyncSeek + Unpin,
|
||||||
|
{
|
||||||
|
if self.index.is_none() && !self.is_empty() {
|
||||||
|
self.index = Some(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.index.unwrap() >= self.len() {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
let item = self.get(self.index.unwrap()).await;
|
||||||
self.index = Some(self.index.unwrap() + 1);
|
self.index = Some(self.index.unwrap() + 1);
|
||||||
|
|
||||||
Some(Ok((filename, parsed)))
|
Some(item)
|
||||||
}
|
}
|
||||||
pub fn stream(&mut self) -> impl Stream<Item = (String, Result<SourceQuestionsBatch, serde_json::Error>)> + '_ {
|
pub fn stream(
|
||||||
|
&mut self,
|
||||||
|
) -> impl Stream<Item = (String, Result<SourceQuestionsBatch, serde_json::Error>)> + '_
|
||||||
|
{
|
||||||
stream! {
|
stream! {
|
||||||
while let Some(Ok(item)) = self.get_next().await {
|
while let Some(Ok(item)) = self.get_next().await {
|
||||||
yield item
|
yield item
|
||||||
@ -295,38 +310,6 @@ pub mod reader_async {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R> Stream for SourceQuestionsZipReaderAsync<R>
|
|
||||||
where
|
|
||||||
R: AsyncRead + AsyncSeek + Unpin,
|
|
||||||
{
|
|
||||||
type Item = (String, Result<SourceQuestionsBatch, serde_json::Error>);
|
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
|
||||||
let future = self.stream();
|
|
||||||
pin_mut!(future);
|
|
||||||
match Pin::new(&mut future.next()).poll(cx) {
|
|
||||||
Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
|
|
||||||
Poll::Ready(None) => Poll::Ready(None),
|
|
||||||
Poll::Pending => Poll::Pending,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
|
||||||
let len = self.zipfile.file().entries().len();
|
|
||||||
if self.index.is_none() {
|
|
||||||
return (len, Some(len));
|
|
||||||
}
|
|
||||||
|
|
||||||
let index = self.index.unwrap();
|
|
||||||
let rem = if len > index + 1 {
|
|
||||||
len - (index + 1)
|
|
||||||
} else {
|
|
||||||
0
|
|
||||||
};
|
|
||||||
(rem, Some(rem))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub trait ReadSourceQuestionsBatchesAsync<R>
|
pub trait ReadSourceQuestionsBatchesAsync<R>
|
||||||
where
|
where
|
||||||
R: AsyncRead + AsyncSeek + Unpin,
|
R: AsyncRead + AsyncSeek + Unpin,
|
||||||
|
Loading…
Reference in New Issue
Block a user