separate new db reader+writer from main
This commit is contained in:
		
							
								
								
									
										200
									
								
								src/db.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										200
									
								
								src/db.rs
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,200 @@
 | 
				
			|||||||
 | 
					use std::{
 | 
				
			||||||
 | 
					    fs,
 | 
				
			||||||
 | 
					    io::{self, Cursor, Read, Seek, SeekFrom, Write},
 | 
				
			||||||
 | 
					    marker::PhantomData,
 | 
				
			||||||
 | 
					    path::Path,
 | 
				
			||||||
 | 
					};
 | 
				
			||||||
 | 
					use zstd::stream::{raw::Operation, zio};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type LSize = u32;
 | 
				
			||||||
 | 
					const LEN_SIZE: usize = std::mem::size_of::<LSize>();
 | 
				
			||||||
 | 
					const BINCODE_CFG: bincode::config::Configuration = bincode::config::standard();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					trait ErrorToString {
 | 
				
			||||||
 | 
					    type Output;
 | 
				
			||||||
 | 
					    fn str_err(self) -> std::result::Result<Self::Output, String>;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					impl<T, E> ErrorToString for std::result::Result<T, E>
 | 
				
			||||||
 | 
					where
 | 
				
			||||||
 | 
					    E: std::error::Error,
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					    type Output = T;
 | 
				
			||||||
 | 
					    fn str_err(self) -> std::result::Result<Self::Output, String> {
 | 
				
			||||||
 | 
					        self.map_err(|e| e.to_string())
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					pub struct WriterOpts {
 | 
				
			||||||
 | 
					    pub compress_lvl: i32,
 | 
				
			||||||
 | 
					    pub data_buf_size: usize,
 | 
				
			||||||
 | 
					    pub out_buf_size: usize,
 | 
				
			||||||
 | 
					    pub current_buf_size: usize,
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					impl Default for WriterOpts {
 | 
				
			||||||
 | 
					    fn default() -> Self {
 | 
				
			||||||
 | 
					        Self {
 | 
				
			||||||
 | 
					            compress_lvl: 1,
 | 
				
			||||||
 | 
					            data_buf_size: 500 * 1024 * 1024,
 | 
				
			||||||
 | 
					            out_buf_size: 200 * 1024 * 1024,
 | 
				
			||||||
 | 
					            current_buf_size: 20 * 1024,
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					pub struct Writer<'a, T>
 | 
				
			||||||
 | 
					where
 | 
				
			||||||
 | 
					    T: bincode::Encode,
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					    out: io::BufWriter<fs::File>,
 | 
				
			||||||
 | 
					    data_buf: Cursor<Vec<u8>>,
 | 
				
			||||||
 | 
					    cur_buf_z: zio::Writer<Cursor<Vec<u8>>, zstd::stream::raw::Encoder<'a>>,
 | 
				
			||||||
 | 
					    table: Vec<LSize>,
 | 
				
			||||||
 | 
					    _t: PhantomData<*const T>,
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					impl<T> Writer<'_, T>
 | 
				
			||||||
 | 
					where
 | 
				
			||||||
 | 
					    T: bincode::Encode,
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					    pub fn new<P: AsRef<Path>>(path: P, opts: WriterOpts) -> Result<Self, String> {
 | 
				
			||||||
 | 
					        let out = fs::File::create(path).str_err()?;
 | 
				
			||||||
 | 
					        let out = io::BufWriter::with_capacity(opts.out_buf_size, out);
 | 
				
			||||||
 | 
					        let data_buf: Vec<u8> = Vec::with_capacity(opts.data_buf_size);
 | 
				
			||||||
 | 
					        let cur_buf_raw: Vec<u8> = Vec::with_capacity(opts.current_buf_size);
 | 
				
			||||||
 | 
					        let data_buf = Cursor::new(data_buf);
 | 
				
			||||||
 | 
					        let cur_buf_raw = Cursor::new(cur_buf_raw);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        let zencoder = zstd::stream::raw::Encoder::new(opts.compress_lvl).str_err()?;
 | 
				
			||||||
 | 
					        let cur_buf_z = zstd::stream::zio::Writer::new(cur_buf_raw, zencoder);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        let table: Vec<LSize> = vec![];
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        Ok(Self {
 | 
				
			||||||
 | 
					            out,
 | 
				
			||||||
 | 
					            data_buf,
 | 
				
			||||||
 | 
					            cur_buf_z,
 | 
				
			||||||
 | 
					            table,
 | 
				
			||||||
 | 
					            _t: PhantomData,
 | 
				
			||||||
 | 
					        })
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    pub fn push(&mut self, item: T) -> Result<(), String> {
 | 
				
			||||||
 | 
					        let pos: LSize = self.data_buf.position() as LSize;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        let item_data = bincode::encode_to_vec(item, BINCODE_CFG).str_err()?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        let zencoder = self.cur_buf_z.operation_mut();
 | 
				
			||||||
 | 
					        zencoder.reinit().str_err()?;
 | 
				
			||||||
 | 
					        zencoder
 | 
				
			||||||
 | 
					            .set_pledged_src_size(item_data.len() as u64)
 | 
				
			||||||
 | 
					            .str_err()?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.cur_buf_z.write_all(&item_data).str_err()?;
 | 
				
			||||||
 | 
					        self.cur_buf_z.finish().str_err()?;
 | 
				
			||||||
 | 
					        self.cur_buf_z.flush().str_err()?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.table.push(pos);
 | 
				
			||||||
 | 
					        let cur_buf_raw = self.cur_buf_z.writer_mut();
 | 
				
			||||||
 | 
					        let size = cur_buf_raw.position();
 | 
				
			||||||
 | 
					        cur_buf_raw.set_position(0);
 | 
				
			||||||
 | 
					        io::copy(&mut cur_buf_raw.take(size), &mut self.data_buf).str_err()?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        Ok(())
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    pub fn finish(mut self) -> Result<(), String> {
 | 
				
			||||||
 | 
					        // finish tab
 | 
				
			||||||
 | 
					        let pos: LSize = self.data_buf.position() as LSize;
 | 
				
			||||||
 | 
					        self.table.push(pos);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        // write tab
 | 
				
			||||||
 | 
					        let tab_size = (self.table.len() * LEN_SIZE) as LSize;
 | 
				
			||||||
 | 
					        for pos in self.table {
 | 
				
			||||||
 | 
					            let pos_data = (pos + tab_size).to_le_bytes();
 | 
				
			||||||
 | 
					            self.out.write_all(&pos_data).str_err()?;
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        // copy data
 | 
				
			||||||
 | 
					        let data_size = self.data_buf.position();
 | 
				
			||||||
 | 
					        self.data_buf.set_position(0);
 | 
				
			||||||
 | 
					        let mut data = self.data_buf.take(data_size);
 | 
				
			||||||
 | 
					        io::copy(&mut data, &mut self.out).str_err()?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.out.flush().str_err()?;
 | 
				
			||||||
 | 
					        Ok(())
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					pub struct Reader<T>
 | 
				
			||||||
 | 
					where
 | 
				
			||||||
 | 
					    T: bincode::Decode,
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					    input: io::BufReader<fs::File>,
 | 
				
			||||||
 | 
					    count: usize,
 | 
				
			||||||
 | 
					    first_pos: LSize,
 | 
				
			||||||
 | 
					    _t: PhantomData<*const T>,
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					impl<T> Reader<T>
 | 
				
			||||||
 | 
					where
 | 
				
			||||||
 | 
					    T: bincode::Decode,
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					    pub fn new<P: AsRef<Path>>(path: P, buf_size: usize) -> Result<Self, String> {
 | 
				
			||||||
 | 
					        let input = fs::File::open(path).str_err()?;
 | 
				
			||||||
 | 
					        let mut input = io::BufReader::with_capacity(buf_size, input);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        // read first pos and records count
 | 
				
			||||||
 | 
					        let mut first_data: [u8; LEN_SIZE] = [0; LEN_SIZE];
 | 
				
			||||||
 | 
					        input.read_exact(&mut first_data).str_err()?;
 | 
				
			||||||
 | 
					        let first_pos = LSize::from_le_bytes(first_data);
 | 
				
			||||||
 | 
					        let tab_len = (first_pos as usize) / LEN_SIZE;
 | 
				
			||||||
 | 
					        let count = tab_len - 1;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        Ok(Self {
 | 
				
			||||||
 | 
					            input,
 | 
				
			||||||
 | 
					            count,
 | 
				
			||||||
 | 
					            first_pos,
 | 
				
			||||||
 | 
					            _t: PhantomData,
 | 
				
			||||||
 | 
					        })
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    pub fn len(&self) -> usize {
 | 
				
			||||||
 | 
					        self.count
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    pub fn get(&mut self, index: usize) -> Result<T, String> {
 | 
				
			||||||
 | 
					        if index >= self.len() {
 | 
				
			||||||
 | 
					            return Err("index out of range".into());
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        // read item data pos
 | 
				
			||||||
 | 
					        let data_pos = if 0 == index {
 | 
				
			||||||
 | 
					            self.first_pos
 | 
				
			||||||
 | 
					        } else {
 | 
				
			||||||
 | 
					            let tab_pos: u64 = (index * LEN_SIZE).try_into().str_err()?;
 | 
				
			||||||
 | 
					            let mut pos_curr_data: [u8; LEN_SIZE] = [0; LEN_SIZE];
 | 
				
			||||||
 | 
					            self.input.seek(SeekFrom::Start(tab_pos)).str_err()?;
 | 
				
			||||||
 | 
					            self.input.read_exact(&mut pos_curr_data).str_err()?;
 | 
				
			||||||
 | 
					            LSize::from_le_bytes(pos_curr_data)
 | 
				
			||||||
 | 
					        };
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        // read next item pos
 | 
				
			||||||
 | 
					        let mut pos_next_data: [u8; LEN_SIZE] = [0; LEN_SIZE];
 | 
				
			||||||
 | 
					        self.input.read_exact(&mut pos_next_data).str_err()?;
 | 
				
			||||||
 | 
					        let data_pos_next = LSize::from_le_bytes(pos_next_data);
 | 
				
			||||||
 | 
					        // calc item data length
 | 
				
			||||||
 | 
					        let data_len = data_pos_next - data_pos;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        // read & unpack item data
 | 
				
			||||||
 | 
					        self.input
 | 
				
			||||||
 | 
					            .seek(SeekFrom::Start(data_pos as u64))
 | 
				
			||||||
 | 
					            .str_err()?;
 | 
				
			||||||
 | 
					        let reader = self.input.by_ref().take(data_len as u64);
 | 
				
			||||||
 | 
					        let data = zstd::decode_all(reader).str_err()?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        // decode item
 | 
				
			||||||
 | 
					        let item: (T, usize) = bincode::decode_from_slice(&data, BINCODE_CFG).str_err()?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        Ok(item.0)
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
							
								
								
									
										120
									
								
								src/main.rs
									
									
									
									
									
								
							
							
						
						
									
										120
									
								
								src/main.rs
									
									
									
									
									
								
							@@ -9,15 +9,14 @@ extern crate zip;
 | 
				
			|||||||
use clap::{Parser, Subcommand};
 | 
					use clap::{Parser, Subcommand};
 | 
				
			||||||
use rand::seq::IteratorRandom;
 | 
					use rand::seq::IteratorRandom;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use std::io::{self, Cursor, Read, Seek, Write};
 | 
					use std::io;
 | 
				
			||||||
use std::path::PathBuf;
 | 
					use std::path::PathBuf;
 | 
				
			||||||
use std::time::Instant;
 | 
					use std::time::Instant;
 | 
				
			||||||
use std::{fs, sync::mpsc, thread};
 | 
					use std::{fs, sync::mpsc, thread};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use zstd::stream::raw::Operation;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
use ledb::{Options, Storage};
 | 
					use ledb::{Options, Storage};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					mod db;
 | 
				
			||||||
mod questions;
 | 
					mod questions;
 | 
				
			||||||
mod source;
 | 
					mod source;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -25,6 +24,7 @@ use crate::questions::{Question, QuestionsConverter};
 | 
				
			|||||||
use crate::source::ReadSourceQuestionsBatches;
 | 
					use crate::source::ReadSourceQuestionsBatches;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
const ZIP_FILENAME: &str = "json.zip";
 | 
					const ZIP_FILENAME: &str = "json.zip";
 | 
				
			||||||
 | 
					const NEW_DB_FILENAME: &str = "test.bin";
 | 
				
			||||||
const DB_DIR: &str = "db";
 | 
					const DB_DIR: &str = "db";
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[derive(Subcommand, Debug)]
 | 
					#[derive(Subcommand, Debug)]
 | 
				
			||||||
@@ -235,67 +235,23 @@ fn main() {
 | 
				
			|||||||
    action();
 | 
					    action();
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type LSize = u32;
 | 
					 | 
				
			||||||
const LEN_SIZE: usize = std::mem::size_of::<LSize>();
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
fn read_from_db2(id: u32) -> Option<Question> {
 | 
					fn read_from_db2(id: u32) -> Option<Question> {
 | 
				
			||||||
    const INPUT_FILENAME: &str = "test.bin";
 | 
					    let mut reader: db::Reader<Question> =
 | 
				
			||||||
    const INPUT_BUF_SIZE: usize = 4 * 1024;
 | 
					        db::Reader::new(NEW_DB_FILENAME, 2048).expect("new db reader");
 | 
				
			||||||
 | 
					 | 
				
			||||||
    let cfg = bincode::config::standard();
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    let input = fs::File::open(INPUT_FILENAME).expect("open input");
 | 
					 | 
				
			||||||
    let mut input = io::BufReader::with_capacity(INPUT_BUF_SIZE, input);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    let mut first_data: [u8; LEN_SIZE] = [0; LEN_SIZE];
 | 
					 | 
				
			||||||
    input.read_exact(&mut first_data).expect("read first");
 | 
					 | 
				
			||||||
    let first_pos = LSize::from_le_bytes(first_data);
 | 
					 | 
				
			||||||
    let tab_len = (first_pos as usize) / LEN_SIZE;
 | 
					 | 
				
			||||||
    let records_count = tab_len - 1;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    let index = match id {
 | 
					    let index = match id {
 | 
				
			||||||
        0 => {
 | 
					        0 => {
 | 
				
			||||||
            let mut rng = rand::thread_rng();
 | 
					            let mut rng = rand::thread_rng();
 | 
				
			||||||
            (1..records_count).into_iter().choose(&mut rng).unwrap()
 | 
					            (1..reader.len()).into_iter().choose(&mut rng).unwrap()
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        _ => (id - 1) as usize,
 | 
					        _ => (id - 1) as usize,
 | 
				
			||||||
    };
 | 
					    };
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if index >= records_count {
 | 
					    if index >= reader.len() {
 | 
				
			||||||
        return None;
 | 
					        return None;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    let data_pos = if 0 == index {
 | 
					    Some(reader.get(index).expect("get question"))
 | 
				
			||||||
        first_pos
 | 
					 | 
				
			||||||
    } else {
 | 
					 | 
				
			||||||
        let mut pos_curr_data: [u8; LEN_SIZE] = [0; LEN_SIZE];
 | 
					 | 
				
			||||||
        input
 | 
					 | 
				
			||||||
            .seek_relative(((index - 1) * LEN_SIZE).try_into().expect("index to i64"))
 | 
					 | 
				
			||||||
            .expect("seek to tab pos");
 | 
					 | 
				
			||||||
        input
 | 
					 | 
				
			||||||
            .read_exact(&mut pos_curr_data)
 | 
					 | 
				
			||||||
            .expect("read current pos");
 | 
					 | 
				
			||||||
        LSize::from_le_bytes(pos_curr_data)
 | 
					 | 
				
			||||||
    };
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    let mut pos_next_data: [u8; LEN_SIZE] = [0; LEN_SIZE];
 | 
					 | 
				
			||||||
    input.read_exact(&mut pos_next_data).expect("read next pos");
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    let data_pos_next = LSize::from_le_bytes(pos_next_data);
 | 
					 | 
				
			||||||
    let data_len = data_pos_next - data_pos;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    let tab_pos = input.stream_position().expect("stream pos") as u32;
 | 
					 | 
				
			||||||
    let advance_len = data_pos - tab_pos as LSize;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    input.seek_relative(advance_len as i64).expect("q seek");
 | 
					 | 
				
			||||||
    let reader = input.take(data_len as u64);
 | 
					 | 
				
			||||||
    let data = zstd::decode_all(reader).expect("zstd decode data");
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    let question: (Question, usize) =
 | 
					 | 
				
			||||||
        bincode::decode_from_slice(&data, cfg).expect("bincode decode q");
 | 
					 | 
				
			||||||
    let question = question.0;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    Some(question)
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
fn write_db2() {
 | 
					fn write_db2() {
 | 
				
			||||||
    let (tx, rx) = mpsc::channel::<Question>();
 | 
					    let (tx, rx) = mpsc::channel::<Question>();
 | 
				
			||||||
@@ -308,68 +264,22 @@ fn write_db2() {
 | 
				
			|||||||
    println!("all done");
 | 
					    println!("all done");
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
fn db_writer2_task(rx: mpsc::Receiver<Question>) {
 | 
					fn db_writer2_task(rx: mpsc::Receiver<Question>) {
 | 
				
			||||||
    const COMP_DATA_LEVEL: i32 = 1;
 | 
					    let writer_opts = db::WriterOpts::default();
 | 
				
			||||||
    const DATA_BUF_SIZE: usize = 500 * 1024 * 1024;
 | 
					    let mut writer: db::Writer<Question> =
 | 
				
			||||||
    const OUT_BUF_SIZE: usize = 200 * 1024 * 1024;
 | 
					        db::Writer::new(NEW_DB_FILENAME, writer_opts).expect("new db writer");
 | 
				
			||||||
    const CURRENT_BUF_SIZE: usize = 100 * 1024;
 | 
					 | 
				
			||||||
    const OUTPUT_FILENAME: &str = "test.bin";
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    let cfg = bincode::config::standard();
 | 
					 | 
				
			||||||
    let mut table: Vec<LSize> = vec![];
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    let zencoder = zstd::stream::raw::Encoder::new(COMP_DATA_LEVEL).expect("new zstd encoder");
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    let data_buf: Vec<u8> = Vec::with_capacity(DATA_BUF_SIZE);
 | 
					 | 
				
			||||||
    let cur_buf_raw: Vec<u8> = Vec::with_capacity(CURRENT_BUF_SIZE);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    let mut data_buf = Cursor::new(data_buf);
 | 
					 | 
				
			||||||
    let cur_buf_raw = Cursor::new(cur_buf_raw);
 | 
					 | 
				
			||||||
    let mut cur_buf_z = zstd::stream::zio::Writer::new(cur_buf_raw, zencoder);
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    let mut num = 1;
 | 
					    let mut num = 1;
 | 
				
			||||||
    let mut pos: LSize = 0;
 | 
					 | 
				
			||||||
    rx.into_iter().for_each(|mut q| {
 | 
					    rx.into_iter().for_each(|mut q| {
 | 
				
			||||||
        q.num = num;
 | 
					        q.num = num;
 | 
				
			||||||
        let q_data = bincode::encode_to_vec(q, cfg).expect("bincode q encode");
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let zencoder = cur_buf_z.operation_mut();
 | 
					        writer
 | 
				
			||||||
        zencoder.reinit().expect("zstd encoder reinit");
 | 
					            .push(q)
 | 
				
			||||||
        zencoder
 | 
					            .unwrap_or_else(|e| panic!("db writer push, num={num}, {e:#?}"));
 | 
				
			||||||
            .set_pledged_src_size(q_data.len() as u64)
 | 
					 | 
				
			||||||
            .expect("zstd op set len");
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        cur_buf_z.write_all(&q_data).expect("write question");
 | 
					 | 
				
			||||||
        cur_buf_z.finish().expect("zstd q finish");
 | 
					 | 
				
			||||||
        cur_buf_z.flush().expect("zbuf flush");
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        table.push(pos);
 | 
					 | 
				
			||||||
        let cur_buf_raw = cur_buf_z.writer_mut();
 | 
					 | 
				
			||||||
        cur_buf_raw.set_position(0);
 | 
					 | 
				
			||||||
        io::copy(cur_buf_raw, &mut data_buf).expect("copy current compressed");
 | 
					 | 
				
			||||||
        pos = data_buf.stream_position().expect("data buf pos") as LSize;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        num += 1;
 | 
					        num += 1;
 | 
				
			||||||
    });
 | 
					    });
 | 
				
			||||||
    table.push(pos);
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    data_buf.set_position(0);
 | 
					    writer.finish().expect("db writer finish");
 | 
				
			||||||
    drop(cur_buf_z);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    let tab_data = vec![0u8; table.len() * LEN_SIZE];
 | 
					 | 
				
			||||||
    let mut tab_cursor = Cursor::new(tab_data);
 | 
					 | 
				
			||||||
    let tab_size = (table.len() * LEN_SIZE) as LSize;
 | 
					 | 
				
			||||||
    for pos in table {
 | 
					 | 
				
			||||||
        let pos_data = (pos + tab_size).to_le_bytes();
 | 
					 | 
				
			||||||
        tab_cursor.write_all(&pos_data).expect("write pos");
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
    tab_cursor.set_position(0);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    let out = fs::File::create(OUTPUT_FILENAME).expect("out create");
 | 
					 | 
				
			||||||
    let mut out = io::BufWriter::with_capacity(OUT_BUF_SIZE, out);
 | 
					 | 
				
			||||||
    io::copy(&mut tab_cursor, &mut out).expect("write tab");
 | 
					 | 
				
			||||||
    io::copy(&mut data_buf, &mut out).expect("copy z buf");
 | 
					 | 
				
			||||||
    drop(data_buf);
 | 
					 | 
				
			||||||
    out.flush().expect("out flush");
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    println!("write done");
 | 
					    println!("write done");
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user