diff --git a/lib/src/async_db.rs b/lib/src/async_db.rs index e3d1f81..5c44eeb 100644 --- a/lib/src/async_db.rs +++ b/lib/src/async_db.rs @@ -402,4 +402,43 @@ mod test { src_stream.zip(dst_stream).for_each(test_values).await; } + /// sharing Reader instance between threads + #[tokio::test] + async fn test_share_reader() { + let dir = tempdir().expect("tempdir"); + let tmpfile = dir.path().join("test.tmp"); + let opts = WriterOpts { + compress_lvl: Level::Default, + data_buf_size: 10 * 1024 * 1024, + out_buf_size: 10 * 1024 * 1024, + }; + let mut writer: Writer = Writer::new(&tmpfile, opts).await.expect("new writer"); + + let items_iter = gen_data(5); + let items: Vec = items_iter.collect(); + + let src = stream_iter(items.clone().into_iter()); + pin_mut!(src); + writer.load(src).await.expect("load"); + writer.finish().await.expect("finish write"); + + let reader: Reader = Reader::new(&tmpfile, 2048).await.expect("new reader"); + assert_eq!(items.len(), reader.len()); + + async fn test_values((x, y): (&TestData, TestData)) { + assert_eq!(*x, y); + } + + let reader = Arc::new(reader); + for _ in 0..=3 { + let cur_items = items.clone(); + let cur_reader = Arc::clone(&reader); + tokio::spawn(async move { + let dst_stream = cur_reader.stream(); + let src_stream = stream_iter(cur_items.iter()); + + src_stream.zip(dst_stream).for_each(test_values).await; + }); + } + } }