write_db with threads
This commit is contained in:
		
							
								
								
									
										35
									
								
								src/main.rs
									
									
									
									
									
								
							
							
						
						
									
										35
									
								
								src/main.rs
									
									
									
									
									
								
							@@ -8,9 +8,10 @@ extern crate zip;
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
use clap::{Parser, Subcommand};
 | 
					use clap::{Parser, Subcommand};
 | 
				
			||||||
use rand::seq::IteratorRandom;
 | 
					use rand::seq::IteratorRandom;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use std::path::PathBuf;
 | 
					use std::path::PathBuf;
 | 
				
			||||||
use std::time::Instant;
 | 
					use std::time::Instant;
 | 
				
			||||||
use std::{fs, io};
 | 
					use std::{fs, io, sync::mpsc, thread};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use ledb::{Options, Storage};
 | 
					use ledb::{Options, Storage};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -49,7 +50,17 @@ struct Cli {
 | 
				
			|||||||
    measure: bool,
 | 
					    measure: bool,
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
fn write_db() {
 | 
					fn reader_task(tx: mpsc::Sender<Question>) {
 | 
				
			||||||
 | 
					    let zip_file = fs::File::open(ZIP_FILENAME).unwrap();
 | 
				
			||||||
 | 
					    let zip_reader = io::BufReader::new(zip_file);
 | 
				
			||||||
 | 
					    let archive = zip::ZipArchive::new(zip_reader).unwrap();
 | 
				
			||||||
 | 
					    archive
 | 
				
			||||||
 | 
					        .source_questions()
 | 
				
			||||||
 | 
					        .convert()
 | 
				
			||||||
 | 
					        .for_each(|question| tx.send(question).expect("send question"));
 | 
				
			||||||
 | 
					    println!("read done");
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					fn db_writer_task(rx: mpsc::Receiver<Question>) {
 | 
				
			||||||
    let out_file: PathBuf = [DB_DIR, "data.mdb"].into_iter().collect();
 | 
					    let out_file: PathBuf = [DB_DIR, "data.mdb"].into_iter().collect();
 | 
				
			||||||
    match fs::metadata(&out_file) {
 | 
					    match fs::metadata(&out_file) {
 | 
				
			||||||
        Ok(x) if x.is_file() => {
 | 
					        Ok(x) if x.is_file() => {
 | 
				
			||||||
@@ -59,10 +70,6 @@ fn write_db() {
 | 
				
			|||||||
        _ => {}
 | 
					        _ => {}
 | 
				
			||||||
    };
 | 
					    };
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    let zip_file = fs::File::open(ZIP_FILENAME).unwrap();
 | 
					 | 
				
			||||||
    let zip_reader = io::BufReader::new(zip_file);
 | 
					 | 
				
			||||||
    let archive = zip::ZipArchive::new(zip_reader).unwrap();
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    let options: Options = serde_json::from_value(json!({
 | 
					    let options: Options = serde_json::from_value(json!({
 | 
				
			||||||
        "map_size": 900 * 1024 * 1024, // 900mb
 | 
					        "map_size": 900 * 1024 * 1024, // 900mb
 | 
				
			||||||
        "write_map": true,
 | 
					        "write_map": true,
 | 
				
			||||||
@@ -76,11 +83,9 @@ fn write_db() {
 | 
				
			|||||||
    let storage = Storage::new(DB_DIR, options).unwrap();
 | 
					    let storage = Storage::new(DB_DIR, options).unwrap();
 | 
				
			||||||
    let collection = storage.collection("questions").unwrap();
 | 
					    let collection = storage.collection("questions").unwrap();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    println!("converting...");
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    let mut count: usize = 0;
 | 
					    let mut count: usize = 0;
 | 
				
			||||||
    let count = &mut count;
 | 
					    let count = &mut count;
 | 
				
			||||||
    archive.source_questions().convert().for_each(|question| {
 | 
					    rx.into_iter().for_each(|question| {
 | 
				
			||||||
        let result = collection.insert(&question);
 | 
					        let result = collection.insert(&question);
 | 
				
			||||||
        if result.is_err() {
 | 
					        if result.is_err() {
 | 
				
			||||||
            println!("-- {:#?}", question);
 | 
					            println!("-- {:#?}", question);
 | 
				
			||||||
@@ -96,6 +101,18 @@ fn write_db() {
 | 
				
			|||||||
    let stats = storage.stat().unwrap();
 | 
					    let stats = storage.stat().unwrap();
 | 
				
			||||||
    println!("{:?}", stats);
 | 
					    println!("{:?}", stats);
 | 
				
			||||||
    drop(storage);
 | 
					    drop(storage);
 | 
				
			||||||
 | 
					    println!("write done");
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					fn write_db() {
 | 
				
			||||||
 | 
					    let (tx, rx) = mpsc::channel::<Question>();
 | 
				
			||||||
 | 
					    [
 | 
				
			||||||
 | 
					        thread::spawn(move || reader_task(tx)),
 | 
				
			||||||
 | 
					        thread::spawn(move || db_writer_task(rx)),
 | 
				
			||||||
 | 
					    ]
 | 
				
			||||||
 | 
					    .into_iter()
 | 
				
			||||||
 | 
					    .for_each(|handle| handle.join().expect("thread panic"));
 | 
				
			||||||
 | 
					    println!("all done");
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
fn print_question_from<F>(get_q: F)
 | 
					fn print_question_from<F>(get_q: F)
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user