add async test_share_reader

This commit is contained in:
Dmitry Belyaev 2023-08-06 01:05:36 +03:00
parent 39ce0d8ceb
commit cad8ff0404
1 changed files with 39 additions and 0 deletions

View File

@ -402,4 +402,43 @@ mod test {
src_stream.zip(dst_stream).for_each(test_values).await;
}
/// sharing Reader instance between threads
#[tokio::test]
async fn test_share_reader() {
let dir = tempdir().expect("tempdir");
let tmpfile = dir.path().join("test.tmp");
let opts = WriterOpts {
compress_lvl: Level::Default,
data_buf_size: 10 * 1024 * 1024,
out_buf_size: 10 * 1024 * 1024,
};
let mut writer: Writer<TestData> = Writer::new(&tmpfile, opts).await.expect("new writer");
let items_iter = gen_data(5);
let items: Vec<TestData> = items_iter.collect();
let src = stream_iter(items.clone().into_iter());
pin_mut!(src);
writer.load(src).await.expect("load");
writer.finish().await.expect("finish write");
let reader: Reader<TestData> = Reader::new(&tmpfile, 2048).await.expect("new reader");
assert_eq!(items.len(), reader.len());
async fn test_values((x, y): (&TestData, TestData)) {
assert_eq!(*x, y);
}
let reader = Arc::new(reader);
for _ in 0..=3 {
let cur_items = items.clone();
let cur_reader = Arc::clone(&reader);
tokio::spawn(async move {
let dst_stream = cur_reader.stream();
let src_stream = stream_iter(cur_items.iter());
src_stream.zip(dst_stream).for_each(test_values).await;
});
}
}
}