extern crate serde; extern crate serde_derive; #[macro_use] extern crate serde_json; extern crate ledb; extern crate ledb_types; extern crate zip; use clap::{Parser, Subcommand}; use rand::seq::IteratorRandom; use std::io::{self, Cursor, Read, Seek, Write}; use std::path::PathBuf; use std::time::Instant; use std::{fs, sync::mpsc, thread}; use zstd::stream::raw::Operation; use ledb::{Options, Storage}; mod questions; mod source; use crate::questions::{Question, QuestionsConverter}; use crate::source::ReadSourceQuestionsBatches; const ZIP_FILENAME: &str = "json.zip"; const DB_DIR: &str = "db"; #[derive(Subcommand, Debug)] enum Command { Write, Compact, Print { #[clap(value_parser, default_value = "0")] id: u32, }, ZipPrint { #[clap(value_parser, default_value = "0")] file_num: usize, #[clap(value_parser, default_value = "0")] num: usize, }, Write2, Print2 { #[clap(value_parser, default_value = "0")] id: u32, }, } #[derive(Parser, Debug)] #[clap(author, version, about, long_about = None)] #[clap(propagate_version = true)] struct Cli { #[clap(subcommand)] command: Command, #[clap(short, long, action)] measure: bool, } fn reader_task(tx: mpsc::Sender) { let zip_file = fs::File::open(ZIP_FILENAME).unwrap(); let zip_reader = io::BufReader::new(zip_file); let archive = zip::ZipArchive::new(zip_reader).unwrap(); let mut source_questions = archive.source_questions(); for question in source_questions.convert() { let res = tx.send(question); if res.is_err() { break; } } println!("read done"); } fn db_writer_task(rx: mpsc::Receiver) { let out_file: PathBuf = [DB_DIR, "data.mdb"].into_iter().collect(); match fs::metadata(&out_file) { Ok(x) if x.is_file() => { fs::remove_file(&out_file).unwrap(); println!(r#""{}" removed"#, out_file.to_str().unwrap()); } _ => {} }; let options: Options = serde_json::from_value(json!({ "map_size": 900 * 1024 * 1024, // 900mb "write_map": true, "map_async": true, "no_lock": true, "no_meta_sync": true, "no_sync": true, })) .unwrap(); let storage = Storage::new(DB_DIR, options).unwrap(); let collection = storage.collection("questions").unwrap(); let mut count: usize = 0; let count = &mut count; rx.into_iter().for_each(|question| { let result = collection.insert(&question); if result.is_err() { println!("-- {:#?}", question); panic!("{:#?}", result); } else { *count += 1; } }); println!("inserted {}\nwriting...", count); storage.sync(true).unwrap(); print!("stats: "); let stats = storage.stat().unwrap(); println!("{:?}", stats); drop(storage); println!("write done"); } fn write_db() { let (tx, rx) = mpsc::channel::(); [ thread::spawn(move || reader_task(tx)), thread::spawn(move || db_writer_task(rx)), ] .into_iter() .for_each(|handle| handle.join().expect("thread panic")); println!("all done"); } fn print_question_from(get_q: F) where F: FnOnce() -> Option, { let q = get_q().unwrap(); println!("{:#?}", q) } fn read_from_zip(file_num: usize, mut num: usize) -> Option { let mut rng = rand::thread_rng(); let zip_file = fs::File::open(ZIP_FILENAME).unwrap(); let zip_reader = io::BufReader::new(zip_file); let archive = zip::ZipArchive::new(zip_reader).unwrap(); let mut source_questions = archive.source_questions(); let (filename, batch) = if file_num == 0 { source_questions.choose(&mut rng).unwrap() } else { source_questions.nth(file_num - 1).unwrap() }; let mut batch = batch.unwrap(); batch.filename = filename; let questions: Vec = batch.into(); if num == 0 { num = (1..=questions.len()).choose(&mut rng).unwrap(); } Some(questions[num - 1].clone()) } fn compact_db() { let options: Options = serde_json::from_value(json!({ "write_map": true, "map_async": true, "no_lock": true, "no_meta_sync": true, "no_sync": true, "compact": true, })) .unwrap(); let storage = Storage::new(DB_DIR, options).unwrap(); storage.sync(true).unwrap(); let stats = storage.stat().unwrap(); println!("{:?}", stats); drop(storage); } fn read_from_db(mut id: u32) -> Option { let options: Options = serde_json::from_value(json!({ "read_only": true, "map_async": true, "no_lock": true, })) .unwrap(); let storage = Storage::new(DB_DIR, options).unwrap(); let collection = storage.collection("questions").unwrap(); let mut rng = rand::thread_rng(); if id == 0 { let last_id = collection.last_id().unwrap(); id = (1..=last_id).choose(&mut rng).unwrap(); } collection.get::(id).unwrap() } // measure and return time elapsed in `func` in seconds pub fn measure(func: F) -> f64 { let start = Instant::now(); func(); let elapsed = start.elapsed(); (elapsed.as_secs() as f64) + (elapsed.subsec_nanos() as f64 / 1_000_000_000.0) } pub fn measure_and_print(func: F) { let m = measure(func); eprintln!("{}", m); } fn main() { let args = Cli::parse(); let mut action: Box = match &args.command { Command::Write => Box::new(write_db), Command::Compact => Box::new(compact_db), Command::Print { id } => { let get_question = Box::new(|| read_from_db(*id)); Box::new(|| print_question_from(get_question)) } Command::ZipPrint { file_num, num } => { let get_question = Box::new(|| read_from_zip(*file_num, *num)); Box::new(|| print_question_from(get_question)) } Command::Write2 => Box::new(write_db2), Command::Print2 { id } => { let get_question = Box::new(|| read_from_db2(*id)); Box::new(|| print_question_from(get_question)) } }; if args.measure { action = Box::new(|| measure_and_print(action)); } action(); } type LSize = u32; const LEN_SIZE: usize = std::mem::size_of::(); fn read_from_db2(id: u32) -> Option { const INPUT_FILENAME: &str = "test.bin"; const INPUT_BUF_SIZE: usize = 4 * 1024; let cfg = bincode::config::standard(); let input = fs::File::open(INPUT_FILENAME).expect("open input"); let mut input = io::BufReader::with_capacity(INPUT_BUF_SIZE, input); let mut first_data: [u8; LEN_SIZE] = [0; LEN_SIZE]; input.read_exact(&mut first_data).expect("read first"); let first_pos = LSize::from_le_bytes(first_data); let tab_len = (first_pos as usize) / LEN_SIZE; let records_count = tab_len - 1; let index = match id { 0 => { let mut rng = rand::thread_rng(); (1..records_count).into_iter().choose(&mut rng).unwrap() } _ => (id - 1) as usize, }; if index >= records_count { return None; } let data_pos = if 0 == index { first_pos } else { let mut pos_curr_data: [u8; LEN_SIZE] = [0; LEN_SIZE]; input .seek_relative(((index - 1) * LEN_SIZE).try_into().expect("index to i64")) .expect("seek to tab pos"); input .read_exact(&mut pos_curr_data) .expect("read current pos"); LSize::from_le_bytes(pos_curr_data) }; let mut pos_next_data: [u8; LEN_SIZE] = [0; LEN_SIZE]; input.read_exact(&mut pos_next_data).expect("read next pos"); let data_pos_next = LSize::from_le_bytes(pos_next_data); let data_len = data_pos_next - data_pos; let tab_pos = input.stream_position().expect("stream pos") as u32; let advance_len = data_pos - tab_pos as LSize; input.seek_relative(advance_len as i64).expect("q seek"); let reader = input.take(data_len as u64); let data = zstd::decode_all(reader).expect("zstd decode data"); let question: (Question, usize) = bincode::decode_from_slice(&data, cfg).expect("bincode decode q"); let question = question.0; Some(question) } fn write_db2() { let (tx, rx) = mpsc::channel::(); [ thread::spawn(move || reader_task(tx)), thread::spawn(move || db_writer2_task(rx)), ] .into_iter() .for_each(|handle| handle.join().expect("thread panic")); println!("all done"); } fn db_writer2_task(rx: mpsc::Receiver) { const COMP_DATA_LEVEL: i32 = 1; const DATA_BUF_SIZE: usize = 500 * 1024 * 1024; const OUT_BUF_SIZE: usize = 200 * 1024 * 1024; const CURRENT_BUF_SIZE: usize = 100 * 1024; const OUTPUT_FILENAME: &str = "test.bin"; let cfg = bincode::config::standard(); let mut table: Vec = vec![]; let zencoder = zstd::stream::raw::Encoder::new(COMP_DATA_LEVEL).expect("new zstd encoder"); let data_buf: Vec = Vec::with_capacity(DATA_BUF_SIZE); let cur_buf_raw: Vec = Vec::with_capacity(CURRENT_BUF_SIZE); let mut data_buf = Cursor::new(data_buf); let cur_buf_raw = Cursor::new(cur_buf_raw); let mut cur_buf_z = zstd::stream::zio::Writer::new(cur_buf_raw, zencoder); let mut num = 1; let mut pos: LSize = 0; rx.into_iter().for_each(|mut q| { q.num = num; let q_data = bincode::encode_to_vec(q, cfg).expect("bincode q encode"); let zencoder = cur_buf_z.operation_mut(); zencoder.reinit().expect("zstd encoder reinit"); zencoder .set_pledged_src_size(q_data.len() as u64) .expect("zstd op set len"); cur_buf_z.write_all(&q_data).expect("write question"); cur_buf_z.finish().expect("zstd q finish"); cur_buf_z.flush().expect("zbuf flush"); table.push(pos); let cur_buf_raw = cur_buf_z.writer_mut(); cur_buf_raw.set_position(0); io::copy(cur_buf_raw, &mut data_buf).expect("copy current compressed"); pos = data_buf.stream_position().expect("data buf pos") as LSize; num += 1; }); table.push(pos); data_buf.set_position(0); drop(cur_buf_z); let tab_data = vec![0u8; table.len() * LEN_SIZE]; let mut tab_cursor = Cursor::new(tab_data); let tab_size = (table.len() * LEN_SIZE) as LSize; for pos in table { let pos_data = (pos + tab_size).to_le_bytes(); tab_cursor.write_all(&pos_data).expect("write pos"); } tab_cursor.set_position(0); let out = fs::File::create(OUTPUT_FILENAME).expect("out create"); let mut out = io::BufWriter::with_capacity(OUT_BUF_SIZE, out); io::copy(&mut tab_cursor, &mut out).expect("write tab"); io::copy(&mut data_buf, &mut out).expect("copy z buf"); drop(data_buf); out.flush().expect("out flush"); println!("write done"); }