Skip to content

Commit 2070a4f

Browse files
authored
feat: Add optional prefetch hint for parsing Puffin Footer (#1207)
## Which issue does this PR close? - Closes #1198. ## What changes are included in this PR? Add prefetch hint for parsing puffin footer + change some docs ## Are these changes tested? Yes, unit tested
1 parent 1373d7f commit 2070a4f

File tree

1 file changed

+129
-13
lines changed

1 file changed

+129
-13
lines changed

crates/iceberg/src/puffin/metadata.rs

+129-13
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ impl Flag {
131131
}
132132

133133
/// Metadata about a puffin file.
134+
///
134135
/// For more information, see: https://iceberg.apache.org/puffin-spec/#filemetadata
135136
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
136137
pub struct FileMetadata {
@@ -144,18 +145,28 @@ impl FileMetadata {
144145
pub(crate) const MAGIC_LENGTH: u8 = 4;
145146
pub(crate) const MAGIC: [u8; FileMetadata::MAGIC_LENGTH as usize] = [0x50, 0x46, 0x41, 0x31];
146147

147-
// We use the term FOOTER_STRUCT to refer to the fixed-length portion of the Footer, as illustrated below.
148-
//
149-
// Footer
150-
// |
151-
// -------------------------------------------------
152-
// | |
153-
// Magic FooterPayload FooterPayloadLength Flags Magic
154-
// | |
155-
// -----------------------------
156-
// |
157-
// FOOTER_STRUCT
158-
148+
/// We use the term FOOTER_STRUCT to refer to the fixed-length portion of the Footer.
149+
/// The structure of the Footer specification is illustrated below:
150+
///
151+
/// ```text
152+
/// Footer
153+
/// ┌────────────────────┐
154+
/// │ Magic (4 bytes) │
155+
/// │ │
156+
/// ├────────────────────┤
157+
/// │ FooterPayload │
158+
/// │ (PAYLOAD_LENGTH) │
159+
/// ├────────────────────┤ ◀─┐
160+
/// │ FooterPayloadSize │ │
161+
/// │ (4 bytes) │ │
162+
/// ├────────────────────┤
163+
/// │ Flags (4 bytes) │ FOOTER_STRUCT
164+
/// │ │
165+
/// ├────────────────────┤ │
166+
/// │ Magic (4 bytes) │ │
167+
/// │ │ │
168+
/// └────────────────────┘ ◀─┘
169+
/// ```
159170
const FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET: u8 = 0;
160171
const FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH: u8 = 4;
161172
const FOOTER_STRUCT_FLAGS_OFFSET: u8 = FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET
@@ -166,6 +177,11 @@ impl FileMetadata {
166177
pub(crate) const FOOTER_STRUCT_LENGTH: u8 =
167178
FileMetadata::FOOTER_STRUCT_MAGIC_OFFSET + FileMetadata::MAGIC_LENGTH;
168179

180+
/// Constructs new puffin `FileMetadata`
181+
pub fn new(blobs: Vec<BlobMetadata>, properties: HashMap<String, String>) -> Self {
182+
Self { blobs, properties }
183+
}
184+
169185
fn check_magic(bytes: &[u8]) -> Result<()> {
170186
if bytes == FileMetadata::MAGIC {
171187
Ok(())
@@ -285,9 +301,73 @@ impl FileMetadata {
285301

286302
let footer_payload_str =
287303
FileMetadata::extract_footer_payload_as_str(&footer_bytes, footer_payload_length)?;
304+
288305
FileMetadata::from_json_str(&footer_payload_str)
289306
}
290307

308+
/// Reads file_metadata in puffin file with a prefetch hint
309+
///
310+
/// `prefetch_hint` is used to try to fetch the entire footer in one read. If
311+
/// the entire footer isn't fetched in one read the function will call the regular
312+
/// read option.
313+
#[allow(dead_code)]
314+
pub(crate) async fn read_with_prefetch(
315+
input_file: &InputFile,
316+
prefetch_hint: u8,
317+
) -> Result<FileMetadata> {
318+
if prefetch_hint > 16 {
319+
let input_file_length = input_file.metadata().await?.size;
320+
let file_read = input_file.reader().await?;
321+
322+
// Hint cannot be larger than input file
323+
if prefetch_hint as u64 > input_file_length {
324+
return FileMetadata::read(input_file).await;
325+
}
326+
327+
// Read footer based on prefetchi hint
328+
let start = input_file_length - prefetch_hint as u64;
329+
let end = input_file_length;
330+
let footer_bytes = file_read.read(start..end).await?;
331+
332+
let payload_length_start =
333+
footer_bytes.len() - (FileMetadata::FOOTER_STRUCT_LENGTH as usize);
334+
let payload_length_end =
335+
payload_length_start + (FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH as usize);
336+
let payload_length_bytes = &footer_bytes[payload_length_start..payload_length_end];
337+
338+
let mut buf = [0; 4];
339+
buf.copy_from_slice(payload_length_bytes);
340+
let footer_payload_length = u32::from_le_bytes(buf);
341+
342+
// If the (footer payload length + FOOTER_STRUCT_LENGTH + MAGIC_LENGTH) is greater
343+
// than the fetched footer then you can have it read regularly from a read with no
344+
// prefetch while passing in the footer_payload_length.
345+
let footer_length = (footer_payload_length as usize)
346+
+ FileMetadata::FOOTER_STRUCT_LENGTH as usize
347+
+ FileMetadata::MAGIC_LENGTH as usize;
348+
if footer_length > prefetch_hint as usize {
349+
return FileMetadata::read(input_file).await;
350+
}
351+
352+
// Read footer bytes
353+
let footer_start = footer_bytes.len() - footer_length;
354+
let footer_end = footer_bytes.len();
355+
let footer_bytes = &footer_bytes[footer_start..footer_end];
356+
357+
let magic_length = FileMetadata::MAGIC_LENGTH as usize;
358+
// check first four bytes of footer
359+
FileMetadata::check_magic(&footer_bytes[..magic_length])?;
360+
// check last four bytes of footer
361+
FileMetadata::check_magic(&footer_bytes[footer_bytes.len() - magic_length..])?;
362+
363+
let footer_payload_str =
364+
FileMetadata::extract_footer_payload_as_str(footer_bytes, footer_payload_length)?;
365+
return FileMetadata::from_json_str(&footer_payload_str);
366+
}
367+
368+
FileMetadata::read(input_file).await
369+
}
370+
291371
#[inline]
292372
/// Metadata about blobs in file
293373
pub fn blobs(&self) -> &[BlobMetadata] {
@@ -800,6 +880,7 @@ mod tests {
800880
let temp_dir = TempDir::new().unwrap();
801881

802882
let input_file = input_file_with_payload(&temp_dir, r#""blobs" = []"#).await;
883+
803884
assert_eq!(
804885
FileMetadata::read(&input_file).await.unwrap_err().to_string(),
805886
"DataInvalid => Given string is not valid JSON, source: invalid type: string \"blobs\", expected struct FileMetadata at line 1 column 7",
@@ -809,21 +890,56 @@ mod tests {
809890
#[tokio::test]
810891
async fn test_read_file_metadata_of_uncompressed_empty_file() {
811892
let input_file = java_empty_uncompressed_input_file();
893+
812894
let file_metadata = FileMetadata::read(&input_file).await.unwrap();
813895
assert_eq!(file_metadata, empty_footer_payload())
814896
}
815897

816898
#[tokio::test]
817899
async fn test_read_file_metadata_of_uncompressed_metric_data() {
818900
let input_file = java_uncompressed_metric_input_file();
901+
819902
let file_metadata = FileMetadata::read(&input_file).await.unwrap();
820903
assert_eq!(file_metadata, uncompressed_metric_file_metadata())
821904
}
822905

823906
#[tokio::test]
824907
async fn test_read_file_metadata_of_zstd_compressed_metric_data() {
825908
let input_file = java_zstd_compressed_metric_input_file();
826-
let file_metadata = FileMetadata::read(&input_file).await.unwrap();
909+
910+
let file_metadata = FileMetadata::read_with_prefetch(&input_file, 64)
911+
.await
912+
.unwrap();
827913
assert_eq!(file_metadata, zstd_compressed_metric_file_metadata())
828914
}
915+
916+
#[tokio::test]
917+
async fn test_read_file_metadata_of_empty_file_with_prefetching() {
918+
let input_file = java_empty_uncompressed_input_file();
919+
let file_metadata = FileMetadata::read_with_prefetch(&input_file, 64)
920+
.await
921+
.unwrap();
922+
923+
assert_eq!(file_metadata, empty_footer_payload());
924+
}
925+
926+
#[tokio::test]
927+
async fn test_read_file_metadata_of_uncompressed_metric_data_with_prefetching() {
928+
let input_file = java_uncompressed_metric_input_file();
929+
let file_metadata = FileMetadata::read_with_prefetch(&input_file, 64)
930+
.await
931+
.unwrap();
932+
933+
assert_eq!(file_metadata, uncompressed_metric_file_metadata());
934+
}
935+
936+
#[tokio::test]
937+
async fn test_read_file_metadata_of_zstd_compressed_metric_data_with_prefetching() {
938+
let input_file = java_zstd_compressed_metric_input_file();
939+
let file_metadata = FileMetadata::read_with_prefetch(&input_file, 64)
940+
.await
941+
.unwrap();
942+
943+
assert_eq!(file_metadata, zstd_compressed_metric_file_metadata());
944+
}
829945
}

0 commit comments

Comments
 (0)