Skip to content

New Iterators API (hidden) #113

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 55 additions & 2 deletions src/abstract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
// (found in the LICENSE-* files in the repository)

use crate::{
compaction::CompactionStrategy, config::TreeType, tree::inner::MemtableId, AnyTree, BlobTree,
Config, KvPair, Memtable, Segment, SegmentId, SeqNo, Snapshot, Tree, UserKey, UserValue,
compaction::CompactionStrategy, config::TreeType, iter_guard::IterGuardImpl,
tree::inner::MemtableId, AnyTree, BlobTree, Config, KvPair, Memtable, Segment, SegmentId,
SeqNo, Snapshot, Tree, UserKey, UserValue,
};
use enum_dispatch::enum_dispatch;
use std::{
Expand All @@ -18,6 +19,58 @@ pub type RangeItem = crate::Result<KvPair>;
#[allow(clippy::module_name_repetitions)]
#[enum_dispatch]
pub trait AbstractTree {
/// Returns a guarded iterator that scans through the entire tree.
///
/// This is a convenience wrapper around [`AbstractTree::guarded_range`] called with an
/// unbounded range; `seqno` and `index` are forwarded unchanged.
///
/// Avoid using this function, or limit it, as otherwise it may scan a lot of items.
///
/// # Experimental
///
/// This API is experimental and will be renamed.
///
/// See <https://github.com/fjall-rs/lsm-tree/issues/110>.
#[doc(hidden)]
fn guarded_iter(
    &self,
    seqno: Option<SeqNo>,
    index: Option<Arc<Memtable>>,
) -> Box<dyn Iterator<Item = IterGuardImpl> + '_> {
    // The full range `..` has neither a lower nor an upper bound, so it covers every key.
    let everything = ..;
    self.guarded_range::<&[u8], _>(everything, seqno, index)
}

/// Returns an iterator over a prefixed set of items.
///
/// Every item is yielded wrapped in an [`IterGuardImpl`] instead of as a plain
/// key-value result.
///
/// Avoid using an empty prefix as it may scan a lot of items (unless limited).
///
/// # Experimental
///
/// This API is experimental and will be renamed.
///
/// See <https://github.com/fjall-rs/lsm-tree/issues/110>.
#[doc(hidden)]
fn guarded_prefix<K: AsRef<[u8]>>(
&self,
prefix: K,
seqno: Option<SeqNo>,
index: Option<Arc<Memtable>>,
) -> Box<dyn Iterator<Item = IterGuardImpl> + '_>;

/// Returns an iterator over a range of items.
///
/// Every item is yielded wrapped in an [`IterGuardImpl`] instead of as a plain
/// key-value result.
///
/// Avoid using full or unbounded ranges as they may scan a lot of items (unless limited).
///
/// # Experimental
///
/// This API is experimental and will be renamed.
///
/// See <https://github.com/fjall-rs/lsm-tree/issues/110>.
#[doc(hidden)]
fn guarded_range<K: AsRef<[u8]>, R: RangeBounds<K>>(
&self,
range: R,
seqno: Option<SeqNo>,
index: Option<Arc<Memtable>>,
) -> Box<dyn Iterator<Item = IterGuardImpl> + '_>;

/// Gets the memory usage of all bloom filters in the tree.
///
/// NOTE(review): the unit is presumably bytes — confirm against the implementations.
fn bloom_filter_size(&self) -> usize;

Expand Down
60 changes: 60 additions & 0 deletions src/blob_tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::{
coding::{Decode, Encode},
compaction::stream::CompactionStream,
file::BLOBS_FOLDER,
iter_guard::{IterGuard, IterGuardImpl},
r#abstract::{AbstractTree, RangeItem},
tree::inner::MemtableId,
value::InternalValue,
Expand Down Expand Up @@ -74,6 +75,37 @@ pub struct BlobTree {
pub pending_segments: Arc<AtomicUsize>,
}

/// Iterator item guard for a blob tree.
///
/// Wraps one entry read from the index tree together with a reference to the value log,
/// so that the caller can decide whether the full value has to be resolved
/// (see the `IterGuard` implementation for this type).
pub struct Guard<'a>(
// Value log that is handed to `resolve_value_handle` when the full pair is requested
&'a ValueLog<MyCompressor>,
// Entry as read from the index tree; its value is decoded as a `MaybeInlineValue`
crate::Result<(UserKey, UserValue)>,
);

impl IterGuard for Guard<'_> {
    /// Consumes the guard and returns only the key.
    ///
    /// The value stored next to the key is dropped without being decoded or resolved.
    fn key(self) -> crate::Result<UserKey> {
        let (key, _) = self.1?;
        Ok(key)
    }

    /// Consumes the guard and returns the size of the value.
    ///
    /// Only the index entry is decoded; the value log is not consulted.
    fn size(self) -> crate::Result<u32> {
        let (_, encoded) = self.1?;
        let mut reader = Cursor::new(encoded);

        let size = match MaybeInlineValue::decode_from(&mut reader)? {
            // NOTE: We know LSM-tree values are 32 bits in length max
            #[allow(clippy::cast_possible_truncation)]
            MaybeInlineValue::Inline(bytes) => bytes.len() as u32,

            // NOTE: No need to resolve vHandle, because the size is already stored
            MaybeInlineValue::Indirect { size, .. } => size,
        };

        Ok(size)
    }

    /// Consumes the guard and returns the key-value pair, resolving the entry
    /// through the value log via `resolve_value_handle`.
    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
        let Self(value_log, item) = self;
        resolve_value_handle(value_log, item)
    }
}

impl BlobTree {
pub(crate) fn open(config: Config) -> crate::Result<Self> {
let path = &config.path;
Expand Down Expand Up @@ -230,6 +262,34 @@ impl BlobTree {
}

impl AbstractTree for BlobTree {
/// Iterates over the index tree entries matching `prefix`, wrapping each one in a
/// blob [`Guard`] that carries a reference to this tree's value log.
fn guarded_prefix<K: AsRef<[u8]>>(
    &self,
    prefix: K,
    seqno: Option<SeqNo>,
    index: Option<Arc<Memtable>>,
) -> Box<dyn Iterator<Item = IterGuardImpl> + '_> {
    let blobs = &self.blobs;
    let entries = self.index.0.create_prefix(&prefix, seqno, index);

    // Nothing is resolved here; the guard decides later what to read.
    Box::new(entries.map(move |entry| IterGuardImpl::Blob(Guard(blobs, entry))))
}

/// Iterates over the index tree entries inside `range`, wrapping each one in a
/// blob [`Guard`] that carries a reference to this tree's value log.
fn guarded_range<K: AsRef<[u8]>, R: RangeBounds<K>>(
    &self,
    range: R,
    seqno: Option<SeqNo>,
    index: Option<Arc<Memtable>>,
) -> Box<dyn Iterator<Item = IterGuardImpl> + '_> {
    let blobs = &self.blobs;
    let entries = self.index.0.create_range(&range, seqno, index);

    // Nothing is resolved here; the guard decides later what to read.
    Box::new(entries.map(move |entry| IterGuardImpl::Blob(Guard(blobs, entry))))
}

fn blob_file_count(&self) -> usize {
self.blobs.segment_count()
}
Expand Down
47 changes: 47 additions & 0 deletions src/iter_guard.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use crate::{
blob_tree::Guard as BlobGuard, tree::Guard as StandardGuard, KvPair, UserKey, UserValue,
};
use enum_dispatch::enum_dispatch;

/// A guard around a single iterator item.
///
/// Every accessor takes the guard by value, so only one of them can be
/// called for any given item.
#[enum_dispatch]
pub trait IterGuard {
    /// Consumes the guard and returns the key-value tuple.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    fn into_inner(self) -> crate::Result<KvPair>;

    /// Consumes the guard and returns the key.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    fn key(self) -> crate::Result<UserKey>;

    /// Consumes the guard and returns the value size.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    fn size(self) -> crate::Result<u32>;

    /// Consumes the guard and returns the value.
    ///
    /// The default implementation calls [`IterGuard::into_inner`] and discards the key.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    fn value(self) -> crate::Result<UserValue>
    where
        Self: Sized,
    {
        let (_, value) = self.into_inner()?;
        Ok(value)
    }
}

/// Concrete iterator item guard returned by the guarded iterator APIs.
///
/// `enum_dispatch` forwards the [`IterGuard`] methods to the wrapped guard.
#[enum_dispatch(IterGuard)]
pub enum IterGuardImpl<'a> {
/// Guard type of the standard tree (`tree::Guard`)
Standard(StandardGuard),
/// Guard type of the blob tree (`blob_tree::Guard`)
Blob(BlobGuard<'a>),
}
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ mod error;
#[doc(hidden)]
pub mod file;

mod iter_guard;

mod key;
mod key_range;

Expand Down Expand Up @@ -192,6 +194,7 @@ pub type KvPair = (UserKey, UserValue);

#[doc(hidden)]
pub use {
iter_guard::IterGuard as Guard,
merge::BoxedIterator,
segment::{block::checksum::Checksum, id::GlobalSegmentId, meta::SegmentId},
tree::inner::TreeId,
Expand Down
51 changes: 51 additions & 0 deletions src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
compaction::{stream::CompactionStream, CompactionStrategy},
config::Config,
descriptor_table::FileDescriptorTable,
iter_guard::{IterGuard, IterGuardImpl},
level_manifest::LevelManifest,
manifest::Manifest,
memtable::Memtable,
Expand Down Expand Up @@ -51,7 +52,49 @@ impl std::ops::Deref for Tree {
}
}

/// Iterator item guard for a standard tree.
///
/// Wraps the (possibly failed) key-value pair exactly as the underlying iterator produced it.
pub struct Guard(crate::Result<(UserKey, UserValue)>);

impl IterGuard for Guard {
    /// Consumes the guard and returns only the key.
    fn key(self) -> crate::Result<UserKey> {
        let (key, _) = self.0?;
        Ok(key)
    }

    /// Consumes the guard and returns the length of the value.
    fn size(self) -> crate::Result<u32> {
        let (_, value) = self.into_inner()?;

        // NOTE: We know LSM-tree values are 32 bits in length max
        #[allow(clippy::cast_possible_truncation)]
        let len = value.len() as u32;

        Ok(len)
    }

    /// Consumes the guard and returns the wrapped result unchanged.
    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
        let Self(item) = self;
        item
    }
}

impl AbstractTree for Tree {
fn guarded_prefix<K: AsRef<[u8]>>(
&self,
prefix: K,
seqno: Option<SeqNo>,
index: Option<Arc<Memtable>>,
) -> Box<dyn Iterator<Item = IterGuardImpl> + '_> {
Box::new(
self.create_prefix(&prefix, seqno, index)
.map(|kv| IterGuardImpl::Standard(Guard(kv))),
)
}

fn guarded_range<K: AsRef<[u8]>, R: RangeBounds<K>>(
&self,
range: R,
seqno: Option<SeqNo>,
index: Option<Arc<Memtable>>,
) -> Box<dyn Iterator<Item = IterGuardImpl> + '_> {
Box::new(
self.create_range(&range, seqno, index)
.map(|kv| IterGuardImpl::Standard(Guard(kv))),
)
}

/// Looks up `key` and returns the length of its value, or `None` if the lookup yields nothing.
fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: Option<SeqNo>) -> crate::Result<Option<u32>> {
    let value = self.get(key, seqno)?;

    // The length is narrowed to `u32` with a plain cast, exactly as before.
    Ok(value.map(|bytes| bytes.len() as u32))
}
Expand Down Expand Up @@ -391,6 +434,14 @@ impl AbstractTree for Tree {
}

impl Tree {
/// Wraps every item of `Tree::iter` in a [`Guard`].
///
/// NOTE(review): no caller of this private helper is visible in this diff — confirm it is
/// still needed alongside `guarded_iter`.
fn new_iter(
    &self,
    seqno: Option<SeqNo>,
    index: Option<Arc<Memtable>>,
) -> impl Iterator<Item = impl IterGuard> {
    let entries = self.iter(seqno, index);
    entries.map(Guard)
}

/// Opens an LSM-tree in the given directory.
///
/// Will recover previous state if the folder was previously
Expand Down
19 changes: 19 additions & 0 deletions tests/experimental_blob_tree_guarded_size.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use lsm_tree::{AbstractTree, Config, Guard};
use test_log::test;

#[test]
fn experimental_blob_tree_guarded_size() -> lsm_tree::Result<()> {
    let folder = tempfile::tempdir()?;
    let tree = Config::new(folder).open_as_blob_tree()?;

    // One 3-byte value and one 10,000-byte value.
    // NOTE(review): the large value is presumably stored out-of-line in the value log —
    // confirm against the configured blob separation threshold.
    tree.insert("a".as_bytes(), "abc", 0);
    tree.insert("b".as_bytes(), "a".repeat(10_000), 0);

    // Sum the sizes reported by the guards: 3 + 10_000.
    let total: u32 = tree.guarded_iter(None, None).flat_map(Guard::size).sum();
    assert_eq!(10_003u32, total);

    Ok(())
}
62 changes: 62 additions & 0 deletions tests/experimental_tree_guarded_range.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use lsm_tree::{AbstractTree, Config, Guard};
use test_log::test;

#[test]
fn experimental_tree_guarded_range() -> lsm_tree::Result<()> {
    let folder = tempfile::tempdir()?;
    let tree = Config::new(folder).open()?;

    // Write "a", "f" and "g" twice with seqnos 0 through 5, so every key has two versions.
    for (seqno, key) in (0..).zip(["a", "f", "g", "a", "f", "g"]) {
        tree.insert(key.as_bytes(), nanoid::nanoid!().as_bytes(), seqno);
    }

    // Each inclusive range contains exactly two distinct keys.
    let a_to_f = tree
        .guarded_range("a"..="f", None, None)
        .flat_map(Guard::key)
        .count();
    assert_eq!(2, a_to_f);

    let f_to_g = tree
        .guarded_range("f"..="g", None, None)
        .flat_map(Guard::key)
        .count();
    assert_eq!(2, f_to_g);

    Ok(())
}

#[test]
fn experimental_blob_tree_guarded_range() -> lsm_tree::Result<()> {
    let folder = tempfile::tempdir()?;
    let tree = Config::new(folder).open_as_blob_tree()?;

    // Write "a", "f" and "g" twice with seqnos 0 through 5, so every key has two versions.
    for (seqno, key) in (0..).zip(["a", "f", "g", "a", "f", "g"]) {
        tree.insert(key.as_bytes(), nanoid::nanoid!().as_bytes(), seqno);
    }

    // Each inclusive range contains exactly two distinct keys.
    let a_to_f = tree
        .guarded_range("a"..="f", None, None)
        .flat_map(Guard::key)
        .count();
    assert_eq!(2, a_to_f);

    let f_to_g = tree
        .guarded_range("f"..="g", None, None)
        .flat_map(Guard::key)
        .count();
    assert_eq!(2, f_to_g);

    Ok(())
}