diff --git a/Cargo.lock b/Cargo.lock index b1ab51bd3..44d05e707 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -414,7 +414,7 @@ dependencies = [ "memchr", "num", "regex", - "regex-syntax 0.8.5", + "regex-syntax", ] [[package]] @@ -1591,15 +1591,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "crossbeam-channel" -version = "0.5.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "crossbeam-deque" version = "0.8.6" @@ -2096,7 +2087,7 @@ dependencies = [ "log", "recursive", "regex", - "regex-syntax 0.8.5", + "regex-syntax", ] [[package]] @@ -2753,19 +2744,6 @@ dependencies = [ "slab", ] -[[package]] -name = "generator" -version = "0.8.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc6bd114ceda131d3b1d665eba35788690ad37f5916457286b32ab6fd3c438dd" -dependencies = [ - "cfg-if", - "libc", - "log", - "rustversion", - "windows", -] - [[package]] name = "generic-array" version = "0.14.7" @@ -2822,8 +2800,8 @@ dependencies = [ "aho-corasick", "bstr", "log", - "regex-automata 0.4.9", - "regex-syntax 0.8.5", + "regex-automata", + "regex-syntax", ] [[package]] @@ -3193,7 +3171,7 @@ dependencies = [ "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows-core 0.52.0", + "windows-core", ] [[package]] @@ -3233,7 +3211,6 @@ dependencies = [ "iceberg-catalog-memory", "iceberg_test_utils", "itertools 0.13.0", - "moka", "murmur3", "num-bigint", "once_cell", @@ -3617,7 +3594,7 @@ dependencies = [ "globset", "log", "memchr", - "regex-automata 0.4.9", + "regex-automata", "same-file", "walkdir", "winapi-util", @@ -3993,19 +3970,6 @@ dependencies = [ "value-bag", ] -[[package]] -name = "loom" -version = "0.7.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca" -dependencies = [ - "cfg-if", - "generator", - "scoped-tls", - "tracing", - "tracing-subscriber", -] - [[package]] name = "lz4_flex" version = "0.11.3" @@ -4026,15 +3990,6 @@ dependencies = [ "pkg-config", ] -[[package]] -name = "matchers" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" -dependencies = [ - "regex-automata 0.1.10", -] - [[package]] name = "md-5" version = "0.10.6" @@ -4141,28 +4096,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "moka" -version = "0.12.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9321642ca94a4282428e6ea4af8cc2ca4eac48ac7a6a4ea8f33f76d0ce70926" -dependencies = [ - "async-lock", - "crossbeam-channel", - "crossbeam-epoch", - "crossbeam-utils", - "event-listener 5.4.0", - "futures-util", - "loom", - "parking_lot", - "portable-atomic", - "rustc_version", - "smallvec", - "tagptr", - "thiserror 1.0.69", - "uuid", -] - [[package]] name = "motore" version = "0.4.1" @@ -5248,17 +5181,8 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.9", - "regex-syntax 0.8.5", -] - -[[package]] -name = "regex-automata" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" -dependencies = [ - "regex-syntax 0.6.29", + "regex-automata", + "regex-syntax", ] [[package]] @@ -5269,7 +5193,7 @@ checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.8.5", + "regex-syntax", ] [[package]] @@ -5278,12 +5202,6 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a" -[[package]] -name = "regex-syntax" -version = "0.6.29" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" - [[package]] name = "regex-syntax" version = "0.8.5" @@ -5728,12 +5646,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "scoped-tls" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" - [[package]] name = "scopeguard" version = "1.2.0" @@ -6478,12 +6390,6 @@ dependencies = [ "syn 2.0.98", ] -[[package]] -name = "tagptr" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" - [[package]] name = "tap" version = "1.0.1" @@ -6846,14 +6752,10 @@ version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" dependencies = [ - "matchers", "nu-ansi-term", - "once_cell", - "regex", "sharded-slab", "smallvec", "thread_local", - "tracing", "tracing-core", "tracing-log", ] @@ -7360,16 +7262,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" -[[package]] -name = "windows" -version = "0.58.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd04d41d93c4992d421894c18c8b43496aa748dd4c081bac0dc93eb0489272b6" -dependencies = [ - "windows-core 0.58.0", - "windows-targets 0.52.6", -] - [[package]] name = "windows-core" version = "0.52.0" @@ -7379,41 +7271,6 @@ dependencies = [ "windows-targets 0.52.6", ] -[[package]] -name = "windows-core" -version = "0.58.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ba6d44ec8c2591c134257ce647b7ea6b20335bf6379a27dac5f1641fcf59f99" -dependencies = [ - "windows-implement", - "windows-interface", - "windows-result", - "windows-strings", - "windows-targets 0.52.6", -] - -[[package]] -name = "windows-implement" -version = "0.58.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bbd5b46c938e506ecbce286b6628a02171d56153ba733b6c741fc627ec9579b" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.98", -] - -[[package]] -name = "windows-interface" -version = "0.58.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "053c4c462dc91d3b1504c6fe5a726dd15e216ba718e84a0e46a88fbe5ded3515" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.98", -] - [[package]] name = "windows-registry" version = "0.2.0" diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 8795edc74..ad65dbfff 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -62,7 +62,6 @@ derive_builder = { workspace = true } fnv = { workspace = true } futures = { workspace = true } itertools = { workspace = true } -moka = { version = "0.12.10", features = ["future"] } murmur3 = { workspace = true } num-bigint = { workspace = true } once_cell = { workspace = true } @@ -98,4 +97,4 @@ tera = { workspace = true } [package.metadata.cargo-machete] # These dependencies are added to ensure minimal dependency version -ignored = ["tap"] \ No newline at end of file +ignored = ["tap"] diff --git a/crates/iceberg/src/cache.rs b/crates/iceberg/src/cache.rs index 37fe07127..fa56a8a9d 100644 --- a/crates/iceberg/src/cache.rs +++ b/crates/iceberg/src/cache.rs @@ -17,6 +17,7 @@ //! Cache management for Iceberg. +use std::fmt::Debug; use std::sync::Arc; use crate::spec::{Manifest, ManifestList}; @@ -44,13 +45,23 @@ pub trait ObjectCache: Send + Sync { /// which contains `Schema`. Please ensure that the cache stores the /// object in memory as-is, without attempting to serialize it, as /// serialization could be extremely expensive. -pub trait ObjectCacheProvide: Send + Sync { +pub trait ObjectCacheProvide: Send + Sync + Debug { /// Gets a cache for manifests. fn manifest_cache(&self) -> &dyn ObjectCache>; /// Gets a cache for manifest lists. fn manifest_list_cache(&self) -> &dyn ObjectCache>; } +impl ObjectCacheProvide for T { + fn manifest_cache(&self) -> &dyn ObjectCache> { + (*self).manifest_cache() + } + + fn manifest_list_cache(&self) -> &dyn ObjectCache> { + (*self).manifest_list_cache() + } +} + /// CacheProvider is a type alias for a thread-safe reference-counted pointer to a CacheProvide trait object. pub type ObjectCacheProvider = Arc; diff --git a/crates/iceberg/src/inspect/manifests.rs b/crates/iceberg/src/inspect/manifests.rs index e94e48a45..23f7c62fa 100644 --- a/crates/iceberg/src/inspect/manifests.rs +++ b/crates/iceberg/src/inspect/manifests.rs @@ -162,7 +162,11 @@ impl<'a> ManifestsTable<'a> { if let Some(snapshot) = self.table.metadata().current_snapshot() { let manifest_list = snapshot - .load_manifest_list(self.table.file_io(), &self.table.metadata_ref()) + .load_manifest_list( + &self.table.metadata_ref(), + self.table.file_io(), + self.table.object_cache(), + ) .await?; for manifest in manifest_list.entries() { content.append_value(manifest.content as i32); diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs index 8e0638257..8e8fd8e4e 100644 --- a/crates/iceberg/src/io/mod.rs +++ b/crates/iceberg/src/io/mod.rs @@ -78,7 +78,6 @@ use storage_memory::*; mod storage_s3; #[cfg(feature = "storage-s3")] pub use storage_s3::*; -pub(crate) mod object_cache; #[cfg(feature = "storage-fs")] mod storage_fs; diff --git a/crates/iceberg/src/io/object_cache.rs b/crates/iceberg/src/io/object_cache.rs deleted file mode 100644 index e40af3e02..000000000 --- a/crates/iceberg/src/io/object_cache.rs +++ /dev/null @@ -1,406 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::mem::size_of_val; -use std::sync::Arc; - -use crate::io::FileIO; -use crate::spec::{ - FormatVersion, Manifest, ManifestFile, ManifestList, SchemaId, SnapshotRef, TableMetadataRef, -}; -use crate::{Error, ErrorKind, Result}; - -const DEFAULT_CACHE_SIZE_BYTES: u64 = 32 * 1024 * 1024; // 32MB - -#[derive(Clone, Debug)] -pub(crate) enum CachedItem { - ManifestList(Arc), - Manifest(Arc), -} - -#[derive(Clone, Debug, Hash, Eq, PartialEq)] -pub(crate) enum CachedObjectKey { - ManifestList((String, FormatVersion, SchemaId)), - Manifest(String), -} - -/// Caches metadata objects deserialized from immutable files -#[derive(Clone, Debug)] -pub struct ObjectCache { - cache: moka::future::Cache, - file_io: FileIO, - cache_disabled: bool, -} - -impl ObjectCache { - /// Creates a new [`ObjectCache`] - /// with the default cache size - pub(crate) fn new(file_io: FileIO) -> Self { - Self::new_with_capacity(file_io, DEFAULT_CACHE_SIZE_BYTES) - } - - /// Creates a new [`ObjectCache`] - /// with a specific cache size - pub(crate) fn new_with_capacity(file_io: FileIO, cache_size_bytes: u64) -> Self { - if cache_size_bytes == 0 { - Self::with_disabled_cache(file_io) - } else { - Self { - cache: moka::future::Cache::builder() - .weigher(|_, val: &CachedItem| match val { - CachedItem::ManifestList(item) => size_of_val(item.as_ref()), - CachedItem::Manifest(item) => size_of_val(item.as_ref()), - } as u32) - .max_capacity(cache_size_bytes) - .build(), - file_io, - cache_disabled: false, - } - } - } - - /// Creates a new [`ObjectCache`] - /// with caching disabled - pub(crate) fn with_disabled_cache(file_io: FileIO) -> Self { - Self { - cache: moka::future::Cache::new(0), - file_io, - cache_disabled: true, - } - } - - /// Retrieves an Arc [`Manifest`] from the cache - /// or retrieves one from FileIO and parses it if not present - pub(crate) async fn get_manifest(&self, manifest_file: &ManifestFile) -> Result> { - if self.cache_disabled { - return manifest_file - .load_manifest(&self.file_io) - .await - .map(Arc::new); - } - - let key = CachedObjectKey::Manifest(manifest_file.manifest_path.clone()); - - let cache_entry = self - .cache - .entry_by_ref(&key) - .or_try_insert_with(self.fetch_and_parse_manifest(manifest_file)) - .await - .map_err(|err| { - Error::new( - ErrorKind::Unexpected, - format!("Failed to load manifest {}", manifest_file.manifest_path), - ) - .with_source(err) - })? - .into_value(); - - match cache_entry { - CachedItem::Manifest(arc_manifest) => Ok(arc_manifest), - _ => Err(Error::new( - ErrorKind::Unexpected, - format!("cached object for key '{:?}' is not a Manifest", key), - )), - } - } - - /// Retrieves an Arc [`ManifestList`] from the cache - /// or retrieves one from FileIO and parses it if not present - pub(crate) async fn get_manifest_list( - &self, - snapshot: &SnapshotRef, - table_metadata: &TableMetadataRef, - ) -> Result> { - if self.cache_disabled { - return snapshot - .load_manifest_list(&self.file_io, table_metadata) - .await - .map(Arc::new); - } - - let key = CachedObjectKey::ManifestList(( - snapshot.manifest_list().to_string(), - table_metadata.format_version, - snapshot.schema_id().unwrap(), - )); - let cache_entry = self - .cache - .entry_by_ref(&key) - .or_try_insert_with(self.fetch_and_parse_manifest_list(snapshot, table_metadata)) - .await - .map_err(|err| { - Arc::try_unwrap(err).unwrap_or_else(|err| { - Error::new( - ErrorKind::Unexpected, - "Failed to load manifest list in cache", - ) - .with_source(err) - }) - })? - .into_value(); - - match cache_entry { - CachedItem::ManifestList(arc_manifest_list) => Ok(arc_manifest_list), - _ => Err(Error::new( - ErrorKind::Unexpected, - format!("cached object for path '{:?}' is not a manifest list", key), - )), - } - } - - async fn fetch_and_parse_manifest(&self, manifest_file: &ManifestFile) -> Result { - let manifest = manifest_file.load_manifest(&self.file_io).await?; - - Ok(CachedItem::Manifest(Arc::new(manifest))) - } - - async fn fetch_and_parse_manifest_list( - &self, - snapshot: &SnapshotRef, - table_metadata: &TableMetadataRef, - ) -> Result { - let manifest_list = snapshot - .load_manifest_list(&self.file_io, table_metadata) - .await?; - - Ok(CachedItem::ManifestList(Arc::new(manifest_list))) - } -} - -#[cfg(test)] -mod tests { - use std::fs; - - use tempfile::TempDir; - use tera::{Context, Tera}; - use uuid::Uuid; - - use super::*; - use crate::io::{FileIO, OutputFile}; - use crate::spec::{ - DataContentType, DataFileBuilder, DataFileFormat, Literal, ManifestEntry, - ManifestListWriter, ManifestStatus, ManifestWriterBuilder, Struct, TableMetadata, - }; - use crate::table::Table; - use crate::TableIdent; - - struct TableTestFixture { - table_location: String, - table: Table, - } - - impl TableTestFixture { - fn new() -> Self { - let tmp_dir = TempDir::new().unwrap(); - let table_location = tmp_dir.path().join("table1"); - let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro"); - let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro"); - let table_metadata1_location = table_location.join("metadata/v1.json"); - - let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap()) - .unwrap() - .build() - .unwrap(); - - let table_metadata = { - let template_json_str = fs::read_to_string(format!( - "{}/testdata/example_table_metadata_v2.json", - env!("CARGO_MANIFEST_DIR") - )) - .unwrap(); - let mut context = Context::new(); - context.insert("table_location", &table_location); - context.insert("manifest_list_1_location", &manifest_list1_location); - context.insert("manifest_list_2_location", &manifest_list2_location); - context.insert("table_metadata_1_location", &table_metadata1_location); - - let metadata_json = Tera::one_off(&template_json_str, &context, false).unwrap(); - serde_json::from_str::(&metadata_json).unwrap() - }; - - let table = Table::builder() - .metadata(table_metadata) - .identifier(TableIdent::from_strs(["db", "table1"]).unwrap()) - .file_io(file_io.clone()) - .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap()) - .build() - .unwrap(); - - Self { - table_location: table_location.to_str().unwrap().to_string(), - table, - } - } - - fn next_manifest_file(&self) -> OutputFile { - self.table - .file_io() - .new_output(format!( - "{}/metadata/manifest_{}.avro", - self.table_location, - Uuid::new_v4() - )) - .unwrap() - } - - async fn setup_manifest_files(&mut self) { - let current_snapshot = self.table.metadata().current_snapshot().unwrap(); - let current_schema = current_snapshot.schema(self.table.metadata()).unwrap(); - let current_partition_spec = self.table.metadata().default_partition_spec(); - - // Write data files - let mut writer = ManifestWriterBuilder::new( - self.next_manifest_file(), - Some(current_snapshot.snapshot_id()), - vec![], - current_schema.clone(), - current_partition_spec.as_ref().clone(), - ) - .build_v2_data(); - writer - .add_entry( - ManifestEntry::builder() - .status(ManifestStatus::Added) - .data_file( - DataFileBuilder::default() - .partition_spec_id(0) - .content(DataContentType::Data) - .file_path(format!("{}/1.parquet", &self.table_location)) - .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(100) - .record_count(1) - .partition(Struct::from_iter([Some(Literal::long(100))])) - .build() - .unwrap(), - ) - .build(), - ) - .unwrap(); - let data_file_manifest = writer.write_manifest_file().await.unwrap(); - - // Write to manifest list - let mut manifest_list_write = ManifestListWriter::v2( - self.table - .file_io() - .new_output(current_snapshot.manifest_list()) - .unwrap(), - current_snapshot.snapshot_id(), - current_snapshot.parent_snapshot_id(), - current_snapshot.sequence_number(), - ); - manifest_list_write - .add_manifests(vec![data_file_manifest].into_iter()) - .unwrap(); - manifest_list_write.close().await.unwrap(); - } - } - - #[tokio::test] - async fn test_get_manifest_list_and_manifest_from_disabled_cache() { - let mut fixture = TableTestFixture::new(); - fixture.setup_manifest_files().await; - - let object_cache = ObjectCache::with_disabled_cache(fixture.table.file_io().clone()); - - let result_manifest_list = object_cache - .get_manifest_list( - fixture.table.metadata().current_snapshot().unwrap(), - &fixture.table.metadata_ref(), - ) - .await - .unwrap(); - - assert_eq!(result_manifest_list.entries().len(), 1); - - let manifest_file = result_manifest_list.entries().first().unwrap(); - let result_manifest = object_cache.get_manifest(manifest_file).await.unwrap(); - - assert_eq!( - result_manifest - .entries() - .first() - .unwrap() - .file_path() - .split("/") - .last() - .unwrap(), - "1.parquet" - ); - } - - #[tokio::test] - async fn test_get_manifest_list_and_manifest_from_default_cache() { - let mut fixture = TableTestFixture::new(); - fixture.setup_manifest_files().await; - - let object_cache = ObjectCache::new(fixture.table.file_io().clone()); - - // not in cache - let result_manifest_list = object_cache - .get_manifest_list( - fixture.table.metadata().current_snapshot().unwrap(), - &fixture.table.metadata_ref(), - ) - .await - .unwrap(); - - assert_eq!(result_manifest_list.entries().len(), 1); - - // retrieve cached version - let result_manifest_list = object_cache - .get_manifest_list( - fixture.table.metadata().current_snapshot().unwrap(), - &fixture.table.metadata_ref(), - ) - .await - .unwrap(); - - assert_eq!(result_manifest_list.entries().len(), 1); - - let manifest_file = result_manifest_list.entries().first().unwrap(); - - // not in cache - let result_manifest = object_cache.get_manifest(manifest_file).await.unwrap(); - - assert_eq!( - result_manifest - .entries() - .first() - .unwrap() - .file_path() - .split("/") - .last() - .unwrap(), - "1.parquet" - ); - - // retrieve cached version - let result_manifest = object_cache.get_manifest(manifest_file).await.unwrap(); - - assert_eq!( - result_manifest - .entries() - .first() - .unwrap() - .file_path() - .split("/") - .last() - .unwrap(), - "1.parquet" - ); - } -} diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index 6bfb12b23..f36042c65 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -20,9 +20,10 @@ use std::sync::Arc; use futures::channel::mpsc::Sender; use futures::{SinkExt, TryFutureExt}; +use crate::cache::ObjectCacheProvider; use crate::delete_file_index::DeleteFileIndex; use crate::expr::{Bind, BoundPredicate, Predicate}; -use crate::io::object_cache::ObjectCache; +use crate::io::FileIO; use crate::scan::{ BoundPredicates, ExpressionEvaluatorCache, FileScanTask, ManifestEvaluatorCache, PartitionFilterCache, @@ -36,13 +37,15 @@ use crate::{Error, ErrorKind, Result}; /// Wraps a [`ManifestFile`] alongside the objects that are needed /// to process it in a thread-safe manner pub(crate) struct ManifestFileContext { + file_io: FileIO, + object_cache: Option, + manifest_file: ManifestFile, sender: Sender, field_ids: Arc>, bound_predicates: Option>, - object_cache: Arc, snapshot_schema: SchemaRef, expression_evaluator_cache: Arc, delete_file_index: Option, @@ -66,6 +69,7 @@ impl ManifestFileContext { /// streaming its constituent [`ManifestEntries`] to the channel provided in the context pub(crate) async fn fetch_manifest_and_stream_manifest_entries(self) -> Result<()> { let ManifestFileContext { + file_io, object_cache, manifest_file, bound_predicates, @@ -77,7 +81,9 @@ impl ManifestFileContext { .. } = self; - let manifest = object_cache.get_manifest(&manifest_file).await?; + let manifest = manifest_file + .load_manifest(&file_io, object_cache.as_ref()) + .await?; for manifest_entry in manifest.entries() { let manifest_entry_context = ManifestEntryContext { @@ -140,6 +146,9 @@ impl ManifestEntryContext { /// objects that are required to perform a scan file plan. #[derive(Debug)] pub(crate) struct PlanContext { + pub file_io: FileIO, + pub object_cache: Option, + pub snapshot: SnapshotRef, pub table_metadata: TableMetadataRef, @@ -147,7 +156,6 @@ pub(crate) struct PlanContext { pub case_sensitive: bool, pub predicate: Option>, pub snapshot_bound_predicate: Option>, - pub object_cache: Arc, pub field_ids: Arc>, pub partition_filter_cache: Arc, @@ -157,9 +165,12 @@ pub(crate) struct PlanContext { impl PlanContext { pub(crate) async fn get_manifest_list(&self) -> Result> { - self.object_cache - .as_ref() - .get_manifest_list(&self.snapshot, &self.table_metadata) + self.snapshot + .load_manifest_list( + &self.table_metadata, + &self.file_io, + self.object_cache.as_ref(), + ) .await } @@ -262,10 +273,12 @@ impl PlanContext { }; ManifestFileContext { + file_io: self.file_io.clone(), + object_cache: self.object_cache.clone(), + manifest_file: manifest_file.clone(), bound_predicates, sender, - object_cache: self.object_cache.clone(), snapshot_schema: self.snapshot_schema.clone(), field_ids: self.field_ids.clone(), expression_evaluator_cache: self.expression_evaluator_cache.clone(), diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index c1cedd58e..339faae13 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -294,13 +294,15 @@ impl<'a> TableScanBuilder<'a> { }; let plan_context = PlanContext { + file_io: self.table.file_io().clone(), + object_cache: self.table.object_cache().cloned(), + snapshot, table_metadata: self.table.metadata_ref(), snapshot_schema: schema, case_sensitive: self.case_sensitive, predicate: self.filter.map(Arc::new), snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new), - object_cache: self.table.object_cache(), field_ids: Arc::new(field_ids), partition_filter_cache: Arc::new(PartitionFilterCache::new()), manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()), diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index 8cf5df8dd..db24fe845 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -19,6 +19,7 @@ use std::collections::HashMap; use std::str::FromStr; +use std::sync::Arc; use apache_avro::types::Value; use apache_avro::{from_value, Reader, Writer}; @@ -27,6 +28,7 @@ use bytes::Bytes; use self::_const_schema::{MANIFEST_LIST_AVRO_SCHEMA_V1, MANIFEST_LIST_AVRO_SCHEMA_V2}; use self::_serde::{ManifestFileV1, ManifestFileV2}; use super::{Datum, FormatVersion, Manifest, StructType}; +use crate::cache::ObjectCacheProvider; use crate::error::Result; use crate::io::{FileIO, OutputFile}; use crate::{Error, ErrorKind}; @@ -656,7 +658,29 @@ impl ManifestFile { /// Load [`Manifest`]. /// /// This method will also initialize inherited values of [`ManifestEntry`], such as `sequence_number`. - pub async fn load_manifest(&self, file_io: &FileIO) -> Result { + /// + /// If `object_cache` is provided, it will be used to cache the manifest. + pub async fn load_manifest( + &self, + file_io: &FileIO, + object_cache: Option<&ObjectCacheProvider>, + ) -> Result> { + let Some(object_cache) = object_cache else { + return self.load_manifest_inner(file_io).await; + }; + + if let Some(manifest) = object_cache.manifest_cache().get(&self.manifest_path) { + return Ok(manifest); + } + + let manifest = self.load_manifest_inner(file_io).await?; + object_cache + .manifest_cache() + .set(self.manifest_path.clone(), manifest.clone()); + Ok(manifest) + } + + async fn load_manifest_inner(&self, file_io: &FileIO) -> Result> { let avro = file_io.new_input(&self.manifest_path)?.read().await?; let (metadata, mut entries) = Manifest::try_from_avro_bytes(&avro)?; @@ -666,7 +690,7 @@ impl ManifestFile { entry.inherit_data(self); } - Ok(Manifest::new(metadata, entries)) + Ok(Arc::new(Manifest::new(metadata, entries))) } } diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs index 922e7bab9..c53c63540 100644 --- a/crates/iceberg/src/spec/snapshot.rs +++ b/crates/iceberg/src/spec/snapshot.rs @@ -27,6 +27,7 @@ use serde::{Deserialize, Serialize}; use typed_builder::TypedBuilder; use super::table_metadata::SnapshotLog; +use crate::cache::ObjectCacheProvider; use crate::error::{timestamp_ms_to_utc, Result}; use crate::io::FileIO; use crate::spec::{ManifestList, SchemaId, SchemaRef, StructType, TableMetadata}; @@ -186,9 +187,35 @@ impl Snapshot { /// Load manifest list. pub async fn load_manifest_list( &self, + table_metadata: &TableMetadata, file_io: &FileIO, + object_cache: Option<&ObjectCacheProvider>, + ) -> Result> { + if let Some(cache) = object_cache { + if let Some(manifest_list) = cache.manifest_list_cache().get(&self.manifest_list) { + return Ok(manifest_list); + } + } + + let manifest_list = self + .load_manifest_list_inner(table_metadata, file_io) + .await?; + + if let Some(cache) = object_cache { + cache + .manifest_list_cache() + .set(self.manifest_list.clone(), manifest_list.clone()); + } + + Ok(manifest_list) + } + + /// Load manifest list. + async fn load_manifest_list_inner( + &self, table_metadata: &TableMetadata, - ) -> Result { + file_io: &FileIO, + ) -> Result> { let manifest_list_content = file_io.new_input(&self.manifest_list)?.read().await?; let schema = self.schema(table_metadata)?; @@ -200,11 +227,12 @@ impl Snapshot { .transpose() }; - ManifestList::parse_with_version( + Ok(ManifestList::parse_with_version( &manifest_list_content, table_metadata.format_version(), partition_type_provider, - ) + )? + .into()) } #[allow(dead_code)] diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index ebee670f4..88533dc3e 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -17,11 +17,11 @@ //! Table API for Apache Iceberg -use std::sync::Arc; +use std::fmt::Debug; use crate::arrow::ArrowReaderBuilder; +use crate::cache::ObjectCacheProvider; use crate::inspect::MetadataTable; -use crate::io::object_cache::ObjectCache; use crate::io::FileIO; use crate::scan::TableScanBuilder; use crate::spec::{TableMetadata, TableMetadataRef}; @@ -30,24 +30,24 @@ use crate::{Error, ErrorKind, Result, TableIdent}; /// Builder to create table scan. pub struct TableBuilder { file_io: Option, + object_cache: Option, + metadata_location: Option, metadata: Option, identifier: Option, readonly: bool, - disable_cache: bool, - cache_size_bytes: Option, } impl TableBuilder { pub(crate) fn new() -> Self { Self { file_io: None, + object_cache: None, + metadata_location: None, metadata: None, identifier: None, readonly: false, - disable_cache: false, - cache_size_bytes: None, } } @@ -57,6 +57,12 @@ impl TableBuilder { self } + /// optional - sets and enables the object cache for table. + pub fn object_cache(mut self, object_cache: ObjectCacheProvider) -> Self { + self.object_cache = Some(object_cache); + self + } + /// optional - sets the tables metadata location pub fn metadata_location>(mut self, metadata_location: T) -> Self { self.metadata_location = Some(metadata_location.into()); @@ -81,30 +87,15 @@ impl TableBuilder { self } - /// specifies if the Table's metadata cache will be disabled, - /// so that reads of Manifests and ManifestLists will never - /// get cached. - pub fn disable_cache(mut self) -> Self { - self.disable_cache = true; - self - } - - /// optionally set a non-default metadata cache size - pub fn cache_size_bytes(mut self, cache_size_bytes: u64) -> Self { - self.cache_size_bytes = Some(cache_size_bytes); - self - } - /// build the Table pub fn build(self) -> Result { let Self { file_io, + object_cache, metadata_location, metadata, identifier, readonly, - disable_cache, - cache_size_bytes, } = self; let Some(file_io) = file_io else { @@ -128,24 +119,14 @@ impl TableBuilder { )); }; - let object_cache = if disable_cache { - Arc::new(ObjectCache::with_disabled_cache(file_io.clone())) - } else if let Some(cache_size_bytes) = cache_size_bytes { - Arc::new(ObjectCache::new_with_capacity( - file_io.clone(), - cache_size_bytes, - )) - } else { - Arc::new(ObjectCache::new(file_io.clone())) - }; - Ok(Table { file_io, + object_cache, + metadata_location, metadata, identifier, readonly, - object_cache, }) } } @@ -154,11 +135,12 @@ impl TableBuilder { #[derive(Debug, Clone)] pub struct Table { file_io: FileIO, + object_cache: Option, + metadata_location: Option, metadata: TableMetadataRef, identifier: TableIdent, readonly: bool, - object_cache: Arc, } impl Table { @@ -192,8 +174,8 @@ impl Table { } /// Returns this table's object cache - pub(crate) fn object_cache(&self) -> Arc { - self.object_cache.clone() + pub(crate) fn object_cache(&self) -> Option<&ObjectCacheProvider> { + self.object_cache.as_ref() } /// Creates a table scan. diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 8c7b36358..fdf15fd8e 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -74,7 +74,7 @@ impl<'a> FastAppendAction<'a> { /// Adds existing parquet files /// - /// Note: This API is not yet fully supported in version 0.5.0. + /// Note: This API is not yet fully supported in version 0.5.0. /// It is currently incomplete and should not be used in production. /// Specifically, schema compatibility checks and support for adding to partitioned tables /// have not yet been implemented. @@ -190,8 +190,9 @@ impl SnapshotProduceOperation for FastAppendOperation { let manifest_list = snapshot .load_manifest_list( - snapshot_produce.tx.table.file_io(), &snapshot_produce.tx.table.metadata_ref(), + snapshot_produce.tx.table.file_io(), + snapshot_produce.tx.table.object_cache(), ) .await?; @@ -270,7 +271,7 @@ mod tests { unreachable!() }; let manifest_list = new_snapshot - .load_manifest_list(table.file_io(), table.metadata()) + .load_manifest_list(table.metadata(), table.file_io(), table.object_cache()) .await .unwrap(); assert_eq!(1, manifest_list.entries().len()); @@ -281,7 +282,7 @@ mod tests { // check manifset let manifest = manifest_list.entries()[0] - .load_manifest(table.file_io()) + .load_manifest(table.file_io(), table.object_cache()) .await .unwrap(); assert_eq!(1, manifest.entries().len()); @@ -347,7 +348,11 @@ mod tests { }; let manifest_list = new_snapshot - .load_manifest_list(fixture.table.file_io(), fixture.table.metadata()) + .load_manifest_list( + fixture.table.metadata(), + fixture.table.file_io(), + fixture.table.object_cache(), + ) .await .expect("Failed to load manifest list"); @@ -360,7 +365,7 @@ mod tests { // Load the manifest from the manifest list let manifest = manifest_list.entries()[0] - .load_manifest(fixture.table.file_io()) + .load_manifest(fixture.table.file_io(), fixture.table.object_cache()) .await .expect("Failed to load manifest");