diff --git a/.drone.yml b/.drone.yml index 26c1ef8..0d06021 100644 --- a/.drone.yml +++ b/.drone.yml @@ -1,4 +1,5 @@ kind: pipeline +type: docker name: default steps: @@ -6,8 +7,8 @@ steps: image: rust:1-alpine commands: - apk add --no-cache musl-dev - - cargo build --verbose --all - - cargo test --verbose --all + - cargo build --verbose --all-features --all + - cargo test --verbose --all-features --all environment: CARGO_REGISTRIES_CRATES_IO_PROTOCOL: sparse @@ -17,6 +18,7 @@ trigger: --- kind: pipeline +type: docker name: publish steps: diff --git a/.gitignore b/.gitignore index ebe5bca..19af3b6 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ json.zip /.vscode test*.bin db.dat +*.pending-snap \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index a8d3d1c..f7d2cf2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,15 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "addr2line" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4fa78e18c64fce05e902adecd7a5eed15a5e0a3439f7b0e169f0252214865e3" +dependencies = [ + "gimli", +] + [[package]] name = "adler" version = "1.0.2" @@ -10,14 +19,13 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" [[package]] name = "aes" -version = "0.7.5" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e8b47f52ea9bae42228d07ec09eb676433d7c4ed1ebdf0f1d1c29ed446f1ab8" +checksum = "ac1f845298e95f983ff1944b728ae08b8cebab80d684f0a832ed0fc74dfa27e2" dependencies = [ "cfg-if", "cipher", "cpufeatures", - "opaque-debug", ] [[package]] @@ -71,7 +79,7 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ca11d4be1bab0c8bc8734a9aa7bf4ee8316d462a08c6ac5052f888fef5b494b" dependencies = [ - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -81,7 +89,81 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "180abfa45703aebe0093f79badacc01b8fd4ea2e35118747e5811127f926e188" dependencies = [ "anstyle", - "windows-sys", + "windows-sys 0.48.0", +] + +[[package]] +name = "async-compression" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62b74f44609f0f91493e3082d3734d98497e094777144380ea4db9f9905dd5b6" +dependencies = [ + "futures-core", + "futures-io", + "memchr", + "pin-project-lite", + "zstd 0.12.4", + "zstd-safe 6.0.6", +] + +[[package]] +name = "async-compression" +version = "0.4.1" +source = "git+https://github.com/Nullus157/async-compression?rev=4fd4c42#4fd4c428ded304c019c0ebcf40149af87b038ab8" +dependencies = [ + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "zstd 0.12.4", + "zstd-safe 6.0.6", +] + +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.26", +] + +[[package]] +name = "async-trait" +version = "0.1.72" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc6dde6e4ed435a4c1ee4e73592f5ba9da2151af10076cc04858746af9352d09" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.26", +] + +[[package]] +name = "async_zip" +version = "0.0.15" +source = "git+https://github.com/Majored/rs-async-zip?rev=ff0d985#ff0d985ef54cf00d73c497dbca0beea7541e37dc" +dependencies = [ + "async-compression 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", + "crc32fast", + "futures-util", + "pin-project", + "thiserror", + "tokio", + "tokio-util", ] [[package]] @@ -101,6 +183,21 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "backtrace" +version = "0.3.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4319208da049c43661739c5fade2ba182f09d1dc2299b32298d3a31692b17e12" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + [[package]] name = "base64ct" version = "1.0.1" @@ -159,6 +256,12 @@ version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" +[[package]] +name = "bytes" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" + [[package]] name = "bzip2" version = "0.4.4" @@ -218,17 +321,45 @@ dependencies = [ ] [[package]] -name = "chgk_ledb_lib" +name = "chgk_ledb_async" version = "1.1.0" dependencies = [ + "async_zip", "bincode", - "memmap", + "chgk_ledb_lib", + "clap 4.3.16", + "futures", + "rand", "serde", "serde_derive", "serde_json", "tempfile", + "tokio", + "tokio-stream", +] + +[[package]] +name = "chgk_ledb_lib" +version = "1.1.0" +dependencies = [ + "async-compression 0.4.1 (git+https://github.com/Nullus157/async-compression?rev=4fd4c42)", + "async-stream", + "async_zip", + "bincode", + "fmmap", + "futures", + "futures-core", + "futures-util", + "insta", + "memmap", + "pin-project", + "serde", + "serde_derive", + "serde_json", + "tempfile", + "tokio", "zip", - "zstd", + "zstd 0.12.4", ] [[package]] @@ -260,11 +391,12 @@ dependencies = [ [[package]] name = "cipher" -version = "0.3.0" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ee52072ec15386f770805afd189a01c8841be8696bed250fa2f13c4c0d6dfb7" +checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" dependencies = [ - "generic-array", + "crypto-common", + "inout", ] [[package]] @@ -311,7 +443,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn", + "syn 2.0.26", ] [[package]] @@ -335,6 +467,18 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "console" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c926e00cc70edefdc64d3a5ff31cc65bb97a3460097762bd23afb4d8145fccf8" +dependencies = [ + "encode_unicode", + "lazy_static", + "libc", + "windows-sys 0.45.0", +] + [[package]] name = "constant_time_eq" version = "0.1.5" @@ -465,6 +609,35 @@ version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91" +[[package]] +name = "encode_unicode" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" + +[[package]] +name = "enum_dispatch" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f33313078bb8d4d05a2733a94ac4c2d8a0df9a2b84424ebf4f33bfc224a890e" +dependencies = [ + "once_cell", + "proc-macro2", + "quote", + "syn 2.0.26", +] + +[[package]] +name = "errno" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f639046355ee4f37944e44f60642c6f3a7efa3cf6b78c78a0d989a8ce6c396a1" +dependencies = [ + "errno-dragonfly", + "libc", + "winapi", +] + [[package]] name = "errno" version = "0.3.1" @@ -473,7 +646,7 @@ checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a" dependencies = [ "errno-dragonfly", "libc", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -505,6 +678,124 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "fmmap" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5de47572bd3bf7eb669581e4303b922dde85432800a3228f6824f6a04ee455ab" +dependencies = [ + "async-trait", + "byteorder", + "bytes", + "enum_dispatch", + "fs4", + "memmapix", + "parse-display", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "fs4" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2eeb4ed9e12f43b7fa0baae3f9cdda28352770132ef2e09a23760c29cae8bd47" +dependencies = [ + "async-trait", + "rustix 0.38.4", + "tokio", + "windows-sys 0.48.0", +] + +[[package]] +name = "futures" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" + +[[package]] +name = "futures-executor" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" + +[[package]] +name = "futures-macro" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.26", +] + +[[package]] +name = "futures-sink" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" + +[[package]] +name = "futures-task" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" + +[[package]] +name = "futures-util" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -526,6 +817,12 @@ dependencies = [ "wasi", ] +[[package]] +name = "gimli" +version = "0.27.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e" + [[package]] name = "half" version = "1.8.2" @@ -578,6 +875,29 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "inout" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5" +dependencies = [ + "generic-array", +] + +[[package]] +name = "insta" +version = "1.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0770b0a3d4c70567f0d58331f3088b0e4c4f56c9b8d764efe654b4a5d46de3a" +dependencies = [ + "console", + "lazy_static", + "linked-hash-map", + "serde", + "similar", + "yaml-rust", +] + [[package]] name = "instant" version = "0.1.12" @@ -587,6 +907,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "io-lifetimes" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ce5ef949d49ee85593fc4d3f3f95ad61657076395cbbce23e2121fc5542074" + [[package]] name = "io-lifetimes" version = "1.0.11" @@ -595,7 +921,7 @@ checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" dependencies = [ "hermit-abi 0.3.2", "libc", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -606,7 +932,7 @@ checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" dependencies = [ "hermit-abi 0.3.2", "rustix 0.38.4", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -654,6 +980,18 @@ version = "0.2.147" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" +[[package]] +name = "linked-hash-map" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" + +[[package]] +name = "linux-raw-sys" +version = "0.0.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4d2456c373231a208ad294c33dc5bff30051eafd954cd4caae83a712b12854d" + [[package]] name = "linux-raw-sys" version = "0.3.8" @@ -688,6 +1026,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "memmapix" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa20aa603283688ba48d9ee8c225b900aebf0d0a871630ed5239e7bf606e5fd1" +dependencies = [ + "rustix 0.35.14", +] + [[package]] name = "memoffset" version = "0.9.0" @@ -725,6 +1072,15 @@ dependencies = [ "libc", ] +[[package]] +name = "object" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bda667d9f2b5051b8833f59f3bf748b28ef54f850f4fcb389a252aa383866d1" +dependencies = [ + "memchr", +] + [[package]] name = "once_cell" version = "1.18.0" @@ -737,12 +1093,6 @@ version = "11.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" -[[package]] -name = "opaque-debug" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" - [[package]] name = "os_str_bytes" version = "6.5.1" @@ -750,10 +1100,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d5d9eb14b174ee9aa2ef96dc2b94637a2d4b6e7cb873c7e171f0c20c6cf3eac" [[package]] -name = "password-hash" -version = "0.3.2" +name = "parse-display" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d791538a6dcc1e7cb7fe6f6b58aca40e7f79403c45b2bc274008b5e647af1d8" +checksum = "3b25af4ef94a8528b41fb49a696e361dc6ef975c782417268072d987ac327964" +dependencies = [ + "once_cell", + "parse-display-derive", + "regex", +] + +[[package]] +name = "parse-display-derive" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73f106cced1f4b645e3fca6125105cdf7407e35d1af710f290aac530f6b826b9" +dependencies = [ + "once_cell", + "proc-macro2", + "quote", + "regex", + "regex-syntax 0.6.29", + "structmeta", + "syn 1.0.109", +] + +[[package]] +name = "password-hash" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7676374caaee8a325c9e7a2ae557f216c5563a171d6997b0ef8a65af35147700" dependencies = [ "base64ct", "rand_core", @@ -762,9 +1138,9 @@ dependencies = [ [[package]] name = "pbkdf2" -version = "0.10.1" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "271779f35b581956db91a3e55737327a03aa051e90b1c47aeb189508533adfd7" +checksum = "83a0692ec44e4cf1ef28ca317f14f8f07da2d95ec3fa01f86e4467b725e60917" dependencies = [ "digest", "hmac", @@ -772,6 +1148,38 @@ dependencies = [ "sha2", ] +[[package]] +name = "pin-project" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.26", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c40d25201921e5ff0c862a505c6557ea88568a4e3ace775ab55e93f2f4f9d57" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "pkg-config" version = "0.3.27" @@ -900,7 +1308,7 @@ dependencies = [ "aho-corasick", "memchr", "regex-automata", - "regex-syntax", + "regex-syntax 0.7.4", ] [[package]] @@ -911,15 +1319,41 @@ checksum = "39354c10dd07468c2e73926b23bb9c2caca74c5501e38a35da70406f1d923310" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.7.4", ] +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ea92a5b6195c6ef2a0295ea818b312502c6fc94dde986c5553242e18fd4ce2" +[[package]] +name = "rustc-demangle" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" + +[[package]] +name = "rustix" +version = "0.35.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6380889b07a03b5ecf1d44dc9ede6fd2145d84b502a2a9ca0b03c48e0cc3220f" +dependencies = [ + "bitflags 1.3.2", + "errno 0.2.8", + "io-lifetimes 0.7.5", + "libc", + "linux-raw-sys 0.0.46", + "windows-sys 0.42.0", +] + [[package]] name = "rustix" version = "0.37.23" @@ -927,11 +1361,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d69718bf81c6127a49dc64e44a742e8bb9213c0ff8869a22c308f84c1d4ab06" dependencies = [ "bitflags 1.3.2", - "errno", - "io-lifetimes", + "errno 0.3.1", + "io-lifetimes 1.0.11", "libc", "linux-raw-sys 0.3.8", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -941,10 +1375,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0a962918ea88d644592894bc6dc55acc6c0956488adcebbfb6e273506b7fd6e5" dependencies = [ "bitflags 2.3.3", - "errno", + "errno 0.3.1", "libc", "linux-raw-sys 0.4.3", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -985,7 +1419,7 @@ checksum = "389894603bd18c46fa56231694f8d827779c0951a667087194cf9de94ed24682" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.26", ] [[package]] @@ -1021,18 +1455,67 @@ dependencies = [ "digest", ] +[[package]] +name = "similar" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "420acb44afdae038210c99e69aae24109f32f15500aa708e81d46c9f29d55fcf" + +[[package]] +name = "slab" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6528351c9bc8ab22353f9d776db39a20288e8d6c37ef8cfe3317cf875eecfc2d" +dependencies = [ + "autocfg", +] + [[package]] name = "strsim" version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "structmeta" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "104842d6278bf64aa9d2f182ba4bde31e8aec7a131d29b7f444bb9b344a09e2a" +dependencies = [ + "proc-macro2", + "quote", + "structmeta-derive", + "syn 1.0.109", +] + +[[package]] +name = "structmeta-derive" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24420be405b590e2d746d83b01f09af673270cf80e9b003a5fa7b651c58c7d93" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "subtle" version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "syn" version = "2.0.26" @@ -1055,7 +1538,7 @@ dependencies = [ "fastrand", "redox_syscall", "rustix 0.37.23", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -1064,16 +1547,34 @@ version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" +[[package]] +name = "thiserror" +version = "1.0.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "611040a08a0439f8248d1990b111c95baa9c704c805fa1f62104b39655fd7f90" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "090198534930841fab3a5d1bb637cde49e339654e606195f8d9c76eeb081dc96" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.26", +] + [[package]] name = "time" version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59e399c068f43a5d116fedaf73b203fa4f9c519f17e2b34f63221d3792f81446" dependencies = [ - "itoa", "serde", "time-core", - "time-macros", ] [[package]] @@ -1082,15 +1583,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb" -[[package]] -name = "time-macros" -version = "0.2.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96ba15a897f3c86766b757e5ac7221554c6750054d74d5b28844fce5fb36a6c4" -dependencies = [ - "time-core", -] - [[package]] name = "tinytemplate" version = "1.2.1" @@ -1101,6 +1593,56 @@ dependencies = [ "serde_json", ] +[[package]] +name = "tokio" +version = "1.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "532826ff75199d5833b9d2c5fe410f29235e25704ee5f0ef599fb51c21f4a4da" +dependencies = [ + "autocfg", + "backtrace", + "bytes", + "num_cpus", + "pin-project-lite", + "tokio-macros", +] + +[[package]] +name = "tokio-macros" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.26", +] + +[[package]] +name = "tokio-stream" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-util" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" +dependencies = [ + "bytes", + "futures-core", + "futures-io", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "typenum" version = "1.16.0" @@ -1168,7 +1710,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn", + "syn 2.0.26", "wasm-bindgen-shared", ] @@ -1190,7 +1732,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.26", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -1242,13 +1784,52 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-sys" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7" +dependencies = [ + "windows_aarch64_gnullvm 0.42.2", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", + "windows_x86_64_gnullvm 0.42.2", + "windows_x86_64_msvc 0.42.2", +] + +[[package]] +name = "windows-sys" +version = "0.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" +dependencies = [ + "windows-targets 0.42.2", +] + [[package]] name = "windows-sys" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" dependencies = [ - "windows-targets", + "windows-targets 0.48.1", +] + +[[package]] +name = "windows-targets" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" +dependencies = [ + "windows_aarch64_gnullvm 0.42.2", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", + "windows_x86_64_gnullvm 0.42.2", + "windows_x86_64_msvc 0.42.2", ] [[package]] @@ -1257,51 +1838,93 @@ version = "0.48.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05d4b17490f70499f20b9e791dcf6a299785ce8af4d709018206dc5b4953e95f" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.48.0", + "windows_aarch64_msvc 0.48.0", + "windows_i686_gnu 0.48.0", + "windows_i686_msvc 0.48.0", + "windows_x86_64_gnu 0.48.0", + "windows_x86_64_gnullvm 0.48.0", + "windows_x86_64_msvc 0.48.0", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc" +[[package]] +name = "windows_aarch64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" + [[package]] name = "windows_aarch64_msvc" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3" +[[package]] +name = "windows_i686_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" + [[package]] name = "windows_i686_gnu" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241" +[[package]] +name = "windows_i686_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" + [[package]] name = "windows_i686_msvc" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00" +[[package]] +name = "windows_x86_64_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" + [[package]] name = "windows_x86_64_gnu" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953" +[[package]] +name = "windows_x86_64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" + [[package]] name = "windows_x86_64_msvc" version = "0.48.0" @@ -1309,10 +1932,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" [[package]] -name = "zip" -version = "0.6.2" +name = "yaml-rust" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf225bcf73bb52cbb496e70475c7bd7a3f769df699c0020f6c7bd9a96dcf0b8d" +checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85" +dependencies = [ + "linked-hash-map", +] + +[[package]] +name = "zip" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "760394e246e4c28189f19d488c058bf16f564016aefac5d32bb1f3b51d5e9261" dependencies = [ "aes", "byteorder", @@ -1325,23 +1957,42 @@ dependencies = [ "pbkdf2", "sha1", "time", - "zstd", + "zstd 0.11.2+zstd.1.5.2", ] [[package]] name = "zstd" -version = "0.10.2+zstd.1.5.2" +version = "0.11.2+zstd.1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f4a6bd64f22b5e3e94b4e238669ff9f10815c27a5180108b849d24174a83847" +checksum = "20cc960326ece64f010d2d2107537f26dc589a6573a316bd5b1dba685fa5fde4" dependencies = [ - "zstd-safe", + "zstd-safe 5.0.2+zstd.1.5.2", +] + +[[package]] +name = "zstd" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a27595e173641171fc74a1232b7b1c7a7cb6e18222c11e9dfb9888fa424c53c" +dependencies = [ + "zstd-safe 6.0.6", ] [[package]] name = "zstd-safe" -version = "4.1.6+zstd.1.5.2" +version = "5.0.2+zstd.1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94b61c51bb270702d6167b8ce67340d2754b088d0c091b06e593aa772c3ee9bb" +checksum = "1d2a5585e04f9eea4b2a3d1eca508c4dee9592a89ef6f450c11719da0726f4db" +dependencies = [ + "libc", + "zstd-sys", +] + +[[package]] +name = "zstd-safe" +version = "6.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee98ffd0b48ee95e6c5168188e44a54550b1564d9d530ee21d5f0eaed1069581" dependencies = [ "libc", "zstd-sys", @@ -1349,10 +2000,11 @@ dependencies = [ [[package]] name = "zstd-sys" -version = "1.6.3+zstd.1.5.2" +version = "2.0.8+zstd.1.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc49afa5c8d634e75761feda8c592051e7eeb4683ba827211eb0d731d3402ea8" +checksum = "5556e6ee25d32df2586c098bbfa278803692a20d0ab9565e049480d52707ec8c" dependencies = [ "cc", "libc", + "pkg-config", ] diff --git a/Cargo.toml b/Cargo.toml index 71609d1..c1f38e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,8 @@ [workspace] +resolver = "2" members = [ "app", + "app_async", "lib" ] diff --git a/app/Cargo.toml b/app/Cargo.toml index a351ade..ef32ab3 100644 --- a/app/Cargo.toml +++ b/app/Cargo.toml @@ -14,7 +14,7 @@ name = "db_bench" harness = false [dependencies] -chgk_ledb_lib = {path = "../lib"} +chgk_ledb_lib = {path = "../lib", features = ["sync", "source", "convert"]} serde_json="1.0" zip="0.6" rand="0.8" diff --git a/app_async/Cargo.toml b/app_async/Cargo.toml new file mode 100644 index 0000000..9efcccb --- /dev/null +++ b/app_async/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "chgk_ledb_async" +version = "1.1.0" +authors = ["Dmitry "] +edition = "2021" +repository = "https://gitea.b4tman.ru/b4tman/chgk_ledb" +license = "MIT" +description = "Утилита загружающая базу данных ЧГК вопросов из ZIP файла в JSON формате в базу данных." + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +chgk_ledb_lib = {path = "../lib", features = ["async", "convert_async"]} +serde_json="1.0" +async_zip = { git = "https://github.com/Majored/rs-async-zip", rev = "ff0d985", features = [ + "zstd", + "tokio", + "tokio-fs"] } +tokio = { version = "1", features = [ + "io-util", + "fs", + "rt-multi-thread" +] } +tokio-stream = "0.1" +rand="0.8" +clap = { version = "4.2.7", features = ["derive"] } +futures = "0.3" + +[dev-dependencies] +tempfile = "3.3" +bincode = "^2.0.0-rc.2" +serde="1.0" +serde_derive="1.0" diff --git a/app_async/src/main.rs b/app_async/src/main.rs new file mode 100644 index 0000000..88e1256 --- /dev/null +++ b/app_async/src/main.rs @@ -0,0 +1,192 @@ +extern crate serde_json; +use clap::{Parser, Subcommand}; +use futures::{pin_mut, Future}; +use rand::distributions::Uniform; +use rand::seq::IteratorRandom; +use rand::{thread_rng, Rng}; + +use async_zip::tokio::read::seek::ZipFileReader; +use futures::stream::{self, StreamExt}; +use std::time::Instant; +use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; + +use async_db::WriterOpts; + +use tokio::{fs, io}; +use tokio_stream::wrappers::UnboundedReceiverStream; + +use chgk_ledb_lib::async_db; +use chgk_ledb_lib::questions::Question; +use chgk_ledb_lib::questions::QuestionsConverterAsyncForStream; +use chgk_ledb_lib::source::ReadSourceQuestionsBatchesAsync; + +const ZIP_FILENAME: &str = "json.zip"; +const NEW_DB_FILENAME: &str = "db.dat"; + +#[derive(Subcommand, Debug)] +enum Command { + Write, + Print { + #[clap(value_parser, default_value = "0")] + id: u32, + }, + ZipPrint { + #[clap(value_parser, default_value = "0")] + file_num: usize, + #[clap(value_parser, default_value = "0")] + num: usize, + }, +} + +#[derive(Parser, Debug)] +#[clap(author, version, about, long_about = None)] +#[clap(propagate_version = true)] +struct Cli { + #[clap(subcommand)] + command: Command, + #[clap(short, long, action)] + measure: bool, +} + +async fn zip_reader_task(tx: UnboundedSender) { + let mut file = fs::File::open(ZIP_FILENAME).await.expect("open zip"); + let archive = ZipFileReader::with_tokio(&mut file) + .await + .expect("open zip file reader"); + let mut source_questions = archive.source_questions(); + let source_questions = source_questions.stream(); + pin_mut!(source_questions); + + source_questions + .converter() + .convert() + .enumerate() + .map(|(num, mut question)| { + question.num = 1 + (num as u32); + question + }) + .for_each_concurrent(None, |question| async { + tx.send(question).expect("send"); + }) + .await; + + println!("read done"); +} + +async fn print_question_from(get_q: F) +where + F: Future>, +{ + let q = get_q.await.expect("question not found"); + println!("{:#?}", q) +} + +async fn read_from_zip(file_num: usize, mut num: usize) -> Option { + let mut rng = thread_rng(); + let zip_file = fs::File::open(ZIP_FILENAME).await.expect("open zip file"); + let mut zip_reader = io::BufReader::new(zip_file); + let archive = ZipFileReader::with_tokio(&mut zip_reader) + .await + .expect("open zip file reader"); + + let mut source = archive.source_questions(); + let files_count = source.len(); + let file_index = if file_num == 0 { + let files = Uniform::new(0, files_count); + rng.sample(files) + } else { + file_num - 1 + }; + + let src = source.get(file_index).await; + let src = stream::once(async { src.expect("get source file") }); + pin_mut!(src); + let converter = src.converter(); + let questions: Vec<_> = converter.convert().collect().await; + if num == 0 { + num = (1..=questions.len()).choose(&mut rng).unwrap(); + } + let mut question = questions.get(num - 1).expect("get question").clone(); + question.num = num as u32; + Some(question) +} + +// measure and return time elapsed in `fut` in seconds +pub async fn measure(fut: F) -> f64 { + let start = Instant::now(); + fut.await; + let elapsed = start.elapsed(); + (elapsed.as_secs() as f64) + (elapsed.subsec_nanos() as f64 / 1_000_000_000.0) +} + +pub async fn measure_and_print(fut: F) { + let m = measure(fut).await; + eprintln!("{}", m); +} + +#[tokio::main] +async fn main() { + let args = Cli::parse(); + + let mut action: Box> = match &args.command { + Command::Write => Box::new(write_db()), + Command::Print { id } => { + let get_question = read_from_db(*id); + Box::new(print_question_from(get_question)) + } + Command::ZipPrint { file_num, num } => { + let get_question = read_from_zip(*file_num, *num); + Box::new(print_question_from(get_question)) + } + }; + + if args.measure { + action = Box::new(measure_and_print(Box::into_pin(action))); + } + + Box::into_pin(action).await; +} + +async fn read_from_db(id: u32) -> Option { + let reader: async_db::Reader = async_db::Reader::new(NEW_DB_FILENAME) + .await + .expect("new db reader"); + + let len = reader.len(); + + let index = if len == 0 { + let mut rng = thread_rng(); + let questions = Uniform::new(0, len); + rng.sample(questions) + } else { + id as usize - 1 + }; + + match reader.get(index).await { + Ok(question) => Some(question), + Err(_) => None, + } +} +async fn write_db() { + let (tx, rx) = mpsc::unbounded_channel::(); + tokio::try_join!( + tokio::spawn(zip_reader_task(tx)), + tokio::spawn(db_writer_task(rx)) + ) + .expect("tokio join"); + println!("all done"); +} +async fn db_writer_task(rx: UnboundedReceiver) { + let writer_opts = WriterOpts::default(); + let mut writer: async_db::Writer = + async_db::Writer::new(NEW_DB_FILENAME, writer_opts) + .await + .unwrap_or_else(|e| panic!("db writer load, {e:#?}")); + + let stream: UnboundedReceiverStream<_> = rx.into(); + let stream = stream; + writer.load(stream).await.expect("load"); + writer.finish().await.expect("db writer finish"); + + println!("write done"); +} diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 3172a5a..6753070 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -9,14 +9,68 @@ description = "Библиотека для доступа к файлу базы # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +default = [] +sync = ["zstd", "memmap"] +async = [ + "futures", + "futures-core", + "futures-util", + "fmmap", + "tokio", + "async-compression", + "async-stream", + "pin-project", +] +source = ["zip"] +source_async = [ + "async_zip", + "tokio", + "futures", + "futures-core", + "futures-util", + "async-stream", +] +convert = ["zip"] +convert_async = [ + "futures", + "futures-core", + "futures-util", + "async-stream", + "async_zip", + "tokio", +] + [dependencies] -serde="1.0" -serde_derive="1.0" -serde_json="1.0" -zip="0.6" +serde = "1.0" +serde_derive = "1.0" +serde_json = "1.0" bincode = "^2.0.0-rc.2" -zstd = "^0.10" -memmap = "0.7.0" +zip = { version = "0.6", optional = true } +async_zip = { git = "https://github.com/Majored/rs-async-zip", rev = "ff0d985", features = [ + "zstd", + "tokio", + "tokio-fs", +], optional = true } +fmmap = { version = "0.3", features = ["tokio-async"], optional = true } +tokio = { version = "1", features = [ + "fs", + "io-util", + "rt", + "macros", +], optional = true } +futures-core = { version = "0.3", optional = true } +futures = { version = "0.3", optional = true } +futures-util = { version = "0.3", optional = true } +async-compression = { git = "https://github.com/Nullus157/async-compression", rev = "4fd4c42", default-features = false, features = [ + "zstd", + "tokio", +], optional = true } +async-stream = { version = "0.3", optional = true } +zstd = { version = "^0.12", default-features = false, optional = true } +memmap = { version = "0.7.0", optional = true } +pin-project = { version = "1.1.3", optional = true } [dev-dependencies] +insta = { version = "1.31.0", features = ["yaml"] } tempfile = "3.3" diff --git a/lib/src/async_db.rs b/lib/src/async_db.rs new file mode 100644 index 0000000..224224b --- /dev/null +++ b/lib/src/async_db.rs @@ -0,0 +1,787 @@ +use std::marker::PhantomData; +use std::ops::Deref; +use std::vec; +use std::{path::Path, sync::Arc}; + +use async_compression::tokio::bufread::ZstdDecoder; +use async_compression::tokio::bufread::ZstdEncoder; +use async_compression::Level; +use futures::sink::Sink; +use futures::stream::StreamExt; +use futures_core::stream::Stream; +use futures_core::Future; +use futures_util::pin_mut; + +use std::pin::Pin; +use std::task::{Context, Poll}; + +use tokio::{ + fs, + io::{self, AsyncReadExt, AsyncWriteExt}, +}; + +use fmmap::tokio::{AsyncMmapFile, AsyncMmapFileExt, AsyncOptions}; + +type LSize = u32; +const LEN_SIZE: usize = std::mem::size_of::(); +const BINCODE_CFG: bincode::config::Configuration = bincode::config::standard(); + +use crate::util::BincodeVecWriter; +use crate::util::ErrorToString; + +pub struct WriterOpts { + pub compress_lvl: Level, + pub data_buf_size: usize, + pub out_buf_size: usize, + pub current_buf_size: usize, +} + +impl Default for WriterOpts { + fn default() -> Self { + Self { + compress_lvl: Level::Default, + data_buf_size: 500 * 1024 * 1024, + out_buf_size: 200 * 1024 * 1024, + current_buf_size: 100 * 1024, + } + } +} + +pub struct Writer +where + T: bincode::Encode, +{ + out: io::BufWriter, + data_buf: Vec, + cur_buf_item: BincodeVecWriter, + table: Vec, + compress_lvl: Level, + _t: PhantomData>, +} + +impl Writer +where + T: bincode::Encode, +{ + pub async fn new>(path: P, opts: WriterOpts) -> Result { + let out = fs::File::create(path).await.str_err()?; + let out = io::BufWriter::with_capacity(opts.out_buf_size, out); + let data_buf: Vec = Vec::with_capacity(opts.data_buf_size); + let cur_buf_item: Vec = Vec::with_capacity(opts.current_buf_size); + let cur_buf_item = BincodeVecWriter::new(cur_buf_item); + + let compress_lvl = opts.compress_lvl; + + let table: Vec = vec![]; + + Ok(Self { + out, + data_buf, + cur_buf_item, + table, + compress_lvl, + _t: PhantomData, + }) + } + + pub async fn push(&mut self, item: T) -> Result<(), String> { + self.push_by_ref(&item).await + } + + pub async fn push_by_ref(&mut self, item: &T) -> Result<(), String> { + let pos: LSize = self.data_buf.len() as LSize; + + bincode::encode_into_writer(item, &mut self.cur_buf_item, BINCODE_CFG).str_err()?; + + let mut zencoder = ZstdEncoder::with_quality(&self.cur_buf_item[..], self.compress_lvl); + io::copy(&mut zencoder, &mut self.data_buf) + .await + .str_err()?; + self.cur_buf_item.clear(); + + self.table.push(pos); + + // FIXME + // this will break WriterSink::poll_ready (will wait forever), but not Writer::load + // tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + Ok(()) + } + + pub async fn load(&mut self, source: S) -> Result<(), String> + where + S: Stream + std::marker::Unpin, + { + let hint = source.size_hint(); + let hint = std::cmp::max(hint.0, hint.1.unwrap_or(0)); + if hint > 0 { + self.table.reserve(hint); + } + + pin_mut!(source); + while let Some(item) = source.next().await { + self.push(item).await?; + } + + Ok(()) + } + + pub async fn finish(mut self) -> Result<(), String> { + // finish tab + let pos: LSize = self.data_buf.len() as LSize; + self.table.push(pos); + + // write tab + let tab_size = (self.table.len() * LEN_SIZE) as LSize; + for pos in self.table { + let pos_data = (pos + tab_size).to_le_bytes(); + self.out.write_all(&pos_data).await.str_err()?; + } + // copy data + self.out.write_all(&self.data_buf[..]).await.str_err()?; + + self.out.flush().await.str_err()?; + Ok(()) + } + + pub fn sink(&mut self) -> WriterSink<'_, T> { + WriterSink { + writer: self, + item: None, + } + } +} + +use pin_project::pin_project; + +#[pin_project] +/// FIXME: not really async +/// only work when ..push.poll() returns Ready immediately +pub struct WriterSink<'a, T> +where + T: bincode::Encode, +{ + #[pin] + writer: &'a mut Writer, + item: Option, +} + +impl<'a, T> Sink for WriterSink<'a, T> +where + T: bincode::Encode, +{ + type Error = String; + + fn poll_ready( + self: std::pin::Pin<&mut Self>, + ctx: &mut std::task::Context<'_>, + ) -> Poll> { + let mut this = self.project(); + + if this.item.is_none() { + return Poll::Ready(Ok(())); + } + + let item = this.item.take().unwrap(); + + let push_fut = this.writer.push(item); // FIXME:: how to save this future??? + pin_mut!(push_fut); + push_fut.poll(ctx) + } + + fn start_send(self: std::pin::Pin<&mut Self>, item: T) -> Result<(), Self::Error> { + let this = self.project(); + *this.item = Some(item); + Ok(()) + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + ctx: &mut std::task::Context<'_>, + ) -> Poll> { + self.poll_ready(ctx) + } + + fn poll_close( + mut self: std::pin::Pin<&mut Self>, + ctx: &mut std::task::Context<'_>, + ) -> Poll> { + futures::ready!(self.as_mut().poll_ready(ctx))?; + Poll::Ready(Ok(())) + } +} + +pub struct Reader +where + T: bincode::Decode, +{ + mmap: AsyncMmapFile, + count: usize, + first_pos: LSize, + _t: PhantomData>, +} + +impl Reader +where + T: bincode::Decode, +{ + pub async fn new>(path: P) -> Result { + let mmap = AsyncOptions::new() + .read(true) + .open_mmap_file(path) + .await + .str_err()?; + mmap.try_lock_shared().str_err()?; + + // read first pos and records count + let first_data: [u8; LEN_SIZE] = mmap.bytes(0, LEN_SIZE).str_err()?.try_into().str_err()?; + let first_pos = LSize::from_le_bytes(first_data); + let tab_len = (first_pos as usize) / LEN_SIZE; + let count = tab_len - 1; + + Ok(Self { + mmap, + count, + first_pos, + _t: PhantomData, + }) + } + + pub fn len(&self) -> usize { + self.count + } + + pub fn is_empty(&self) -> bool { + 0 == self.len() + } + + /// get item at index, reuse data buffer + pub async fn get_with_buf(&self, index: usize, data_buf: &mut Vec) -> Result { + if index >= self.len() { + return Err("index out of range".into()); + } + + let next_pos: usize = (index + 1) * LEN_SIZE; + + // read item data pos + let data_pos = if 0 == index { + self.first_pos + } else { + let tab_pos: usize = index * LEN_SIZE; + let pos_curr_data: [u8; LEN_SIZE] = self + .mmap + .bytes(tab_pos, LEN_SIZE) + .str_err()? + .try_into() + .str_err()?; + LSize::from_le_bytes(pos_curr_data) + } as usize; + + // read next item pos + let pos_next_data: [u8; LEN_SIZE] = self + .mmap + .bytes(next_pos, LEN_SIZE) + .str_err()? + .try_into() + .str_err()?; + let data_pos_next = LSize::from_le_bytes(pos_next_data) as usize; + let data_len = data_pos_next - data_pos; + + // read & unpack item data + let mut decoder = ZstdDecoder::new(self.mmap.range_reader(data_pos, data_len).str_err()?); + decoder.read_to_end(data_buf).await.str_err()?; + + // decode item + let item: (T, usize) = bincode::decode_from_slice(data_buf, BINCODE_CFG).str_err()?; + + data_buf.clear(); + Ok(item.0) + } + + /// get item at index + pub async fn get(&self, index: usize) -> Result { + let mut data_buf: Vec = vec![]; + self.get_with_buf(index, &mut data_buf).await + } + + pub fn stream(&self) -> ReaderStream<'_, T> { + ReaderStream::new(self) + } +} + +pub struct ReaderStream<'a, T> +where + T: bincode::Decode, +{ + reader: &'a Reader, + index: Option, +} + +impl<'a, T> ReaderStream<'a, T> +where + T: bincode::Decode, +{ + fn new(reader: &'a Reader) -> Self { + ReaderStream { + reader, + index: None, + } + } +} + +impl<'a, T> Stream for ReaderStream<'a, T> +where + T: bincode::Decode, +{ + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.index.is_none() && !self.reader.is_empty() { + self.index = Some(0); + } + + if self.index.unwrap() == self.reader.len() { + return Poll::Ready(None); + } + + // FIXME: mayby work only if reader.get().poll() return Ready immediately + let future = self.reader.get(self.index.unwrap()); + pin_mut!(future); + match Pin::new(&mut future).poll(cx) { + Poll::Ready(Ok(item)) => { + self.index = Some(self.index.unwrap() + 1); + Poll::Ready(Some(item)) + } + Poll::Ready(Err(_)) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } + + fn size_hint(&self) -> (usize, Option) { + let len = self.reader.len(); + if self.index.is_none() { + return (len, Some(len)); + } + + let index = self.index.unwrap(); + let rem = if len > index + 1 { + len - (index + 1) + } else { + 0 + }; + (rem, Some(rem)) + } +} + +pub struct BufReader +where + T: bincode::Decode, +{ + inner: Reader, + buf: Vec, +} + +impl BufReader +where + T: bincode::Decode, +{ + pub async fn new>(path: P, buf_size: usize) -> Result { + match Reader::::new(path).await { + Ok(inner) => Ok(Self { + inner, + buf: Vec::with_capacity(buf_size), + }), + Err(e) => Err(e), + } + } + + pub async fn get(&mut self, index: usize) -> Result { + self.inner.get_with_buf(index, &mut self.buf).await + } + + pub fn into_inner(self) -> Reader { + self.inner + } + + pub fn stream(self) -> BufReaderStream { + BufReaderStream::new(self) + } +} + +impl From> for BufReader +where + T: bincode::Decode, +{ + fn from(inner: Reader) -> Self { + Self { + inner, + buf: Vec::new(), + } + } +} + +impl From> for Reader +where + T: bincode::Decode, +{ + fn from(value: BufReader) -> Self { + value.into_inner() + } +} + +impl Deref for BufReader +where + T: bincode::Decode, +{ + type Target = Reader; + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +pub struct BufReaderStream +where + T: bincode::Decode, +{ + reader: BufReader, + index: Option, +} + +impl BufReaderStream +where + T: bincode::Decode, +{ + fn new(reader: BufReader) -> Self { + BufReaderStream { + reader, + index: None, + } + } + + async fn get_next(&mut self) -> Result { + match self.index { + None => Err("index is None".into()), + Some(index) => { + let res = self.reader.get(index).await; + self.index = Some(index + 1); + + res + } + } + } +} + +impl Stream for BufReaderStream +where + T: bincode::Decode, +{ + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.index.is_none() && !self.reader.is_empty() { + self.index = Some(0); + } + + if self.index.unwrap() == self.reader.len() { + return Poll::Ready(None); + } + + // FIXME: mayby work only if reader.get().poll() return Ready immediately + let future = self.get_next(); + pin_mut!(future); + match Pin::new(&mut future).poll(cx) { + Poll::Ready(Ok(item)) => Poll::Ready(Some(item)), + Poll::Ready(Err(_)) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } + + fn size_hint(&self) -> (usize, Option) { + let len = self.reader.len(); + if self.index.is_none() { + return (len, Some(len)); + } + + let index = self.index.unwrap(); + let rem = if len > index + 1 { + len - (index + 1) + } else { + 0 + }; + (rem, Some(rem)) + } +} + +#[cfg(test)] +mod test { + use super::*; + use core::fmt::Debug; + use tempfile::tempdir; + + #[derive(bincode::Encode, bincode::Decode, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] + struct TestData { + num: u64, + test: String, + vnum: Vec, + vstr: Vec, + } + + fn gen_data(count: usize) -> impl Iterator { + (0..count).map(|i| TestData { + num: i as u64, + test: "test".repeat(i), + vnum: (0..i * 120).map(|x| (x ^ 0x345FE34) as u64).collect(), + vstr: (0..i * 111).map(|x| "test".repeat(x)).collect(), + }) + } + + async fn assert_data_eq((x, y): (&TestData, TestData)) { + assert_eq!(*x, y); + } + + #[tokio::test] + async fn test_write() { + let dir = tempdir().expect("tempdir"); + let tmpfile = dir.path().join("test.tmp"); + let opts = WriterOpts { + data_buf_size: 10 * 1024 * 1024, + out_buf_size: 10 * 1024 * 1024, + ..Default::default() + }; + let mut writer: Writer = Writer::new(&tmpfile, opts).await.expect("new writer"); + + let items_iter = gen_data(5); + let items: Vec = items_iter.collect(); + + let src = futures::stream::iter(items.clone()); + pin_mut!(src); + writer.load(src).await.expect("load"); + writer.finish().await.expect("finish write"); + } + + #[tokio::test] + async fn test_write_read() { + let dir = tempdir().expect("tempdir"); + let tmpfile = dir.path().join("test.tmp"); + let opts = WriterOpts { + data_buf_size: 10 * 1024 * 1024, + out_buf_size: 10 * 1024 * 1024, + ..Default::default() + }; + let mut writer: Writer = Writer::new(&tmpfile, opts).await.expect("new writer"); + + let items_iter = gen_data(5); + let items: Vec = items_iter.collect(); + + let src = futures::stream::iter(items.clone()); + pin_mut!(src); + writer.load(src).await.expect("load"); + writer.finish().await.expect("finish write"); + + let reader: Reader = Reader::new(&tmpfile).await.expect("new reader"); + assert_eq!(items.len(), reader.len()); + + for (idx, item) in items.iter().enumerate() { + let ritem = reader.get(idx).await.expect("get"); + assert_eq!(*item, ritem); + } + } + + #[tokio::test] + async fn test_write_sink_read() { + let dir = tempdir().expect("tempdir"); + let tmpfile = dir.path().join("test.tmp"); + let opts = WriterOpts { + data_buf_size: 10 * 1024 * 1024, + out_buf_size: 10 * 1024 * 1024, + ..Default::default() + }; + let mut writer: Writer = Writer::new(&tmpfile, opts).await.expect("new writer"); + + let items_iter = gen_data(5); + let items: Vec = items_iter.collect(); + + let src = futures::stream::iter(items.clone()).map(Ok); + pin_mut!(src); + src.forward(writer.sink()).await.expect("forward"); + writer.finish().await.expect("finish write"); + + let reader: Reader = Reader::new(&tmpfile).await.expect("new reader"); + assert_eq!(items.len(), reader.len()); + + for (idx, item) in items.iter().enumerate() { + let ritem = reader.get(idx).await.expect("get"); + assert_eq!(*item, ritem); + } + } + + #[tokio::test] + async fn test_write_read_get_with_buf() { + let dir = tempdir().expect("tempdir"); + let tmpfile = dir.path().join("test.tmp"); + let opts = WriterOpts { + data_buf_size: 10 * 1024 * 1024, + out_buf_size: 10 * 1024 * 1024, + ..Default::default() + }; + let mut writer: Writer = Writer::new(&tmpfile, opts).await.expect("new writer"); + + let items_iter = gen_data(5); + let items: Vec = items_iter.collect(); + + let src = futures::stream::iter(items.clone()); + pin_mut!(src); + writer.load(src).await.expect("load"); + writer.finish().await.expect("finish write"); + + let reader: Reader = Reader::new(&tmpfile).await.expect("new reader"); + assert_eq!(items.len(), reader.len()); + + for (idx, item) in items.iter().enumerate() { + let mut data_buf: Vec = vec![]; + let ritem = reader.get_with_buf(idx, &mut data_buf).await.expect("get"); + assert_eq!(*item, ritem); + } + } + + #[tokio::test] + async fn test_write_read_stream() { + let dir = tempdir().expect("tempdir"); + let tmpfile = dir.path().join("test.tmp"); + let opts = WriterOpts { + data_buf_size: 10 * 1024 * 1024, + out_buf_size: 10 * 1024 * 1024, + ..Default::default() + }; + let mut writer: Writer = Writer::new(&tmpfile, opts).await.expect("new writer"); + + let items_iter = gen_data(5); + let items: Vec = items_iter.collect(); + + let src = futures::stream::iter(items.clone()); + pin_mut!(src); + writer.load(src).await.expect("load"); + writer.finish().await.expect("finish write"); + + let reader: Reader = Reader::new(&tmpfile).await.expect("new reader"); + assert_eq!(items.len(), reader.len()); + + let dst_stream = reader.stream(); + let src_stream = futures::stream::iter(items.iter()); + + let mut count = 0; + src_stream + .zip(dst_stream) + .map(|x| { + count += 1; + x + }) + .for_each(assert_data_eq) + .await; + assert_eq!(count, items.len()) + } + /// sharing Reader instance between threads + #[tokio::test] + async fn test_share_reader() { + let dir = tempdir().expect("tempdir"); + let tmpfile = dir.path().join("test.tmp"); + let opts = WriterOpts { + data_buf_size: 10 * 1024 * 1024, + out_buf_size: 10 * 1024 * 1024, + ..Default::default() + }; + let mut writer: Writer = Writer::new(&tmpfile, opts).await.expect("new writer"); + + let items_iter = gen_data(5); + let items: Vec = items_iter.collect(); + + let src = futures::stream::iter(items.clone()); + pin_mut!(src); + writer.load(src).await.expect("load"); + writer.finish().await.expect("finish write"); + + let reader: Reader = Reader::new(&tmpfile).await.expect("new reader"); + assert_eq!(items.len(), reader.len()); + + let reader = Arc::new(reader); + for _ in 0..=3 { + let cur_items = items.clone(); + let cur_reader = Arc::clone(&reader); + tokio::spawn(async move { + let dst_stream = cur_reader.stream(); + let src_stream = futures::stream::iter(cur_items.iter()); + + src_stream.zip(dst_stream).for_each(assert_data_eq).await; + }); + } + } + + #[tokio::test] + async fn test_write_bufread() { + let dir = tempdir().expect("tempdir"); + let tmpfile = dir.path().join("test.tmp"); + let opts = WriterOpts { + data_buf_size: 10 * 1024 * 1024, + out_buf_size: 10 * 1024 * 1024, + ..Default::default() + }; + let mut writer: Writer = Writer::new(&tmpfile, opts).await.expect("new writer"); + + let items_iter = gen_data(5); + let items: Vec = items_iter.collect(); + + let src = futures::stream::iter(items.clone()); + pin_mut!(src); + writer.load(src).await.expect("load"); + writer.finish().await.expect("finish write"); + + let mut reader = BufReader::::new(&tmpfile, 4096) + .await + .expect("new buf reader"); + assert_eq!(items.len(), reader.len()); + + for (idx, item) in items.iter().enumerate() { + let ritem = reader.get(idx).await.expect("get"); + assert_eq!(*item, ritem); + } + } + + #[tokio::test] + async fn test_write_bufread_stream() { + let dir = tempdir().expect("tempdir"); + let tmpfile = dir.path().join("test.tmp"); + let opts = WriterOpts { + data_buf_size: 10 * 1024 * 1024, + out_buf_size: 10 * 1024 * 1024, + ..Default::default() + }; + let mut writer: Writer = Writer::new(&tmpfile, opts).await.expect("new writer"); + + let items_iter = gen_data(5); + let items: Vec = items_iter.collect(); + + let src = futures::stream::iter(items.clone()); + pin_mut!(src); + writer.load(src).await.expect("load"); + writer.finish().await.expect("finish write"); + + let reader = BufReader::::new(&tmpfile, 4096) + .await + .expect("new buf reader"); + assert_eq!(items.len(), reader.len()); + + let dst_stream = reader.stream(); + let src_stream = futures::stream::iter(items.iter()); + + let mut count = 0; + src_stream + .zip(dst_stream) + .map(|x| { + count += 1; + x + }) + .for_each(assert_data_eq) + .await; + assert_eq!(count, items.len()) + } +} diff --git a/lib/src/db.rs b/lib/src/db.rs index c3c8ad8..17d94f9 100644 --- a/lib/src/db.rs +++ b/lib/src/db.rs @@ -12,20 +12,8 @@ type LSize = u32; const LEN_SIZE: usize = std::mem::size_of::(); const BINCODE_CFG: bincode::config::Configuration = bincode::config::standard(); -trait ErrorToString { - type Output; - fn str_err(self) -> std::result::Result; -} - -impl ErrorToString for std::result::Result -where - E: std::error::Error, -{ - type Output = T; - fn str_err(self) -> std::result::Result { - self.map_err(|e| e.to_string()) - } -} +use crate::util::BincodeVecWriter; +use crate::util::ErrorToString; pub struct WriterOpts { pub compress_lvl: i32, @@ -52,6 +40,7 @@ where out: io::BufWriter, data_buf: Cursor>, cur_buf_raw: Cursor>, + cur_buf_item: BincodeVecWriter, table: Vec, compress_lvl: i32, _t: PhantomData>, @@ -69,6 +58,8 @@ where let cur_buf_raw: Vec = Vec::with_capacity(opts.current_buf_size); let cur_buf_raw = Cursor::new(cur_buf_raw); + let cur_buf_item: Vec = Vec::with_capacity(opts.current_buf_size); + let cur_buf_item = BincodeVecWriter::new(cur_buf_item); let compress_lvl = opts.compress_lvl; @@ -78,6 +69,7 @@ where out, data_buf, cur_buf_raw, + cur_buf_item, table, compress_lvl, _t: PhantomData, @@ -85,20 +77,25 @@ where } pub fn push(&mut self, item: T) -> Result<(), String> { + self.push_by_ref(&item) + } + + pub fn push_by_ref(&mut self, item: &T) -> Result<(), String> { let pos: LSize = self.data_buf.position() as LSize; - let item_data = bincode::encode_to_vec(item, BINCODE_CFG).str_err()?; + bincode::encode_into_writer(item, &mut self.cur_buf_item, BINCODE_CFG).str_err()?; let mut zencoder = zstd::stream::raw::Encoder::new(self.compress_lvl).str_err()?; zencoder - .set_pledged_src_size(item_data.len() as u64) + .set_pledged_src_size(Some(self.cur_buf_item.len() as u64)) .str_err()?; self.cur_buf_raw.set_position(0); let mut cur_buf_z = zstd::stream::zio::Writer::new(&mut self.cur_buf_raw, zencoder); - cur_buf_z.write_all(&item_data).str_err()?; + cur_buf_z.write_all(&self.cur_buf_item).str_err()?; cur_buf_z.finish().str_err()?; cur_buf_z.flush().str_err()?; + self.cur_buf_item.clear(); self.table.push(pos); let (cur_buf_raw, _) = cur_buf_z.into_inner(); @@ -416,7 +413,7 @@ mod test { } fn gen_data(count: usize) -> impl Iterator { - (0..count).into_iter().map(|i| TestData { + (0..count).map(|i| TestData { num: i as u64, test: "test".repeat(i), }) diff --git a/lib/src/lib.rs b/lib/src/lib.rs index b1ff844..3f95dea 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -1,3 +1,13 @@ +#[cfg(feature = "async")] +pub mod async_db; +#[cfg(feature = "sync")] pub mod db; pub mod questions; +#[cfg(any( + feature = "source", + feature = "source_async", + feature = "convert", + feature = "convert_async" +))] pub mod source; +pub mod util; diff --git a/lib/src/questions.rs b/lib/src/questions.rs index 48b3489..4bfc7b9 100644 --- a/lib/src/questions.rs +++ b/lib/src/questions.rs @@ -1,136 +1,398 @@ use serde_derive::{Deserialize, Serialize}; -use crate::source::{SourceQuestion, SourceQuestionsBatch}; - -macro_rules! make { - ($Target:ident; by {$($field:ident),+}; from $src:expr) => {$Target {$( - $field: $src.$field - ),+}}; - ($Target:ident; with defaults and by {$($field:ident),+}; from $src:expr) => {$Target {$( - $field: $src.$field - ),+ ,..$Target::default()}} -} - -#[derive(Debug, Default, Clone, Serialize, Deserialize, bincode::Decode, bincode::Encode)] +#[derive( + Debug, Default, Clone, Serialize, Deserialize, bincode::Decode, bincode::Encode, PartialEq, +)] pub struct BatchInfo { - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub filename: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub description: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub author: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub comment: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub url: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub date: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub processed_by: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub redacted_by: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub copyright: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub theme: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub kind: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub source: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub rating: String, } -#[derive(Debug, Default, Clone, Serialize, Deserialize, bincode::Decode, bincode::Encode)] +#[derive( + Debug, Default, Clone, Serialize, Deserialize, bincode::Decode, bincode::Encode, PartialEq, +)] pub struct Question { - #[serde(default)] + #[serde(default, skip_serializing_if = "u32_is_zero")] pub num: u32, pub id: String, pub description: String, pub answer: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub author: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub comment: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub comment1: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub tour: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub url: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub date: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub processed_by: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub redacted_by: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub copyright: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub theme: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub kind: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub source: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub rating: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "BatchInfo::is_default")] pub batch_info: BatchInfo, } -impl From for Question { - fn from(src: SourceQuestion) -> Self { - make! {Self; with defaults and by { - num, id, description, answer, author, comment, comment1, tour, url, - date, processed_by, redacted_by, copyright, theme, kind, source, rating - }; from src} +fn u32_is_zero(num: &u32) -> bool { + *num == 0 +} + +impl BatchInfo { + pub fn is_default(&self) -> bool { + *self == BatchInfo::default() } } -impl From for BatchInfo { - fn from(src: SourceQuestionsBatch) -> Self { - make! {Self; by { - filename, description, author, comment, url, date, - processed_by, redacted_by, copyright, theme, kind, source, rating - }; from src} +#[cfg(any(feature = "convert", feature = "convert_async"))] +pub mod convert_common { + use super::{BatchInfo, Question}; + use crate::source::{SourceQuestion, SourceQuestionsBatch}; + + macro_rules! make { + ($Target:ident; by {$($field:ident),+}; from $src:expr) => {$Target {$( + $field: $src.$field + ),+}}; + ($Target:ident; with defaults and by {$($field:ident),+}; from $src:expr) => {$Target {$( + $field: $src.$field + ),+ ,..$Target::default()}} } -} -impl From for Vec { - fn from(src: SourceQuestionsBatch) -> Self { - let mut result: Vec = src - .questions - .iter() - .map(|item| item.clone().into()) - .collect(); - let batch_info = BatchInfo::from(src); - result.iter_mut().for_each(|question| { - question.batch_info = batch_info.clone(); - }); - - result + impl From for Question { + fn from(src: SourceQuestion) -> Self { + make! {Self; with defaults and by { + num, id, description, answer, author, comment, comment1, tour, url, + date, processed_by, redacted_by, copyright, theme, kind, source, rating + }; from src} + } } -} -pub trait QuestionsConverter { - fn convert<'a>(&'a mut self) -> Box + 'a>; -} + impl From for BatchInfo { + fn from(src: SourceQuestionsBatch) -> Self { + make! {Self; by { + filename, description, author, comment, url, date, + processed_by, redacted_by, copyright, theme, kind, source, rating + }; from src} + } + } -impl QuestionsConverter for T -where - T: Iterator)>, -{ - fn convert<'a>(&'a mut self) -> Box + 'a> { - let iter = self - .filter(|(_, data)| data.is_ok()) - .flat_map(|(filename, data)| { - let mut batch = data.unwrap(); - batch.filename = filename; - let questions: Vec = batch.into(); - questions + impl From for Vec { + fn from(src: SourceQuestionsBatch) -> Self { + let mut src = src; + let mut questions: Vec = vec![]; + std::mem::swap(&mut src.questions, &mut questions); + let mut result: Vec = questions.into_iter().map(|item| item.into()).collect(); + let batch_info = BatchInfo::from(src); + result.iter_mut().for_each(|question| { + question.batch_info = batch_info.clone(); }); - Box::new(iter) + + result + } + } +} + +#[cfg(feature = "convert")] +pub mod convert { + use super::Question; + use crate::source::SourceQuestionsBatch; + + pub trait QuestionsConverter { + fn convert<'a>(&'a mut self) -> Box + 'a>; + } + + impl QuestionsConverter for T + where + T: Iterator)>, + { + fn convert<'a>(&'a mut self) -> Box + 'a> { + let iter = self + .filter(|(_, data)| data.is_ok()) + .flat_map(|(filename, data)| { + let mut batch = data.unwrap(); + batch.filename = filename; + let questions: Vec = batch.into(); + questions + }); + Box::new(iter) + } + } + + #[cfg(test)] + mod test { + use crate::questions::test::convert_common::sample_batch; + + use super::*; + use insta::assert_yaml_snapshot; + use std::iter; + + #[test] + fn test_convert() { + let mut source = iter::once(( + String::from("test.json"), + Ok::(sample_batch()), + )); + let converted: Vec<_> = source.convert().collect(); + assert_yaml_snapshot!(converted, @r#" + --- + - id: Вопрос 1 + description: Сколько будет (2 * 2 * 2 + 2) * 2 * 2 + 2 + answer: "42" + batch_info: + filename: test.json + description: Тестовый + date: 00-000-2000 + - id: Вопрос 2 + description: Зимой и летом одним цветом + answer: ёлка + batch_info: + filename: test.json + description: Тестовый + date: 00-000-2000 + + "#); + } + } +} +#[cfg(feature = "convert")] +pub use convert::QuestionsConverter; + +#[cfg(feature = "convert_async")] +pub mod convert_async { + use futures::stream; + use futures_core::stream::Stream; + use futures_util::StreamExt; + + use super::Question; + use crate::source::SourceQuestionsBatch; + + pub struct QuestionsConverterAsync + where + T: Stream)> + + std::marker::Unpin, + { + inner: T, + } + + impl From for QuestionsConverterAsync + where + T: Stream)> + + std::marker::Unpin, + { + fn from(inner: T) -> Self { + Self { inner } + } + } + + pub trait QuestionsConverterAsyncForStream + where + T: Stream)> + + std::marker::Unpin, + { + fn converter(&mut self) -> QuestionsConverterAsync<&mut T>; + } + + impl QuestionsConverterAsyncForStream for T + where + T: Stream)> + + std::marker::Unpin, + { + fn converter(&mut self) -> QuestionsConverterAsync<&mut T> { + QuestionsConverterAsync::from(self) + } + } + + impl QuestionsConverterAsync + where + T: Stream)> + + std::marker::Unpin, + { + pub fn convert(self) -> impl Stream { + self.inner + .filter_map(|(name, res)| async move { + if let Ok(item) = res { + Some((name, item)) + } else { + None + } + }) + .flat_map(|(filename, batch)| { + stream::iter({ + let mut batch = batch; + batch.filename = filename; + let questions: Vec = batch.into(); + questions + }) + }) + } + } + + #[cfg(test)] + mod test { + use crate::questions::test::convert_common::sample_batch; + + use super::*; + use futures_util::{pin_mut, StreamExt}; + use insta::assert_yaml_snapshot; + + #[tokio::test] + async fn test_convert_stream() { + let source = futures::stream::once(async { + ( + String::from("test.json"), + Ok::(sample_batch()), + ) + }); + + pin_mut!(source); + let converter = source.converter(); + let converter = converter.convert(); + let converted: Vec<_> = converter.collect().await; + assert_yaml_snapshot!(converted, @r#" + --- + - id: Вопрос 1 + description: Сколько будет (2 * 2 * 2 + 2) * 2 * 2 + 2 + answer: "42" + batch_info: + filename: test.json + description: Тестовый + date: 00-000-2000 + - id: Вопрос 2 + description: Зимой и летом одним цветом + answer: ёлка + batch_info: + filename: test.json + description: Тестовый + date: 00-000-2000 + + "#); + } + } +} +#[cfg(feature = "convert_async")] +pub use convert_async::{QuestionsConverterAsync, QuestionsConverterAsyncForStream}; + +#[cfg(test)] +mod test { + use super::*; + use insta::assert_yaml_snapshot; + use serde_json::json; + + #[cfg(any(feature = "convert", feature = "convert_async"))] + pub mod convert_common { + use crate::source::{SourceQuestion, SourceQuestionsBatch}; + + pub fn sample_batch() -> SourceQuestionsBatch { + SourceQuestionsBatch { + description: "Тестовый".into(), + date: "00-000-2000".into(), + questions: vec![ + SourceQuestion { + id: "Вопрос 1".into(), + description: "Сколько будет (2 * 2 * 2 + 2) * 2 * 2 + 2".into(), + answer: "42".into(), + ..Default::default() + }, + SourceQuestion { + id: "Вопрос 2".into(), + description: "Зимой и летом одним цветом".into(), + answer: "ёлка".into(), + ..Default::default() + }, + ], + ..Default::default() + } + } + } + + pub fn sample_question() -> Question { + Question { + id: "Вопрос 1".into(), + description: "Сколько будет (2 * 2 * 2 + 2) * 2 * 2 + 2".into(), + answer: "42".into(), + batch_info: BatchInfo { + description: "Тестовый".into(), + date: "00-000-2000".into(), + ..Default::default() + }, + ..Default::default() + } + } + + #[test] + fn test_question_ser() { + assert_yaml_snapshot!(sample_question(), @r#" + --- + id: Вопрос 1 + description: Сколько будет (2 * 2 * 2 + 2) * 2 * 2 + 2 + answer: "42" + batch_info: + description: Тестовый + date: 00-000-2000 + + "#); + } + #[test] + fn test_question_de() { + let question_from_json: Result = serde_json::from_value(json!({ + "id": "Вопрос 1", + "description": "Сколько будет (2 * 2 * 2 + 2) * 2 * 2 + 2", + "answer": "42", + "batch_info": { + "description": "Тестовый", + "date": "00-000-2000" + } + })); + assert!(question_from_json.is_ok()); + + assert_yaml_snapshot!(question_from_json.unwrap(), @r#" + --- + id: Вопрос 1 + description: Сколько будет (2 * 2 * 2 + 2) * 2 * 2 + 2 + answer: "42" + batch_info: + description: Тестовый + date: 00-000-2000 + + "#); } } diff --git a/lib/src/source.rs b/lib/src/source.rs index 7cdae96..c194206 100644 --- a/lib/src/source.rs +++ b/lib/src/source.rs @@ -1,11 +1,10 @@ use serde_derive::{Deserialize, Serialize}; -use std::io::{Read, Seek}; -use zip::ZipArchive; -#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)] pub struct SourceQuestion { - #[serde(default)] + #[serde(default, skip_serializing_if = "u32_is_zero")] pub num: u32, + #[serde(default)] pub id: String, #[serde(alias = "Вопрос")] @@ -13,194 +12,568 @@ pub struct SourceQuestion { #[serde(alias = "Ответ")] pub answer: String, - #[serde(alias = "Автор")] - #[serde(default)] + #[serde(alias = "Автор", default, skip_serializing_if = "String::is_empty")] pub author: String, - #[serde(alias = "Комментарий")] - #[serde(default)] + #[serde( + default, + alias = "Комментарий", + skip_serializing_if = "String::is_empty" + )] pub comment: String, - #[serde(alias = "Комментарии")] - #[serde(alias = "Инфо")] - #[serde(default)] + #[serde( + default, + alias = "Комментарии", + alias = "Инфо", + skip_serializing_if = "String::is_empty" + )] pub comment1: String, - #[serde(alias = "Тур")] - #[serde(default)] + #[serde(default, alias = "Тур", skip_serializing_if = "String::is_empty")] pub tour: String, - #[serde(alias = "Ссылка")] - #[serde(alias = "URL")] - #[serde(default)] + #[serde( + default, + alias = "Ссылка", + alias = "URL", + skip_serializing_if = "String::is_empty" + )] pub url: String, - #[serde(alias = "Дата")] - #[serde(default)] + #[serde(default, alias = "Дата", skip_serializing_if = "String::is_empty")] pub date: String, - #[serde(alias = "Обработан")] - #[serde(default)] + #[serde(default, alias = "Обработан", skip_serializing_if = "String::is_empty")] pub processed_by: String, - #[serde(alias = "Редактор")] - #[serde(default)] + #[serde(default, alias = "Редактор", skip_serializing_if = "String::is_empty")] pub redacted_by: String, - #[serde(alias = "Копирайт")] - #[serde(default)] + #[serde(default, alias = "Копирайт", skip_serializing_if = "String::is_empty")] pub copyright: String, - #[serde(alias = "Тема")] - #[serde(default)] + #[serde(default, alias = "Тема", skip_serializing_if = "String::is_empty")] pub theme: String, - #[serde(alias = "Вид")] - #[serde(alias = "Тип")] - #[serde(default)] + #[serde( + default, + alias = "Вид", + alias = "Тип", + skip_serializing_if = "String::is_empty" + )] pub kind: String, - #[serde(alias = "Источник")] - #[serde(default)] + #[serde(default, alias = "Источник", skip_serializing_if = "String::is_empty")] pub source: String, - #[serde(alias = "Рейтинг")] - #[serde(default)] + #[serde(default, alias = "Рейтинг", skip_serializing_if = "String::is_empty")] pub rating: String, } -#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)] pub struct SourceQuestionsBatch { - #[serde(default)] + #[serde(default, skip_serializing_if = "String::is_empty")] pub filename: String, - #[serde(alias = "Пакет")] - #[serde(alias = "Чемпионат")] + #[serde(alias = "Пакет", alias = "Чемпионат")] pub description: String, - #[serde(alias = "Автор")] - #[serde(default)] + #[serde(default, alias = "Автор", skip_serializing_if = "String::is_empty")] pub author: String, - #[serde(alias = "Комментарий")] - #[serde(alias = "Комментарии")] - #[serde(alias = "Инфо")] - #[serde(default)] + #[serde( + default, + alias = "Комментарий", + alias = "Комментарии", + alias = "Инфо", + skip_serializing_if = "String::is_empty" + )] pub comment: String, - #[serde(alias = "Ссылка")] - #[serde(alias = "URL")] - #[serde(default)] + #[serde( + default, + alias = "Ссылка", + alias = "URL", + skip_serializing_if = "String::is_empty" + )] pub url: String, - #[serde(alias = "Дата")] - #[serde(default)] + #[serde(default, alias = "Дата", skip_serializing_if = "String::is_empty")] pub date: String, - #[serde(alias = "Обработан")] - #[serde(default)] + #[serde(default, alias = "Обработан", skip_serializing_if = "String::is_empty")] pub processed_by: String, - #[serde(alias = "Редактор")] - #[serde(default)] + #[serde(default, alias = "Редактор", skip_serializing_if = "String::is_empty")] pub redacted_by: String, - #[serde(alias = "Копирайт")] - #[serde(default)] + #[serde(default, alias = "Копирайт", skip_serializing_if = "String::is_empty")] pub copyright: String, - #[serde(alias = "Тема")] - #[serde(default)] + #[serde(default, alias = "Тема", skip_serializing_if = "String::is_empty")] pub theme: String, - #[serde(alias = "Вид")] - #[serde(alias = "Тип")] - #[serde(default)] + #[serde( + default, + alias = "Вид", + alias = "Тип", + skip_serializing_if = "String::is_empty" + )] pub kind: String, - #[serde(alias = "Источник")] - #[serde(default)] + #[serde(default, alias = "Источник", skip_serializing_if = "String::is_empty")] pub source: String, - #[serde(alias = "Рейтинг")] - #[serde(default)] + #[serde(default, alias = "Рейтинг", skip_serializing_if = "String::is_empty")] pub rating: String, #[serde(alias = "Вопросы")] pub questions: Vec, } -pub struct SourceQuestionsZipReader -where - R: Read + Seek, -{ - zipfile: ZipArchive, - index: Option, +fn u32_is_zero(num: &u32) -> bool { + *num == 0 } -impl SourceQuestionsZipReader -where - R: Read + Seek, -{ - fn new(zipfile: ZipArchive) -> Self { - SourceQuestionsZipReader { - zipfile, - index: None, - } - } -} +#[cfg(any(feature = "convert", feature = "source"))] +pub mod reader_sync { + use std::io::{Read, Seek}; + use zip::ZipArchive; -impl Iterator for SourceQuestionsZipReader -where - R: Read + Seek, -{ - type Item = (String, Result); + use super::SourceQuestionsBatch; - fn next(&mut self) -> Option { - if self.index.is_none() && !self.zipfile.is_empty() { - self.index = Some(0); - } - - match self.index { - Some(i) if i < self.zipfile.len() => { - self.index = Some(i + 1); - - self.nth(i) - } - _ => None, - } - } - - fn nth(&mut self, n: usize) -> Option { - if self.zipfile.len() <= n { - return None; - } - self.index = Some(n + 1); - - let file = self.zipfile.by_index(n).unwrap(); - let name = file.mangled_name(); - let name_str = name.to_str().unwrap(); - - let data: Result = serde_json::from_reader(file); - - Some((String::from(name_str), data)) - } - - fn size_hint(&self) -> (usize, Option) { - let len = self.zipfile.len(); - let index = self.index.unwrap_or(0); - let rem = if len > index + 1 { - len - (index + 1) - } else { - 0 - }; - (rem, Some(rem)) - } - - fn count(self) -> usize + pub struct SourceQuestionsZipReader where - Self: Sized, + R: Read + Seek, { - self.zipfile.len() + zipfile: ZipArchive, + index: Option, + } + + impl SourceQuestionsZipReader + where + R: Read + Seek, + { + fn new(zipfile: ZipArchive) -> Self { + SourceQuestionsZipReader { + zipfile, + index: None, + } + } + } + + impl Iterator for SourceQuestionsZipReader + where + R: Read + Seek, + { + type Item = (String, Result); + + fn next(&mut self) -> Option { + if self.index.is_none() && !self.zipfile.is_empty() { + self.index = Some(0); + } + + match self.index { + Some(i) if i < self.zipfile.len() => { + self.index = Some(i + 1); + + self.nth(i) + } + _ => None, + } + } + + fn nth(&mut self, n: usize) -> Option { + if self.zipfile.len() <= n { + return None; + } + self.index = Some(n + 1); + + let file = self.zipfile.by_index(n).unwrap(); + let name = file.mangled_name(); + let name_str = name.to_str().unwrap(); + + let data: Result = serde_json::from_reader(file); + + Some((String::from(name_str), data)) + } + + fn size_hint(&self) -> (usize, Option) { + let len = self.zipfile.len(); + let index = self.index.unwrap_or(0); + let rem = if len > index + 1 { + len - (index + 1) + } else { + 0 + }; + (rem, Some(rem)) + } + + fn count(self) -> usize + where + Self: Sized, + { + self.zipfile.len() + } + } + + impl ExactSizeIterator for SourceQuestionsZipReader + where + R: Read + Seek, + { + fn len(&self) -> usize { + self.zipfile.len() + } + } + + pub trait ReadSourceQuestionsBatches + where + R: Read + Seek, + { + fn source_questions(self) -> SourceQuestionsZipReader; + } + + impl ReadSourceQuestionsBatches for ZipArchive + where + R: Read + Seek, + { + fn source_questions(self) -> SourceQuestionsZipReader { + SourceQuestionsZipReader::new(self) + } + } + #[cfg(test)] + mod test { + use super::super::test::sample_batch; + use super::*; + use std::fs; + use std::{io::Write, iter, path::Path}; + use tempfile::tempdir; + + fn write_sample_zip

(path: P) + where + P: AsRef, + { + let batch = sample_batch(); + let z_file = fs::File::create(path).expect("crerate zip file"); + let mut zip_file = zip::ZipWriter::new(z_file); + let options = + zip::write::FileOptions::default().compression_method(zip::CompressionMethod::Zstd); + zip_file + .start_file("test.json", options) + .expect("zip start file"); + let data = serde_json::to_vec(&batch).unwrap(); + let amount = zip_file.write(data.as_slice()).expect("write entry"); + assert_eq!(amount, data.len()); + zip_file.finish().expect("finish zip file"); + } + + #[test] + fn test_source_questions_get() { + let expected_batch = sample_batch(); + let dir = tempdir().expect("tempdir"); + + // write sample + let tmpfile_zip = dir.path().join("test.zip"); + write_sample_zip(&tmpfile_zip); + + let z_file = fs::File::open(tmpfile_zip).expect("open zip file"); + let zip_file = zip::ZipArchive::new(z_file).expect("open zip file reader"); + + let mut source = zip_file.source_questions(); + assert_eq!(source.len(), 1); + + let actual = source.next().expect("get batch"); + assert_eq!(actual.0, "test.json"); + assert_eq!(actual.1.expect("parse batch"), expected_batch); + } + + #[test] + fn test_source_questions_iter() { + let expected_batch = sample_batch(); + let dir = tempdir().expect("tempdir"); + + // write sample + let tmpfile_zip = dir.path().join("test.zip"); + write_sample_zip(&tmpfile_zip); + + let z_file = fs::File::open(tmpfile_zip).expect("open zip file"); + let zip_file = zip::ZipArchive::new(z_file).expect("open zip file reader"); + + let source = zip_file.source_questions(); + assert_eq!(source.len(), 1); + + let expected_iter = iter::once((String::from("test.json"), Ok(expected_batch))); + + assert!(source + .map(|x| (x.0, x.1.map_err(|e| e.to_string()))) + .eq(expected_iter)); + } } } -impl ExactSizeIterator for SourceQuestionsZipReader -where - R: Read + Seek, -{ - fn len(&self) -> usize { - self.zipfile.len() +#[cfg(any(feature = "convert", feature = "source"))] +pub use reader_sync::{ReadSourceQuestionsBatches, SourceQuestionsZipReader}; + +#[cfg(any(feature = "convert_async", feature = "source_async"))] +pub mod reader_async { + use async_stream::stream; + use async_zip::tokio::read::seek::ZipFileReader; + use futures_core::stream::Stream; + use futures_util::AsyncReadExt; + + use tokio::io::{AsyncRead, AsyncSeek}; + + use super::SourceQuestionsBatch; + + pub struct SourceQuestionsZipReaderAsync + where + R: AsyncRead + AsyncSeek + Unpin, + { + zipfile: ZipFileReader, + index: Option, + } + + impl SourceQuestionsZipReaderAsync + where + R: AsyncRead + AsyncSeek + Unpin, + { + fn new(zipfile: ZipFileReader) -> Self { + SourceQuestionsZipReaderAsync { + zipfile, + index: None, + } + } + + pub fn len(&self) -> usize { + self.zipfile.file().entries().len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub async fn get( + &mut self, + index: usize, + ) -> Result<(String, Result), String> + where + R: AsyncRead + AsyncSeek + Unpin, + { + let len = self.len(); + if index >= len { + return Err(format!("get index={index}, when len={len}")); + } + + let reader = self.zipfile.reader_with_entry(index).await; + if let Err(error) = reader { + return Err(format!("reader_with_entry: {error:?}")); + } + let mut reader = reader.unwrap(); + + let filename = reader.entry().filename().clone().into_string().unwrap(); + let mut data: Vec = Vec::new(); + let readed = reader.read_to_end(&mut data).await; + if let Err(error) = readed { + return Err(format!("read_to_end: {error:?}")); + } + let parsed: Result = serde_json::from_slice(&data); + Ok((filename, parsed)) + } + pub async fn get_next( + &mut self, + ) -> Option), String>> + where + R: AsyncRead + AsyncSeek + Unpin, + { + if self.index.is_none() && !self.is_empty() { + self.index = Some(0); + } + + if self.index.unwrap() >= self.len() { + return None; + } + + let item = self.get(self.index.unwrap()).await; + self.index = Some(self.index.unwrap() + 1); + + Some(item) + } + pub fn stream( + &mut self, + ) -> impl Stream)> + '_ + { + stream! { + while let Some(Ok(item)) = self.get_next().await { + yield item + } + } + } + } + + pub trait ReadSourceQuestionsBatchesAsync + where + R: AsyncRead + AsyncSeek + Unpin, + { + fn source_questions(self) -> SourceQuestionsZipReaderAsync; + } + + impl ReadSourceQuestionsBatchesAsync for ZipFileReader + where + R: AsyncRead + AsyncSeek + Unpin, + { + fn source_questions(self) -> SourceQuestionsZipReaderAsync { + SourceQuestionsZipReaderAsync::new(self) + } + } + + #[cfg(test)] + mod test { + use crate::source::SourceQuestion; + + use super::super::test::sample_batch; + use super::*; + use async_zip::{base::write::ZipFileWriter, ZipEntryBuilder}; + use core::fmt::Debug; + use futures_util::StreamExt; + use std::path::Path; + use tempfile::tempdir; + use tokio::fs; + + async fn write_sample_zip

(path: P) + where + P: AsRef, + { + let batch = sample_batch(); + let z_file = fs::File::create(path).await.expect("crerate zip file"); + let mut zip_file = ZipFileWriter::with_tokio(z_file); + let entry = + ZipEntryBuilder::new("test.json".into(), async_zip::Compression::Zstd).build(); + zip_file + .write_entry_whole(entry, serde_json::to_vec(&batch).unwrap().as_slice()) + .await + .expect("write entry"); + zip_file.close().await.expect("close zip"); + } + + async fn assert_data_rref_eq((x, y): (T, &T)) + where + T: PartialEq + Debug, + { + assert_eq!(x, *y); + } + + #[tokio::test] + async fn test_source_questions_stream() { + let expected_batch = sample_batch(); + let dir = tempdir().expect("tempdir"); + + // write sample + let tmpfile_zip = dir.path().join("test.zip"); + write_sample_zip(&tmpfile_zip).await; + + let mut z_file = fs::File::open(tmpfile_zip).await.expect("open zip file"); + let zip_file = ZipFileReader::with_tokio(&mut z_file) + .await + .expect("open zip file reader"); + + let expected_count = expected_batch.questions.len(); + let expected_stream = futures::stream::iter(expected_batch.questions.iter()); + let mut actual_source = zip_file.source_questions(); + let actual_stream = actual_source.stream(); + let mut actual_count: usize = 0; + + actual_stream + .flat_map(|x| futures::stream::iter(x.1.expect("parse batch").questions)) + .zip(expected_stream) + .map(|x| { + actual_count += 1; + x + }) + .for_each(assert_data_rref_eq::) + .await; + assert_eq!(actual_count, expected_count); + } + + #[tokio::test] + async fn test_source_questions_get() { + let expected_batch = sample_batch(); + let dir = tempdir().expect("tempdir"); + + // write sample + let tmpfile_zip = dir.path().join("test.zip"); + write_sample_zip(&tmpfile_zip).await; + + let mut z_file = fs::File::open(tmpfile_zip).await.expect("open zip file"); + let zip_file = ZipFileReader::with_tokio(&mut z_file) + .await + .expect("open zip file reader"); + + let mut source = zip_file.source_questions(); + assert_eq!(source.len(), 1); + + let actual = source.get(0).await.expect("get batch"); + assert_eq!(actual.0, "test.json"); + assert_eq!(actual.1.expect("parse batch"), expected_batch); + } } } +#[cfg(any(feature = "convert_async", feature = "source_async"))] +pub use reader_async::{ReadSourceQuestionsBatchesAsync, SourceQuestionsZipReaderAsync}; -pub trait ReadSourceQuestionsBatches -where - R: Read + Seek, -{ - fn source_questions(self) -> SourceQuestionsZipReader; -} +#[cfg(test)] +mod test { + use super::*; + use insta::assert_yaml_snapshot; + use serde_json::json; -impl ReadSourceQuestionsBatches for ZipArchive -where - R: Read + Seek, -{ - fn source_questions(self) -> SourceQuestionsZipReader { - SourceQuestionsZipReader::new(self) + pub fn sample_batch() -> SourceQuestionsBatch { + SourceQuestionsBatch { + description: "Тестовый".into(), + date: "00-000-2000".into(), + questions: vec![ + SourceQuestion { + id: "Вопрос 1".into(), + description: "Сколько будет (2 * 2 * 2 + 2) * 2 * 2 + 2".into(), + answer: "42".into(), + ..Default::default() + }, + SourceQuestion { + id: "Вопрос 2".into(), + description: "Зимой и летом одним цветом".into(), + answer: "ёлка".into(), + ..Default::default() + }, + ], + ..Default::default() + } + } + + #[test] + fn test_batch_ser() { + let batch = sample_batch(); + + assert_yaml_snapshot!(batch, @r#" + --- + description: Тестовый + date: 00-000-2000 + questions: + - id: Вопрос 1 + description: Сколько будет (2 * 2 * 2 + 2) * 2 * 2 + 2 + answer: "42" + - id: Вопрос 2 + description: Зимой и летом одним цветом + answer: ёлка + + "#); + } + #[test] + fn test_batch_de() { + let batch_from_json: Result = serde_json::from_value(json!({ + "Чемпионат": "Тестовый", + "Дата": "00-000-2000", + "Вопросы": [ + { + "id": "Вопрос 1", + "Вопрос": "Сколько будет (2 * 2 * 2 + 2) * 2 * 2 + 2", + "Ответ": "42", + }, + { + "id": "Вопрос 2", + "Вопрос": "Зимой и летом одним цветом", + "Ответ": "ёлка", + }, + ] + })); + assert!(batch_from_json.is_ok()); + + assert_yaml_snapshot!(batch_from_json.unwrap(), @r#" + --- + description: Тестовый + date: 00-000-2000 + questions: + - id: Вопрос 1 + description: Сколько будет (2 * 2 * 2 + 2) * 2 * 2 + 2 + answer: "42" + - id: Вопрос 2 + description: Зимой и летом одним цветом + answer: ёлка + + "#); } } diff --git a/lib/src/util.rs b/lib/src/util.rs new file mode 100644 index 0000000..e7c431a --- /dev/null +++ b/lib/src/util.rs @@ -0,0 +1,57 @@ +pub trait ErrorToString { + type Output; + fn str_err(self) -> std::result::Result; +} + +impl ErrorToString for std::result::Result +where + E: std::error::Error, +{ + type Output = T; + fn str_err(self) -> std::result::Result { + self.map_err(|e| e.to_string()) + } +} + +#[cfg(any(feature = "sync", feature = "async"))] +mod bincode_utils { + use std::ops::{Deref, DerefMut}; + + use bincode::enc::write::Writer; + use bincode::error::EncodeError; + + /// struct that allows [`Vec`] to implement [bincode::enc::write::Writer] trait + pub struct BincodeVecWriter { + vec: Vec, + } + + impl BincodeVecWriter { + pub fn new(vec: Vec) -> BincodeVecWriter { + BincodeVecWriter { vec } + } + } + + impl Deref for BincodeVecWriter { + type Target = Vec; + + fn deref(&self) -> &Self::Target { + &self.vec + } + } + + impl DerefMut for BincodeVecWriter { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.vec + } + } + + impl Writer for BincodeVecWriter { + fn write(&mut self, bytes: &[u8]) -> Result<(), EncodeError> { + self.vec.extend_from_slice(bytes); + Ok(()) + } + } +} + +#[cfg(any(feature = "sync", feature = "async"))] +pub use bincode_utils::BincodeVecWriter;