Compare commits
No commits in common. "a496f37ef2ce6a4061351c81d2f561a24931a259" and "0c0a4e31baa7090d2f96e5d913a552f2870ff17b" have entirely different histories.
a496f37ef2
...
0c0a4e31ba
@ -1,5 +1,4 @@
|
|||||||
kind: pipeline
|
kind: pipeline
|
||||||
type: docker
|
|
||||||
name: default
|
name: default
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
@ -7,8 +6,8 @@ steps:
|
|||||||
image: rust:1-alpine
|
image: rust:1-alpine
|
||||||
commands:
|
commands:
|
||||||
- apk add --no-cache musl-dev
|
- apk add --no-cache musl-dev
|
||||||
- cargo build --verbose --all-features --all
|
- cargo build --verbose --all
|
||||||
- cargo test --verbose --all-features --all
|
- cargo test --verbose --all
|
||||||
environment:
|
environment:
|
||||||
CARGO_REGISTRIES_CRATES_IO_PROTOCOL: sparse
|
CARGO_REGISTRIES_CRATES_IO_PROTOCOL: sparse
|
||||||
|
|
||||||
|
@ -1,22 +1,21 @@
|
|||||||
use std::{marker::PhantomData, path::Path, sync::Arc};
|
use std::{marker::PhantomData, path::Path, sync::Arc};
|
||||||
|
|
||||||
|
use async_stream::stream;
|
||||||
|
use tokio::pin;
|
||||||
|
|
||||||
use async_compression::tokio::bufread::ZstdDecoder;
|
use async_compression::tokio::bufread::ZstdDecoder;
|
||||||
use async_compression::tokio::write::ZstdEncoder;
|
use async_compression::tokio::write::ZstdEncoder;
|
||||||
use async_compression::Level;
|
use async_compression::Level;
|
||||||
use futures::stream::StreamExt;
|
use futures::stream::{self, StreamExt};
|
||||||
use futures_core::stream::Stream;
|
use futures_core::stream::Stream;
|
||||||
use futures_core::Future;
|
|
||||||
use futures_util::pin_mut;
|
use futures_util::pin_mut;
|
||||||
|
|
||||||
use std::pin::Pin;
|
|
||||||
use std::task::{Context, Poll};
|
|
||||||
|
|
||||||
use tokio::{
|
use tokio::{
|
||||||
fs,
|
fs,
|
||||||
io::{self, AsyncReadExt, AsyncWriteExt},
|
io::{self, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
|
||||||
};
|
};
|
||||||
|
|
||||||
use fmmap::tokio::{AsyncMmapFile, AsyncMmapFileExt, AsyncOptions};
|
use fmmap::tokio::{AsyncMmapFile, AsyncOptions};
|
||||||
|
|
||||||
type LSize = u32;
|
type LSize = u32;
|
||||||
const LEN_SIZE: usize = std::mem::size_of::<LSize>();
|
const LEN_SIZE: usize = std::mem::size_of::<LSize>();
|
||||||
@ -138,165 +137,9 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Reader<T>
|
|
||||||
where
|
|
||||||
T: bincode::Decode,
|
|
||||||
{
|
|
||||||
mmap: AsyncMmapFile,
|
|
||||||
count: usize,
|
|
||||||
first_pos: LSize,
|
|
||||||
_t: Option<Arc<T>>, // PhantomData replacement
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> Reader<T>
|
|
||||||
where
|
|
||||||
T: bincode::Decode,
|
|
||||||
{
|
|
||||||
pub async fn new<P: AsRef<Path>>(path: P, _buf_size: usize) -> Result<Self, String> {
|
|
||||||
let mmap = AsyncOptions::new()
|
|
||||||
.read(true)
|
|
||||||
.open_mmap_file(path)
|
|
||||||
.await
|
|
||||||
.str_err()?;
|
|
||||||
mmap.try_lock_shared().str_err()?;
|
|
||||||
|
|
||||||
// read first pos and records count
|
|
||||||
let first_data: [u8; LEN_SIZE] = mmap.bytes(0, LEN_SIZE).str_err()?.try_into().str_err()?;
|
|
||||||
let first_pos = LSize::from_le_bytes(first_data);
|
|
||||||
let tab_len = (first_pos as usize) / LEN_SIZE;
|
|
||||||
let count = tab_len - 1;
|
|
||||||
|
|
||||||
Ok(Self {
|
|
||||||
mmap,
|
|
||||||
count,
|
|
||||||
first_pos,
|
|
||||||
_t: None,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn len(&self) -> usize {
|
|
||||||
self.count
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn is_empty(&self) -> bool {
|
|
||||||
0 == self.len()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn get(&self, index: usize) -> Result<T, String> {
|
|
||||||
if index >= self.len() {
|
|
||||||
return Err("index out of range".into());
|
|
||||||
}
|
|
||||||
|
|
||||||
let next_pos: usize = (index + 1) * LEN_SIZE;
|
|
||||||
|
|
||||||
// read item data pos
|
|
||||||
let data_pos = if 0 == index {
|
|
||||||
self.first_pos
|
|
||||||
} else {
|
|
||||||
let tab_pos: usize = index * LEN_SIZE;
|
|
||||||
let pos_curr_data: [u8; LEN_SIZE] = self
|
|
||||||
.mmap
|
|
||||||
.bytes(tab_pos, LEN_SIZE)
|
|
||||||
.str_err()?
|
|
||||||
.try_into()
|
|
||||||
.str_err()?;
|
|
||||||
LSize::from_le_bytes(pos_curr_data)
|
|
||||||
} as usize;
|
|
||||||
|
|
||||||
// read next item pos
|
|
||||||
let pos_next_data: [u8; LEN_SIZE] = self
|
|
||||||
.mmap
|
|
||||||
.bytes(next_pos, LEN_SIZE)
|
|
||||||
.str_err()?
|
|
||||||
.try_into()
|
|
||||||
.str_err()?;
|
|
||||||
let data_pos_next = LSize::from_le_bytes(pos_next_data) as usize;
|
|
||||||
let data_len = data_pos_next - data_pos;
|
|
||||||
|
|
||||||
// read & unpack item data
|
|
||||||
let mut decoder = ZstdDecoder::new(self.mmap.range_reader(data_pos, data_len).str_err()?);
|
|
||||||
let mut data = Vec::<u8>::new();
|
|
||||||
decoder.read_to_end(&mut data).await.str_err()?;
|
|
||||||
|
|
||||||
// decode item
|
|
||||||
let item: (T, usize) = bincode::decode_from_slice(&data, BINCODE_CFG).str_err()?;
|
|
||||||
|
|
||||||
Ok(item.0)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn stream(&self) -> ReaderStream<'_, T> {
|
|
||||||
ReaderStream::new(self)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct ReaderStream<'a, T>
|
|
||||||
where
|
|
||||||
T: bincode::Decode,
|
|
||||||
{
|
|
||||||
reader: &'a Reader<T>,
|
|
||||||
index: Option<usize>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a, T> ReaderStream<'a, T>
|
|
||||||
where
|
|
||||||
T: bincode::Decode,
|
|
||||||
{
|
|
||||||
fn new(reader: &'a Reader<T>) -> Self {
|
|
||||||
ReaderStream {
|
|
||||||
reader,
|
|
||||||
index: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a, T> Stream for ReaderStream<'a, T>
|
|
||||||
where
|
|
||||||
T: bincode::Decode,
|
|
||||||
{
|
|
||||||
type Item = T;
|
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
|
|
||||||
if self.index.is_none() && !self.reader.is_empty() {
|
|
||||||
self.index = Some(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
if self.index.unwrap() == self.reader.len() {
|
|
||||||
return Poll::Ready(None);
|
|
||||||
}
|
|
||||||
|
|
||||||
let future = self.reader.get(self.index.unwrap());
|
|
||||||
pin_mut!(future);
|
|
||||||
match Pin::new(&mut future).poll(cx) {
|
|
||||||
Poll::Ready(Ok(item)) => {
|
|
||||||
self.index = Some(self.index.unwrap() + 1);
|
|
||||||
Poll::Ready(Some(item))
|
|
||||||
}
|
|
||||||
Poll::Ready(Err(_)) => Poll::Ready(None),
|
|
||||||
Poll::Pending => Poll::Pending,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
|
||||||
let len = self.reader.len();
|
|
||||||
if self.index.is_none() {
|
|
||||||
return (len, Some(len));
|
|
||||||
}
|
|
||||||
|
|
||||||
let index = self.index.unwrap();
|
|
||||||
let rem = if len > index + 1 {
|
|
||||||
len - (index + 1)
|
|
||||||
} else {
|
|
||||||
0
|
|
||||||
};
|
|
||||||
(rem, Some(rem))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
use async_stream::stream;
|
|
||||||
use core::fmt::Debug;
|
|
||||||
use tempfile::tempdir;
|
use tempfile::tempdir;
|
||||||
|
|
||||||
#[derive(bincode::Encode, bincode::Decode, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
|
#[derive(bincode::Encode, bincode::Decode, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
|
||||||
@ -342,103 +185,4 @@ mod test {
|
|||||||
writer.load(src).await.expect("load");
|
writer.load(src).await.expect("load");
|
||||||
writer.finish().await.expect("finish write");
|
writer.finish().await.expect("finish write");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_write_read() {
|
|
||||||
let dir = tempdir().expect("tempdir");
|
|
||||||
let tmpfile = dir.path().join("test.tmp");
|
|
||||||
let opts = WriterOpts {
|
|
||||||
compress_lvl: Level::Default,
|
|
||||||
data_buf_size: 10 * 1024 * 1024,
|
|
||||||
out_buf_size: 10 * 1024 * 1024,
|
|
||||||
};
|
|
||||||
let mut writer: Writer<TestData> = Writer::new(&tmpfile, opts).await.expect("new writer");
|
|
||||||
|
|
||||||
let items_iter = gen_data(5);
|
|
||||||
let items: Vec<TestData> = items_iter.collect();
|
|
||||||
|
|
||||||
let src = stream_iter(items.clone().into_iter());
|
|
||||||
pin_mut!(src);
|
|
||||||
writer.load(src).await.expect("load");
|
|
||||||
writer.finish().await.expect("finish write");
|
|
||||||
|
|
||||||
let reader: Reader<TestData> = Reader::new(&tmpfile, 2048).await.expect("new reader");
|
|
||||||
assert_eq!(items.len(), reader.len());
|
|
||||||
|
|
||||||
for (idx, item) in items.iter().enumerate() {
|
|
||||||
let ritem = reader.get(idx).await.expect("get");
|
|
||||||
assert_eq!(*item, ritem);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_write_read_stream() {
|
|
||||||
let dir = tempdir().expect("tempdir");
|
|
||||||
let tmpfile = dir.path().join("test.tmp");
|
|
||||||
let opts = WriterOpts {
|
|
||||||
compress_lvl: Level::Default,
|
|
||||||
data_buf_size: 10 * 1024 * 1024,
|
|
||||||
out_buf_size: 10 * 1024 * 1024,
|
|
||||||
};
|
|
||||||
let mut writer: Writer<TestData> = Writer::new(&tmpfile, opts).await.expect("new writer");
|
|
||||||
|
|
||||||
let items_iter = gen_data(5);
|
|
||||||
let items: Vec<TestData> = items_iter.collect();
|
|
||||||
|
|
||||||
let src = stream_iter(items.clone().into_iter());
|
|
||||||
pin_mut!(src);
|
|
||||||
writer.load(src).await.expect("load");
|
|
||||||
writer.finish().await.expect("finish write");
|
|
||||||
|
|
||||||
let reader: Reader<TestData> = Reader::new(&tmpfile, 2048).await.expect("new reader");
|
|
||||||
assert_eq!(items.len(), reader.len());
|
|
||||||
|
|
||||||
let dst_stream = reader.stream();
|
|
||||||
let src_stream = stream_iter(items.iter());
|
|
||||||
|
|
||||||
async fn test_values((x, y): (&TestData, TestData)) {
|
|
||||||
assert_eq!(*x, y);
|
|
||||||
}
|
|
||||||
|
|
||||||
src_stream.zip(dst_stream).for_each(test_values).await;
|
|
||||||
}
|
|
||||||
/// sharing Reader instance between threads
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_share_reader() {
|
|
||||||
let dir = tempdir().expect("tempdir");
|
|
||||||
let tmpfile = dir.path().join("test.tmp");
|
|
||||||
let opts = WriterOpts {
|
|
||||||
compress_lvl: Level::Default,
|
|
||||||
data_buf_size: 10 * 1024 * 1024,
|
|
||||||
out_buf_size: 10 * 1024 * 1024,
|
|
||||||
};
|
|
||||||
let mut writer: Writer<TestData> = Writer::new(&tmpfile, opts).await.expect("new writer");
|
|
||||||
|
|
||||||
let items_iter = gen_data(5);
|
|
||||||
let items: Vec<TestData> = items_iter.collect();
|
|
||||||
|
|
||||||
let src = stream_iter(items.clone().into_iter());
|
|
||||||
pin_mut!(src);
|
|
||||||
writer.load(src).await.expect("load");
|
|
||||||
writer.finish().await.expect("finish write");
|
|
||||||
|
|
||||||
let reader: Reader<TestData> = Reader::new(&tmpfile, 2048).await.expect("new reader");
|
|
||||||
assert_eq!(items.len(), reader.len());
|
|
||||||
|
|
||||||
async fn test_values((x, y): (&TestData, TestData)) {
|
|
||||||
assert_eq!(*x, y);
|
|
||||||
}
|
|
||||||
|
|
||||||
let reader = Arc::new(reader);
|
|
||||||
for _ in 0..=3 {
|
|
||||||
let cur_items = items.clone();
|
|
||||||
let cur_reader = Arc::clone(&reader);
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let dst_stream = cur_reader.stream();
|
|
||||||
let src_stream = stream_iter(cur_items.iter());
|
|
||||||
|
|
||||||
src_stream.zip(dst_stream).for_each(test_values).await;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,14 @@
|
|||||||
use serde_derive::{Deserialize, Serialize};
|
use serde_derive::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
macro_rules! make {
|
||||||
|
($Target:ident; by {$($field:ident),+}; from $src:expr) => {$Target {$(
|
||||||
|
$field: $src.$field
|
||||||
|
),+}};
|
||||||
|
($Target:ident; with defaults and by {$($field:ident),+}; from $src:expr) => {$Target {$(
|
||||||
|
$field: $src.$field
|
||||||
|
),+ ,..$Target::default()}}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Default, Clone, Serialize, Deserialize, bincode::Decode, bincode::Encode)]
|
#[derive(Debug, Default, Clone, Serialize, Deserialize, bincode::Decode, bincode::Encode)]
|
||||||
pub struct BatchInfo {
|
pub struct BatchInfo {
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
@ -71,17 +80,9 @@ pub struct Question {
|
|||||||
|
|
||||||
#[cfg(feature = "source")]
|
#[cfg(feature = "source")]
|
||||||
pub mod convert {
|
pub mod convert {
|
||||||
use super::{BatchInfo, Question};
|
|
||||||
use crate::source::{SourceQuestion, SourceQuestionsBatch};
|
use crate::source::{SourceQuestion, SourceQuestionsBatch};
|
||||||
|
|
||||||
macro_rules! make {
|
use super::{BatchInfo, Question};
|
||||||
($Target:ident; by {$($field:ident),+}; from $src:expr) => {$Target {$(
|
|
||||||
$field: $src.$field
|
|
||||||
),+}};
|
|
||||||
($Target:ident; with defaults and by {$($field:ident),+}; from $src:expr) => {$Target {$(
|
|
||||||
$field: $src.$field
|
|
||||||
),+ ,..$Target::default()}}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<SourceQuestion> for Question {
|
impl From<SourceQuestion> for Question {
|
||||||
fn from(src: SourceQuestion) -> Self {
|
fn from(src: SourceQuestion) -> Self {
|
||||||
|
Loading…
Reference in New Issue
Block a user