+ async source

(reader and converter)
This commit is contained in:
Dmitry Belyaev 2023-08-06 17:01:22 +03:00
parent f55b6f681c
commit ecfc34e821
6 changed files with 475 additions and 113 deletions

85
Cargo.lock generated
View File

@ -92,6 +92,20 @@ dependencies = [
"windows-sys 0.48.0", "windows-sys 0.48.0",
] ]
[[package]]
name = "async-compression"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62b74f44609f0f91493e3082d3734d98497e094777144380ea4db9f9905dd5b6"
dependencies = [
"futures-core",
"futures-io",
"memchr",
"pin-project-lite",
"zstd 0.12.4",
"zstd-safe 6.0.6",
]
[[package]] [[package]]
name = "async-compression" name = "async-compression"
version = "0.4.1" version = "0.4.1"
@ -138,6 +152,20 @@ dependencies = [
"syn 2.0.26", "syn 2.0.26",
] ]
[[package]]
name = "async_zip"
version = "0.0.15"
source = "git+https://github.com/Majored/rs-async-zip?rev=ff0d985#ff0d985ef54cf00d73c497dbca0beea7541e37dc"
dependencies = [
"async-compression 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"crc32fast",
"futures-util",
"pin-project",
"thiserror",
"tokio",
"tokio-util",
]
[[package]] [[package]]
name = "atty" name = "atty"
version = "0.2.14" version = "0.2.14"
@ -296,8 +324,9 @@ dependencies = [
name = "chgk_ledb_lib" name = "chgk_ledb_lib"
version = "1.1.0" version = "1.1.0"
dependencies = [ dependencies = [
"async-compression", "async-compression 0.4.1 (git+https://github.com/Nullus157/async-compression?rev=4fd4c42)",
"async-stream", "async-stream",
"async_zip",
"bincode", "bincode",
"fmmap", "fmmap",
"futures", "futures",
@ -1061,6 +1090,26 @@ dependencies = [
"sha2", "sha2",
] ]
[[package]]
name = "pin-project"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.26",
]
[[package]] [[package]]
name = "pin-project-lite" name = "pin-project-lite"
version = "0.2.10" version = "0.2.10"
@ -1434,6 +1483,26 @@ version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d"
[[package]]
name = "thiserror"
version = "1.0.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "611040a08a0439f8248d1990b111c95baa9c704c805fa1f62104b39655fd7f90"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "090198534930841fab3a5d1bb637cde49e339654e606195f8d9c76eeb081dc96"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.26",
]
[[package]] [[package]]
name = "time" name = "time"
version = "0.3.23" version = "0.3.23"
@ -1484,6 +1553,20 @@ dependencies = [
"syn 2.0.26", "syn 2.0.26",
] ]
[[package]]
name = "tokio-util"
version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d"
dependencies = [
"bytes",
"futures-core",
"futures-io",
"futures-sink",
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "typenum" name = "typenum"
version = "1.16.0" version = "1.16.0"

View File

@ -14,7 +14,7 @@ name = "db_bench"
harness = false harness = false
[dependencies] [dependencies]
chgk_ledb_lib = {path = "../lib", features = ["sync", "source"]} chgk_ledb_lib = {path = "../lib", features = ["sync", "source", "convert"]}
serde_json="1.0" serde_json="1.0"
zip="0.6" zip="0.6"
rand="0.8" rand="0.8"

View File

@ -10,10 +10,28 @@ description = "Библиотека для доступа к файлу базы
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features] [features]
default = ["async"] default = []
sync = ["zstd", "memmap"] sync = ["zstd", "memmap"]
async = ["futures", "futures-core", "futures-util", "fmmap", "tokio", "async-compression", "async-stream"] async = [
"futures",
"futures-core",
"futures-util",
"fmmap",
"tokio",
"async-compression",
"async-stream",
]
source = ["zip"] source = ["zip"]
source_async = [
"async_zip",
"tokio",
"futures",
"futures-core",
"futures-util",
"async-stream",
]
convert = []
convert_async = ["futures", "futures-core", "futures-util", "async-stream"]
[dependencies] [dependencies]
serde = "1.0" serde = "1.0"
@ -21,12 +39,25 @@ serde_derive="1.0"
serde_json = "1.0" serde_json = "1.0"
bincode = "^2.0.0-rc.2" bincode = "^2.0.0-rc.2"
zip = { version = "0.6", optional = true } zip = { version = "0.6", optional = true }
async_zip = { git = "https://github.com/Majored/rs-async-zip", rev = "ff0d985", features = [
"zstd",
"tokio",
"tokio-fs",
], optional = true }
fmmap = { version = "0.3", features = ["tokio-async"], optional = true } fmmap = { version = "0.3", features = ["tokio-async"], optional = true }
tokio = { version = "1", features = ["fs","io-util","rt", "macros"] , optional = true} tokio = { version = "1", features = [
"fs",
"io-util",
"rt",
"macros",
], optional = true }
futures-core = { version = "0.3", optional = true } futures-core = { version = "0.3", optional = true }
futures = { version = "0.3", optional = true } futures = { version = "0.3", optional = true }
futures-util = { version = "0.3", optional = true } futures-util = { version = "0.3", optional = true }
async-compression = {git="https://github.com/Nullus157/async-compression", rev="4fd4c42", default-features = false, features = ["zstdmt", "tokio"], optional = true} async-compression = { git = "https://github.com/Nullus157/async-compression", rev = "4fd4c42", default-features = false, features = [
"zstd",
"tokio",
], optional = true }
async-stream = { version = "0.3", optional = true } async-stream = { version = "0.3", optional = true }
zstd = { version = "^0.12", default-features = false, optional = true } zstd = { version = "^0.12", default-features = false, optional = true }
memmap = { version = "0.7.0", optional = true } memmap = { version = "0.7.0", optional = true }

View File

@ -1,7 +1,13 @@
#[cfg(all(feature = "convert", feature = "convert_async"))]
compile_error!(
"feature \"convert\" and feature \"convert_async\" cannot be enabled at the same time"
);
#[cfg(feature = "async")] #[cfg(feature = "async")]
pub mod async_db; pub mod async_db;
#[cfg(feature = "sync")] #[cfg(feature = "sync")]
pub mod db; pub mod db;
pub mod questions; pub mod questions;
#[cfg(feature = "source")] #[cfg(any(feature = "source", feature = "source_async"))]
pub mod source; pub mod source;
pub mod util;

View File

@ -69,7 +69,7 @@ pub struct Question {
pub batch_info: BatchInfo, pub batch_info: BatchInfo,
} }
#[cfg(feature = "source")] #[cfg(feature = "convert")]
pub mod convert { pub mod convert {
use super::{BatchInfo, Question}; use super::{BatchInfo, Question};
use crate::source::{SourceQuestion, SourceQuestionsBatch}; use crate::source::{SourceQuestion, SourceQuestionsBatch};
@ -138,6 +138,123 @@ pub mod convert {
} }
} }
} }
#[cfg(feature = "convert")]
#[cfg(feature = "source")]
pub use convert::QuestionsConverter; pub use convert::QuestionsConverter;
#[cfg(feature = "convert_async")]
pub mod convert_async {
use async_stream::stream;
use futures_core::stream::Stream;
use futures_core::Future;
use futures_util::{pin_mut, StreamExt};
use std::pin::Pin;
use std::task::{Context, Poll};
use super::{BatchInfo, Question};
use crate::source::{SourceQuestion, SourceQuestionsBatch};
macro_rules! make {
($Target:ident; by {$($field:ident),+}; from $src:expr) => {$Target {$(
$field: $src.$field
),+}};
($Target:ident; with defaults and by {$($field:ident),+}; from $src:expr) => {$Target {$(
$field: $src.$field
),+ ,..$Target::default()}}
}
impl From<SourceQuestion> for Question {
fn from(src: SourceQuestion) -> Self {
make! {Self; with defaults and by {
num, id, description, answer, author, comment, comment1, tour, url,
date, processed_by, redacted_by, copyright, theme, kind, source, rating
}; from src}
}
}
impl From<SourceQuestionsBatch> for BatchInfo {
fn from(src: SourceQuestionsBatch) -> Self {
make! {Self; by {
filename, description, author, comment, url, date,
processed_by, redacted_by, copyright, theme, kind, source, rating
}; from src}
}
}
impl From<SourceQuestionsBatch> for Vec<Question> {
fn from(src: SourceQuestionsBatch) -> Self {
let mut result: Vec<Question> = src
.questions
.iter()
.map(|item| item.clone().into())
.collect();
let batch_info = BatchInfo::from(src);
result.iter_mut().for_each(|question| {
question.batch_info = batch_info.clone();
});
result
}
}
pub struct QuestionsConverterAsync<T>
where
T: Stream<Item = (String, Result<SourceQuestionsBatch, serde_json::Error>)>
+ std::marker::Unpin,
{
inner: T,
}
impl<T> From<T> for QuestionsConverterAsync<T>
where
T: Stream<Item = (String, Result<SourceQuestionsBatch, serde_json::Error>)>
+ std::marker::Unpin,
{
fn from(inner: T) -> Self {
Self { inner }
}
}
impl<T> QuestionsConverterAsync<T>
where
T: Stream<Item = (String, Result<SourceQuestionsBatch, serde_json::Error>)>
+ std::marker::Unpin,
{
fn convert(&mut self) -> impl Stream<Item = Question> + '_ {
stream! {
while let Some((filename, Ok(batch))) = self.inner.next().await {
let mut batch = batch;
batch.filename = filename;
let questions: Vec<Question> = batch.into();
for question in questions {
yield question
}
}
}
}
}
impl<T> Stream for QuestionsConverterAsync<T>
where
T: Stream<Item = (String, Result<SourceQuestionsBatch, serde_json::Error>)>
+ std::marker::Unpin,
{
type Item = Question;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let convert = self.convert();
pin_mut!(convert);
match Pin::new(&mut convert.next()).poll(cx) {
Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}
}
#[cfg(feature = "convert_async")]
pub use convert_async::QuestionsConverterAsync;

View File

@ -1,6 +1,4 @@
use serde_derive::{Deserialize, Serialize}; use serde_derive::{Deserialize, Serialize};
use std::io::{Read, Seek};
use zip::ZipArchive;
#[derive(Debug, Default, Clone, Serialize, Deserialize)] #[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct SourceQuestion { pub struct SourceQuestion {
@ -105,6 +103,13 @@ pub struct SourceQuestionsBatch {
pub questions: Vec<SourceQuestion>, pub questions: Vec<SourceQuestion>,
} }
#[cfg(feature = "source")]
pub mod reader_sync {
use std::io::{Read, Seek};
use zip::ZipArchive;
use super::SourceQuestionsBatch;
pub struct SourceQuestionsZipReader<R> pub struct SourceQuestionsZipReader<R>
where where
R: Read + Seek, R: Read + Seek,
@ -204,3 +209,123 @@ where
SourceQuestionsZipReader::new(self) SourceQuestionsZipReader::new(self)
} }
} }
}
#[cfg(feature = "source")]
pub use reader_sync::{ReadSourceQuestionsBatches, SourceQuestionsZipReader};
#[cfg(feature = "source_async")]
pub mod reader_async {
use crate::util::ErrorToString;
use async_zip::tokio::read::seek::ZipFileReader;
use futures_core::stream::Stream;
use futures_core::Future;
use futures_util::{pin_mut, AsyncReadExt};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncSeek};
use super::SourceQuestionsBatch;
pub struct SourceQuestionsZipReaderAsync<R>
where
R: AsyncRead + AsyncSeek + Unpin,
{
zipfile: ZipFileReader<R>,
index: Option<usize>,
}
impl<R> SourceQuestionsZipReaderAsync<R>
where
R: AsyncRead + AsyncSeek + Unpin,
{
fn new(zipfile: ZipFileReader<R>) -> Self {
SourceQuestionsZipReaderAsync {
zipfile,
index: None,
}
}
async fn parse_zip_entry(
&mut self,
) -> Result<(String, Result<SourceQuestionsBatch, serde_json::Error>), String>
where
R: AsyncRead + AsyncSeek + Unpin,
{
let mut reader = self
.zipfile
.reader_with_entry(self.index.unwrap())
.await
.str_err()?;
let filename = reader.entry().filename().clone().into_string().str_err()?;
let mut data: Vec<u8> = Vec::new();
reader.read_to_end(&mut data).await.str_err()?;
let parsed: Result<SourceQuestionsBatch, _> = serde_json::from_slice(&data);
self.index = Some(self.index.unwrap() + 1);
Ok((filename, parsed))
}
}
impl<R> Stream for SourceQuestionsZipReaderAsync<R>
where
R: AsyncRead + AsyncSeek + Unpin,
{
type Item = (String, Result<SourceQuestionsBatch, serde_json::Error>);
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let len = self.zipfile.file().entries().len();
if self.index.is_none() && len > 0 {
self.index = Some(0);
}
let index = &mut self.index;
if index.unwrap() == len {
return Poll::Ready(None);
}
let future = self.parse_zip_entry();
pin_mut!(future);
match Pin::new(&mut future).poll(cx) {
Poll::Ready(Ok(item)) => Poll::Ready(Some(item)),
Poll::Ready(Err(_)) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
let len = self.zipfile.file().entries().len();
if self.index.is_none() {
return (len, Some(len));
}
let index = self.index.unwrap();
let rem = if len > index + 1 {
len - (index + 1)
} else {
0
};
(rem, Some(rem))
}
}
pub trait ReadSourceQuestionsBatchesAsync<R>
where
R: AsyncRead + AsyncSeek + Unpin,
{
fn source_questions(self) -> SourceQuestionsZipReaderAsync<R>;
}
impl<R> ReadSourceQuestionsBatchesAsync<R> for ZipFileReader<R>
where
R: AsyncRead + AsyncSeek + Unpin,
{
fn source_questions(self) -> SourceQuestionsZipReaderAsync<R> {
SourceQuestionsZipReaderAsync::new(self)
}
}
}
#[cfg(feature = "source_async")]
pub use reader_async::{ReadSourceQuestionsBatchesAsync, SourceQuestionsZipReaderAsync};