add locks + file alloc in Writer::finish
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
@@ -17,6 +17,7 @@ zip="0.6"
|
||||
bincode = "^2.0.0-rc.2"
|
||||
zstd = "^0.10"
|
||||
memmap = "0.7.0"
|
||||
fs4 = { version = "0.6.3", features = ["sync"] }
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = "3.3"
|
||||
|
@@ -8,6 +8,8 @@ use std::{
|
||||
|
||||
use memmap::{Mmap, MmapOptions};
|
||||
|
||||
use fs4::FileExt;
|
||||
|
||||
type LSize = u32;
|
||||
const LEN_SIZE: usize = std::mem::size_of::<LSize>();
|
||||
const BINCODE_CFG: bincode::config::Configuration = bincode::config::standard();
|
||||
@@ -63,6 +65,7 @@ where
|
||||
{
|
||||
pub fn new<P: AsRef<Path>>(path: P, opts: WriterOpts) -> Result<Self, String> {
|
||||
let out = fs::File::create(path).str_err()?;
|
||||
out.try_lock_exclusive().str_err()?;
|
||||
let out = io::BufWriter::with_capacity(opts.out_buf_size, out);
|
||||
let data_buf: Vec<u8> = Vec::with_capacity(opts.data_buf_size);
|
||||
let data_buf = Cursor::new(data_buf);
|
||||
@@ -133,6 +136,9 @@ where
|
||||
let pos: LSize = self.data_buf.position() as LSize;
|
||||
self.table.push(pos);
|
||||
|
||||
let output_size: u64 = (self.table.len() * LEN_SIZE) as u64 + self.data_buf.position();
|
||||
self.out.get_ref().allocate(output_size).str_err()?;
|
||||
|
||||
// write tab
|
||||
let tab_size = (self.table.len() * LEN_SIZE) as LSize;
|
||||
for pos in self.table {
|
||||
@@ -147,6 +153,7 @@ where
|
||||
io::copy(&mut data, &mut self.out).str_err()?;
|
||||
|
||||
self.out.flush().str_err()?;
|
||||
self.out.get_ref().unlock().str_err()?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -167,6 +174,7 @@ where
|
||||
{
|
||||
pub fn new<P: AsRef<Path>>(path: P, _buf_size: usize) -> Result<Self, String> {
|
||||
let file = fs::File::open(path).str_err()?;
|
||||
file.try_lock_shared().str_err()?;
|
||||
let mmap = unsafe { MmapOptions::new().map(&file).str_err()? };
|
||||
|
||||
// read first pos and records count
|
||||
|
Reference in New Issue
Block a user