diff --git a/lib/src/async_db.rs b/lib/src/async_db.rs index 45ec1a7..224224b 100644 --- a/lib/src/async_db.rs +++ b/lib/src/async_db.rs @@ -715,4 +715,73 @@ mod test { }); } } + + #[tokio::test] + async fn test_write_bufread() { + let dir = tempdir().expect("tempdir"); + let tmpfile = dir.path().join("test.tmp"); + let opts = WriterOpts { + data_buf_size: 10 * 1024 * 1024, + out_buf_size: 10 * 1024 * 1024, + ..Default::default() + }; + let mut writer: Writer = Writer::new(&tmpfile, opts).await.expect("new writer"); + + let items_iter = gen_data(5); + let items: Vec = items_iter.collect(); + + let src = futures::stream::iter(items.clone()); + pin_mut!(src); + writer.load(src).await.expect("load"); + writer.finish().await.expect("finish write"); + + let mut reader = BufReader::::new(&tmpfile, 4096) + .await + .expect("new buf reader"); + assert_eq!(items.len(), reader.len()); + + for (idx, item) in items.iter().enumerate() { + let ritem = reader.get(idx).await.expect("get"); + assert_eq!(*item, ritem); + } + } + + #[tokio::test] + async fn test_write_bufread_stream() { + let dir = tempdir().expect("tempdir"); + let tmpfile = dir.path().join("test.tmp"); + let opts = WriterOpts { + data_buf_size: 10 * 1024 * 1024, + out_buf_size: 10 * 1024 * 1024, + ..Default::default() + }; + let mut writer: Writer = Writer::new(&tmpfile, opts).await.expect("new writer"); + + let items_iter = gen_data(5); + let items: Vec = items_iter.collect(); + + let src = futures::stream::iter(items.clone()); + pin_mut!(src); + writer.load(src).await.expect("load"); + writer.finish().await.expect("finish write"); + + let reader = BufReader::::new(&tmpfile, 4096) + .await + .expect("new buf reader"); + assert_eq!(items.len(), reader.len()); + + let dst_stream = reader.stream(); + let src_stream = futures::stream::iter(items.iter()); + + let mut count = 0; + src_stream + .zip(dst_stream) + .map(|x| { + count += 1; + x + }) + .for_each(assert_data_eq) + .await; + assert_eq!(count, items.len()) + } }