add test for writer.sink()

This commit is contained in:
Dmitry Belyaev 2023-08-15 10:45:08 +03:00
parent 0adc11cabf
commit 9196c61a12
Signed by: b4tman
GPG Key ID: 41A00BF15EA7E5F3

View File

@ -439,6 +439,34 @@ mod test {
} }
} }
#[tokio::test]
async fn test_write_sink_read() {
let dir = tempdir().expect("tempdir");
let tmpfile = dir.path().join("test.tmp");
let opts = WriterOpts {
data_buf_size: 10 * 1024 * 1024,
out_buf_size: 10 * 1024 * 1024,
..Default::default()
};
let mut writer: Writer<TestData> = Writer::new(&tmpfile, opts).await.expect("new writer");
let items_iter = gen_data(5);
let items: Vec<TestData> = items_iter.collect();
let src = futures::stream::iter(items.clone()).map(Ok);
pin_mut!(src);
src.forward(writer.sink()).await.expect("forward");
writer.finish().await.expect("finish write");
let reader: Reader<TestData> = Reader::new(&tmpfile, 2048).await.expect("new reader");
assert_eq!(items.len(), reader.len());
for (idx, item) in items.iter().enumerate() {
let ritem = reader.get(idx).await.expect("get");
assert_eq!(*item, ritem);
}
}
#[tokio::test] #[tokio::test]
async fn test_write_read_get_with_buf() { async fn test_write_read_get_with_buf() {
let dir = tempdir().expect("tempdir"); let dir = tempdir().expect("tempdir");
@ -482,9 +510,9 @@ mod test {
let items_iter = gen_data(5); let items_iter = gen_data(5);
let items: Vec<TestData> = items_iter.collect(); let items: Vec<TestData> = items_iter.collect();
let src = futures::stream::iter(items.clone()).map(Ok); let src = futures::stream::iter(items.clone());
pin_mut!(src); pin_mut!(src);
src.forward(writer.sink()).await.expect("forward"); writer.load(src).await.expect("load");
writer.finish().await.expect("finish write"); writer.finish().await.expect("finish write");
let reader: Reader<TestData> = Reader::new(&tmpfile, 2048).await.expect("new reader"); let reader: Reader<TestData> = Reader::new(&tmpfile, 2048).await.expect("new reader");
@ -519,9 +547,9 @@ mod test {
let items_iter = gen_data(5); let items_iter = gen_data(5);
let items: Vec<TestData> = items_iter.collect(); let items: Vec<TestData> = items_iter.collect();
let src = futures::stream::iter(items.clone()).map(Ok); let src = futures::stream::iter(items.clone());
pin_mut!(src); pin_mut!(src);
src.forward(writer.sink()).await.expect("forward"); writer.load(src).await.expect("load");
writer.finish().await.expect("finish write"); writer.finish().await.expect("finish write");
let reader: Reader<TestData> = Reader::new(&tmpfile, 2048).await.expect("new reader"); let reader: Reader<TestData> = Reader::new(&tmpfile, 2048).await.expect("new reader");