Compare commits

...

4 Commits

Author SHA1 Message Date
8120a996a3
add test_share_reader
All checks were successful
continuous-integration/drone/push Build is passing
2023-03-28 15:43:08 +03:00
3a26a4aa7f
upd bench 2023-03-28 15:37:21 +03:00
103b677d21
into_iter() for reader 2023-03-28 15:01:14 +03:00
e18539a982
reader without mut 2023-03-28 14:59:44 +03:00
3 changed files with 172 additions and 12 deletions

View File

@ -79,9 +79,8 @@ fn db_read(c: &mut Criterion) {
let reader: Reader<TestData> = Reader::new(&tmpfile, 2048).expect("new reader"); let reader: Reader<TestData> = Reader::new(&tmpfile, 2048).expect("new reader");
reader reader
}, },
|mut reader| { |reader| {
let mut reader_iter = reader.iter(); for item in reader {
while let Some(item) = reader_iter.next() {
drop(item); drop(item);
} }
}, },

View File

@ -232,7 +232,7 @@ fn main() {
} }
fn read_from_db2(id: u32) -> Option<Question> { fn read_from_db2(id: u32) -> Option<Question> {
let mut reader: db::Reader<Question> = let reader: db::Reader<Question> =
db::Reader::new(NEW_DB_FILENAME, 2048).expect("new db reader"); db::Reader::new(NEW_DB_FILENAME, 2048).expect("new db reader");
let mut questions = reader.iter(); let mut questions = reader.iter();

View File

@ -3,6 +3,7 @@ use std::{
io::{self, Cursor, Read, Write}, io::{self, Cursor, Read, Write},
marker::PhantomData, marker::PhantomData,
path::Path, path::Path,
sync::Arc
}; };
use memmap::{Mmap, MmapOptions}; use memmap::{Mmap, MmapOptions};
@ -157,7 +158,7 @@ where
mmap: Mmap, mmap: Mmap,
count: usize, count: usize,
first_pos: LSize, first_pos: LSize,
_t: PhantomData<*const T>, _t: Option<Arc<T>> // PhantomData replacement
} }
impl<T> Reader<T> impl<T> Reader<T>
@ -178,7 +179,7 @@ where
mmap, mmap,
count, count,
first_pos, first_pos,
_t: PhantomData, _t: None
}) })
} }
@ -190,7 +191,7 @@ where
0 == self.len() 0 == self.len()
} }
pub fn get(&mut self, index: usize) -> Result<T, String> { pub fn get(&self, index: usize) -> Result<T, String> {
if index >= self.len() { if index >= self.len() {
return Err("index out of range".into()); return Err("index out of range".into());
} }
@ -222,16 +223,20 @@ where
Ok(item.0) Ok(item.0)
} }
pub fn iter(&mut self) -> ReaderIter<'_, T> { pub fn iter(&self) -> ReaderIter<'_, T> {
ReaderIter::new(self) ReaderIter::new(self)
} }
pub fn into_iter(self) -> ReaderIntoIter<T> {
ReaderIntoIter::new(self)
}
} }
pub struct ReaderIter<'a, T> pub struct ReaderIter<'a, T>
where where
T: bincode::Decode, T: bincode::Decode,
{ {
reader: &'a mut Reader<T>, reader: &'a Reader<T>,
index: Option<usize>, index: Option<usize>,
} }
@ -239,7 +244,7 @@ impl<'a, T> ReaderIter<'a, T>
where where
T: bincode::Decode, T: bincode::Decode,
{ {
fn new(reader: &'a mut Reader<T>) -> Self { fn new(reader: &'a Reader<T>) -> Self {
ReaderIter { ReaderIter {
reader, reader,
index: None, index: None,
@ -309,6 +314,100 @@ where
} }
} }
pub struct ReaderIntoIter<T>
where
T: bincode::Decode,
{
reader: Reader<T>,
index: Option<usize>,
}
impl<T> ReaderIntoIter<T>
where
T: bincode::Decode,
{
fn new(reader: Reader<T>) -> Self {
Self {
reader,
index: None,
}
}
}
impl<T> Iterator for ReaderIntoIter<T>
where
T: bincode::Decode,
{
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
if self.index.is_none() && !self.reader.is_empty() {
self.index = Some(0);
}
match self.index {
Some(i) if i < self.reader.len() => self.nth(i),
_ => None,
}
}
fn nth(&mut self, n: usize) -> Option<Self::Item> {
if self.reader.len() <= n {
return None;
}
self.index = Some(n + 1);
let item = self.reader.get(n);
match item {
Ok(item) => Some(item),
Err(_) => None,
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
let len = self.reader.len();
if self.index.is_none() {
return (len, Some(len));
}
let index = self.index.unwrap();
let rem = if len > index + 1 {
len - (index + 1)
} else {
0
};
(rem, Some(rem))
}
fn count(self) -> usize
where
Self: Sized,
{
self.reader.len()
}
}
impl<T> ExactSizeIterator for ReaderIntoIter<T>
where
T: bincode::Decode,
{
fn len(&self) -> usize {
self.reader.len()
}
}
impl<T> IntoIterator for Reader<T>
where
T: bincode::Decode,
{
type Item = T;
type IntoIter = ReaderIntoIter<Self::Item>;
fn into_iter(self) -> Self::IntoIter {
self.into_iter()
}
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;
@ -345,7 +444,7 @@ mod test {
writer.load(&mut items.clone().into_iter()).expect("load"); writer.load(&mut items.clone().into_iter()).expect("load");
writer.finish().expect("finish write"); writer.finish().expect("finish write");
let mut reader: Reader<TestData> = Reader::new(&tmpfile, 2048).expect("new reader"); let reader: Reader<TestData> = Reader::new(&tmpfile, 2048).expect("new reader");
assert_eq!(items.len(), reader.len()); assert_eq!(items.len(), reader.len());
for (idx, item) in items.iter().enumerate() { for (idx, item) in items.iter().enumerate() {
@ -372,11 +471,73 @@ mod test {
writer.load(&mut items.clone().into_iter()).expect("load"); writer.load(&mut items.clone().into_iter()).expect("load");
writer.finish().expect("finish write"); writer.finish().expect("finish write");
let mut reader: Reader<TestData> = Reader::new(&tmpfile, 2048).expect("new reader"); let reader: Reader<TestData> = Reader::new(&tmpfile, 2048).expect("new reader");
assert_eq!(items.len(), reader.len()); assert_eq!(items.len(), reader.len());
items.into_iter().zip(reader.iter()).for_each(|pair| { items.into_iter().zip(reader.iter()).for_each(|pair| {
assert_eq!(pair.0, pair.1); assert_eq!(pair.0, pair.1);
}); });
} }
#[test]
fn test_write_read_into_iter() {
let dir = tempdir().expect("tempdir");
let tmpfile = dir.path().join("test.tmp");
let opts = WriterOpts {
compress_lvl: 1,
data_buf_size: 10 * 1024 * 1024,
out_buf_size: 10 * 1024 * 1024,
current_buf_size: 4096,
};
let mut writer: Writer<TestData> = Writer::new(&tmpfile, opts).expect("new writer");
let items_iter = gen_data(10);
let items: Vec<TestData> = items_iter.collect();
writer.load(&mut items.clone().into_iter()).expect("load");
writer.finish().expect("finish write");
let reader: Reader<TestData> = Reader::new(&tmpfile, 2048).expect("new reader");
assert_eq!(items.len(), reader.len());
items.into_iter().zip(reader).for_each(|pair| {
assert_eq!(pair.0, pair.1);
});
}
/// sharing Reader instance between threads
#[test]
fn test_share_reader() {
use std::thread;
let dir = tempdir().expect("tempdir");
let tmpfile = dir.path().join("test.tmp");
let opts = WriterOpts {
compress_lvl: 1,
data_buf_size: 10 * 1024 * 1024,
out_buf_size: 10 * 1024 * 1024,
current_buf_size: 4096,
};
let mut writer: Writer<TestData> = Writer::new(&tmpfile, opts).expect("new writer");
let items_iter = gen_data(10);
let items: Vec<TestData> = items_iter.collect();
writer.load(&mut items.clone().into_iter()).expect("load");
writer.finish().expect("finish write");
let reader: Reader<TestData> = Reader::new(&tmpfile, 2048).expect("new reader");
assert_eq!(items.len(), reader.len());
let reader = Arc::new(reader);
for _ in 0..=3 {
let cur_items = items.clone();
let cur_reader = Arc::clone(&reader);
thread::spawn(move || {
cur_items.into_iter().zip(cur_reader.iter()).for_each(|pair| {
assert_eq!(pair.0, pair.1);
});
});
}
}
} }