move enumerate to reader_task

This commit is contained in:
Dmitry Belyaev 2022-10-10 15:44:32 +03:00
parent 17446a6318
commit f275069f23
Signed by: b4tman
GPG Key ID: 41A00BF15EA7E5F3

View File

@ -63,7 +63,15 @@ fn reader_task(tx: mpsc::Sender<Question>) {
let zip_reader = io::BufReader::new(zip_file);
let archive = zip::ZipArchive::new(zip_reader).unwrap();
let mut source_questions = archive.source_questions();
for question in source_questions.convert() {
let questions = source_questions
.convert()
.enumerate()
.map(|(num, mut question)| {
question.num = 1 + num as u32;
question
});
for question in questions {
let res = tx.send(question);
if res.is_err() {
break;
@ -95,22 +103,20 @@ fn db_writer_task(rx: mpsc::Receiver<Question>) {
let collection = storage.collection("questions").unwrap();
let mut count: usize = 0;
let count = &mut count;
rx.into_iter().for_each(|question| {
let result = collection.insert(&question);
if result.is_err() {
println!("-- {:#?}", question);
panic!("{:#?}", result);
} else {
*count += 1;
}
collection.insert(question).expect("insert question");
count += 1;
});
println!("inserted {}\nwriting...", count);
println!("inserted {count}");
println!("syncing to disk...");
storage.sync(true).unwrap();
print!("stats: ");
let stats = storage.stat().unwrap();
println!("{:?}", stats);
drop(storage);
println!("write done");
}
@ -238,7 +244,7 @@ fn main() {
fn read_from_db2(id: u32) -> Option<Question> {
let mut reader: db::Reader<Question> =
db::Reader::new(NEW_DB_FILENAME, 2048).expect("new db reader");
let mut questions = reader.iter();
match id {
@ -264,15 +270,8 @@ fn db_writer2_task(rx: mpsc::Receiver<Question>) {
let mut writer: db::Writer<Question> =
db::Writer::new(NEW_DB_FILENAME, writer_opts).expect("new db writer");
let mut num = 1;
let mut iter = rx.iter().map(|mut q| {
q.num = num;
num += 1;
q
});
writer
.load(&mut iter)
.load(&mut rx.iter())
.unwrap_or_else(|e| panic!("db writer load, {e:#?}"));
writer.finish().expect("db writer finish");