questions: add QuestionsConverterAsyncForStream
This commit is contained in:
		@@ -16,7 +16,8 @@ use tokio::{fs, io};
 | 
				
			|||||||
use tokio_stream::wrappers::UnboundedReceiverStream;
 | 
					use tokio_stream::wrappers::UnboundedReceiverStream;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use chgk_ledb_lib::async_db;
 | 
					use chgk_ledb_lib::async_db;
 | 
				
			||||||
use chgk_ledb_lib::questions::{Question, QuestionsConverterAsync};
 | 
					use chgk_ledb_lib::questions::Question;
 | 
				
			||||||
 | 
					use chgk_ledb_lib::questions::QuestionsConverterAsyncForStream;
 | 
				
			||||||
use chgk_ledb_lib::source::ReadSourceQuestionsBatchesAsync;
 | 
					use chgk_ledb_lib::source::ReadSourceQuestionsBatchesAsync;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
const ZIP_FILENAME: &str = "json.zip";
 | 
					const ZIP_FILENAME: &str = "json.zip";
 | 
				
			||||||
@@ -56,7 +57,7 @@ async fn zip_reader_task(tx: UnboundedSender<Question>) {
 | 
				
			|||||||
    let source_questions = source_questions.stream();
 | 
					    let source_questions = source_questions.stream();
 | 
				
			||||||
    pin_mut!(source_questions);
 | 
					    pin_mut!(source_questions);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    let converter = QuestionsConverterAsync::from(source_questions).convert();
 | 
					    let converter = source_questions.converter().convert();
 | 
				
			||||||
    pin_mut!(converter);
 | 
					    pin_mut!(converter);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    let mut num: u32 = 0;
 | 
					    let mut num: u32 = 0;
 | 
				
			||||||
@@ -97,7 +98,7 @@ async fn read_from_zip(file_num: usize, mut num: usize) -> Option<Question> {
 | 
				
			|||||||
    let src = source.get(file_index).await;
 | 
					    let src = source.get(file_index).await;
 | 
				
			||||||
    let src = stream::once(async { src.expect("get source file") });
 | 
					    let src = stream::once(async { src.expect("get source file") });
 | 
				
			||||||
    pin_mut!(src);
 | 
					    pin_mut!(src);
 | 
				
			||||||
    let converter = QuestionsConverterAsync::from(src);
 | 
					    let converter = src.converter();
 | 
				
			||||||
    let questions: Vec<_> = converter.convert().collect().await;
 | 
					    let questions: Vec<_> = converter.convert().collect().await;
 | 
				
			||||||
    if num == 0 {
 | 
					    if num == 0 {
 | 
				
			||||||
        num = (1..=questions.len()).choose(&mut rng).unwrap();
 | 
					        num = (1..=questions.len()).choose(&mut rng).unwrap();
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -224,6 +224,24 @@ pub mod convert_async {
 | 
				
			|||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    pub trait QuestionsConverterAsyncForStream<T>
 | 
				
			||||||
 | 
					    where
 | 
				
			||||||
 | 
					        T: Stream<Item = (String, Result<SourceQuestionsBatch, serde_json::Error>)>
 | 
				
			||||||
 | 
					            + std::marker::Unpin,
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        fn converter(&mut self) -> QuestionsConverterAsync<&mut T>;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    impl<T> QuestionsConverterAsyncForStream<T> for T
 | 
				
			||||||
 | 
					    where
 | 
				
			||||||
 | 
					        T: Stream<Item = (String, Result<SourceQuestionsBatch, serde_json::Error>)>
 | 
				
			||||||
 | 
					            + std::marker::Unpin,
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        fn converter(&mut self) -> QuestionsConverterAsync<&mut T> {
 | 
				
			||||||
 | 
					            QuestionsConverterAsync::from(self)
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    impl<T> QuestionsConverterAsync<T>
 | 
					    impl<T> QuestionsConverterAsync<T>
 | 
				
			||||||
    where
 | 
					    where
 | 
				
			||||||
        T: Stream<Item = (String, Result<SourceQuestionsBatch, serde_json::Error>)>
 | 
					        T: Stream<Item = (String, Result<SourceQuestionsBatch, serde_json::Error>)>
 | 
				
			||||||
@@ -267,7 +285,7 @@ pub mod convert_async {
 | 
				
			|||||||
            });
 | 
					            });
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            pin_mut!(source);
 | 
					            pin_mut!(source);
 | 
				
			||||||
            let converter = QuestionsConverterAsync::from(source);
 | 
					            let converter = source.converter();
 | 
				
			||||||
            let converter = converter.convert();
 | 
					            let converter = converter.convert();
 | 
				
			||||||
            let converted: Vec<_> = converter.collect().await;
 | 
					            let converted: Vec<_> = converter.collect().await;
 | 
				
			||||||
            assert_yaml_snapshot!(converted, @r#"
 | 
					            assert_yaml_snapshot!(converted, @r#"
 | 
				
			||||||
@@ -292,7 +310,7 @@ pub mod convert_async {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
#[cfg(feature = "convert_async")]
 | 
					#[cfg(feature = "convert_async")]
 | 
				
			||||||
pub use convert_async::QuestionsConverterAsync;
 | 
					pub use convert_async::{QuestionsConverterAsync, QuestionsConverterAsyncForStream};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[cfg(test)]
 | 
					#[cfg(test)]
 | 
				
			||||||
mod test {
 | 
					mod test {
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user