add async feature #1
@ -715,4 +715,73 @@ mod test {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_write_bufread() {
|
||||||
|
let dir = tempdir().expect("tempdir");
|
||||||
|
let tmpfile = dir.path().join("test.tmp");
|
||||||
|
let opts = WriterOpts {
|
||||||
|
data_buf_size: 10 * 1024 * 1024,
|
||||||
|
out_buf_size: 10 * 1024 * 1024,
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
let mut writer: Writer<TestData> = Writer::new(&tmpfile, opts).await.expect("new writer");
|
||||||
|
|
||||||
|
let items_iter = gen_data(5);
|
||||||
|
let items: Vec<TestData> = items_iter.collect();
|
||||||
|
|
||||||
|
let src = futures::stream::iter(items.clone());
|
||||||
|
pin_mut!(src);
|
||||||
|
writer.load(src).await.expect("load");
|
||||||
|
writer.finish().await.expect("finish write");
|
||||||
|
|
||||||
|
let mut reader = BufReader::<TestData>::new(&tmpfile, 4096)
|
||||||
|
.await
|
||||||
|
.expect("new buf reader");
|
||||||
|
assert_eq!(items.len(), reader.len());
|
||||||
|
|
||||||
|
for (idx, item) in items.iter().enumerate() {
|
||||||
|
let ritem = reader.get(idx).await.expect("get");
|
||||||
|
assert_eq!(*item, ritem);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_write_bufread_stream() {
|
||||||
|
let dir = tempdir().expect("tempdir");
|
||||||
|
let tmpfile = dir.path().join("test.tmp");
|
||||||
|
let opts = WriterOpts {
|
||||||
|
data_buf_size: 10 * 1024 * 1024,
|
||||||
|
out_buf_size: 10 * 1024 * 1024,
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
let mut writer: Writer<TestData> = Writer::new(&tmpfile, opts).await.expect("new writer");
|
||||||
|
|
||||||
|
let items_iter = gen_data(5);
|
||||||
|
let items: Vec<TestData> = items_iter.collect();
|
||||||
|
|
||||||
|
let src = futures::stream::iter(items.clone());
|
||||||
|
pin_mut!(src);
|
||||||
|
writer.load(src).await.expect("load");
|
||||||
|
writer.finish().await.expect("finish write");
|
||||||
|
|
||||||
|
let reader = BufReader::<TestData>::new(&tmpfile, 4096)
|
||||||
|
.await
|
||||||
|
.expect("new buf reader");
|
||||||
|
assert_eq!(items.len(), reader.len());
|
||||||
|
|
||||||
|
let dst_stream = reader.stream();
|
||||||
|
let src_stream = futures::stream::iter(items.iter());
|
||||||
|
|
||||||
|
let mut count = 0;
|
||||||
|
src_stream
|
||||||
|
.zip(dst_stream)
|
||||||
|
.map(|x| {
|
||||||
|
count += 1;
|
||||||
|
x
|
||||||
|
})
|
||||||
|
.for_each(assert_data_eq)
|
||||||
|
.await;
|
||||||
|
assert_eq!(count, items.len())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user