Skip to content

Commit 0914f7a

Browse files
authored
feat: add parquet writer (#176)
1 parent c7727e3 commit 0914f7a

File tree

9 files changed

+995
-48
lines changed

9 files changed

+995
-48
lines changed

crates/iceberg/Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -70,4 +70,3 @@ iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
7070
pretty_assertions = { workspace = true }
7171
tempfile = { workspace = true }
7272
tera = { workspace = true }
73-
tokio = { workspace = true }

crates/iceberg/src/io.rs

+7-4
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ use crate::{error::Result, Error, ErrorKind};
5454
use futures::{AsyncRead, AsyncSeek, AsyncWrite};
5555
use once_cell::sync::Lazy;
5656
use opendal::{Operator, Scheme};
57+
use tokio::io::AsyncWrite as TokioAsyncWrite;
5758
use tokio::io::{AsyncRead as TokioAsyncRead, AsyncSeek as TokioAsyncSeek};
5859
use url::Url;
5960

@@ -244,9 +245,9 @@ impl InputFile {
244245
}
245246

246247
/// Trait for writing file.
247-
pub trait FileWrite: AsyncWrite {}
248+
pub trait FileWrite: AsyncWrite + TokioAsyncWrite + Send + Unpin {}
248249

249-
impl<T> FileWrite for T where T: AsyncWrite {}
250+
impl<T> FileWrite for T where T: AsyncWrite + TokioAsyncWrite + Send + Unpin {}
250251

251252
/// Output file is used for writing to files..
252253
#[derive(Debug)]
@@ -282,8 +283,10 @@ impl OutputFile {
282283
}
283284

284285
/// Creates output file for writing.
285-
pub async fn writer(&self) -> Result<impl FileWrite> {
286-
Ok(self.op.writer(&self.path[self.relative_path_pos..]).await?)
286+
pub async fn writer(&self) -> Result<Box<dyn FileWrite>> {
287+
Ok(Box::new(
288+
self.op.writer(&self.path[self.relative_path_pos..]).await?,
289+
))
287290
}
288291
}
289292

crates/iceberg/src/scan.rs

+10-7
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ impl FileScanTask {
211211
mod tests {
212212
use crate::io::{FileIO, OutputFile};
213213
use crate::spec::{
214-
DataContentType, DataFile, DataFileFormat, FormatVersion, Literal, Manifest,
214+
DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Manifest,
215215
ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus,
216216
ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID,
217217
};
@@ -314,14 +314,15 @@ mod tests {
314314
ManifestEntry::builder()
315315
.status(ManifestStatus::Added)
316316
.data_file(
317-
DataFile::builder()
317+
DataFileBuilder::default()
318318
.content(DataContentType::Data)
319319
.file_path(format!("{}/1.parquet", &self.table_location))
320320
.file_format(DataFileFormat::Parquet)
321321
.file_size_in_bytes(100)
322322
.record_count(1)
323323
.partition(Struct::from_iter([Some(Literal::long(100))]))
324-
.build(),
324+
.build()
325+
.unwrap(),
325326
)
326327
.build(),
327328
ManifestEntry::builder()
@@ -330,14 +331,15 @@ mod tests {
330331
.sequence_number(parent_snapshot.sequence_number())
331332
.file_sequence_number(parent_snapshot.sequence_number())
332333
.data_file(
333-
DataFile::builder()
334+
DataFileBuilder::default()
334335
.content(DataContentType::Data)
335336
.file_path(format!("{}/2.parquet", &self.table_location))
336337
.file_format(DataFileFormat::Parquet)
337338
.file_size_in_bytes(100)
338339
.record_count(1)
339340
.partition(Struct::from_iter([Some(Literal::long(200))]))
340-
.build(),
341+
.build()
342+
.unwrap(),
341343
)
342344
.build(),
343345
ManifestEntry::builder()
@@ -346,14 +348,15 @@ mod tests {
346348
.sequence_number(parent_snapshot.sequence_number())
347349
.file_sequence_number(parent_snapshot.sequence_number())
348350
.data_file(
349-
DataFile::builder()
351+
DataFileBuilder::default()
350352
.content(DataContentType::Data)
351353
.file_path(format!("{}/3.parquet", &self.table_location))
352354
.file_format(DataFileFormat::Parquet)
353355
.file_size_in_bytes(100)
354356
.record_count(1)
355357
.partition(Struct::from_iter([Some(Literal::long(300))]))
356-
.build(),
358+
.build()
359+
.unwrap(),
357360
)
358361
.build(),
359362
],

crates/iceberg/src/spec/manifest.rs

+17-17
Original file line numberDiff line numberDiff line change
@@ -932,34 +932,34 @@ impl TryFrom<i32> for ManifestStatus {
932932
}
933933

934934
/// Data file carries data file path, partition tuple, metrics, …
935-
#[derive(Debug, PartialEq, Clone, Eq, TypedBuilder)]
935+
#[derive(Debug, PartialEq, Clone, Eq, Builder)]
936936
pub struct DataFile {
937937
/// field id: 134
938938
///
939939
/// Type of content stored by the data file: data, equality deletes,
940940
/// or position deletes (all v1 files are data files)
941-
content: DataContentType,
941+
pub(crate) content: DataContentType,
942942
/// field id: 100
943943
///
944944
/// Full URI for the file with FS scheme
945-
file_path: String,
945+
pub(crate) file_path: String,
946946
/// field id: 101
947947
///
948948
/// String file format name, avro, orc or parquet
949-
file_format: DataFileFormat,
949+
pub(crate) file_format: DataFileFormat,
950950
/// field id: 102
951951
///
952952
/// Partition data tuple, schema based on the partition spec output using
953953
/// partition field ids for the struct field ids
954-
partition: Struct,
954+
pub(crate) partition: Struct,
955955
/// field id: 103
956956
///
957957
/// Number of records in this file
958-
record_count: u64,
958+
pub(crate) record_count: u64,
959959
/// field id: 104
960960
///
961961
/// Total file size in bytes
962-
file_size_in_bytes: u64,
962+
pub(crate) file_size_in_bytes: u64,
963963
/// field id: 108
964964
/// key field id: 117
965965
/// value field id: 118
@@ -968,29 +968,29 @@ pub struct DataFile {
968968
/// store the column. Does not include bytes necessary to read other
969969
/// columns, like footers. Leave null for row-oriented formats (Avro)
970970
#[builder(default)]
971-
column_sizes: HashMap<i32, u64>,
971+
pub(crate) column_sizes: HashMap<i32, u64>,
972972
/// field id: 109
973973
/// key field id: 119
974974
/// value field id: 120
975975
///
976976
/// Map from column id to number of values in the column (including null
977977
/// and NaN values)
978978
#[builder(default)]
979-
value_counts: HashMap<i32, u64>,
979+
pub(crate) value_counts: HashMap<i32, u64>,
980980
/// field id: 110
981981
/// key field id: 121
982982
/// value field id: 122
983983
///
984984
/// Map from column id to number of null values in the column
985985
#[builder(default)]
986-
null_value_counts: HashMap<i32, u64>,
986+
pub(crate) null_value_counts: HashMap<i32, u64>,
987987
/// field id: 137
988988
/// key field id: 138
989989
/// value field id: 139
990990
///
991991
/// Map from column id to number of NaN values in the column
992992
#[builder(default)]
993-
nan_value_counts: HashMap<i32, u64>,
993+
pub(crate) nan_value_counts: HashMap<i32, u64>,
994994
/// field id: 125
995995
/// key field id: 126
996996
/// value field id: 127
@@ -1003,7 +1003,7 @@ pub struct DataFile {
10031003
///
10041004
/// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
10051005
#[builder(default)]
1006-
lower_bounds: HashMap<i32, Literal>,
1006+
pub(crate) lower_bounds: HashMap<i32, Literal>,
10071007
/// field id: 128
10081008
/// key field id: 129
10091009
/// value field id: 130
@@ -1016,19 +1016,19 @@ pub struct DataFile {
10161016
///
10171017
/// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
10181018
#[builder(default)]
1019-
upper_bounds: HashMap<i32, Literal>,
1019+
pub(crate) upper_bounds: HashMap<i32, Literal>,
10201020
/// field id: 131
10211021
///
10221022
/// Implementation-specific key metadata for encryption
10231023
#[builder(default)]
1024-
key_metadata: Vec<u8>,
1024+
pub(crate) key_metadata: Vec<u8>,
10251025
/// field id: 132
10261026
/// element field id: 133
10271027
///
10281028
/// Split offsets for the data file. For example, all row group offsets
10291029
/// in a Parquet file. Must be sorted ascending
10301030
#[builder(default)]
1031-
split_offsets: Vec<i64>,
1031+
pub(crate) split_offsets: Vec<i64>,
10321032
/// field id: 135
10331033
/// element field id: 136
10341034
///
@@ -1037,7 +1037,7 @@ pub struct DataFile {
10371037
/// otherwise. Fields with ids listed in this column must be present
10381038
/// in the delete file
10391039
#[builder(default)]
1040-
equality_ids: Vec<i32>,
1040+
pub(crate) equality_ids: Vec<i32>,
10411041
/// field id: 140
10421042
///
10431043
/// ID representing sort order for this file.
@@ -1049,7 +1049,7 @@ pub struct DataFile {
10491049
/// order id to null. Readers must ignore sort order id for position
10501050
/// delete files.
10511051
#[builder(default, setter(strip_option))]
1052-
sort_order_id: Option<i32>,
1052+
pub(crate) sort_order_id: Option<i32>,
10531053
}
10541054

10551055
/// Type of content stored by the data file: data, equality deletes, or

crates/iceberg/src/spec/table_metadata.rs

+19-19
Original file line numberDiff line numberDiff line change
@@ -52,66 +52,66 @@ pub type TableMetadataRef = Arc<TableMetadata>;
5252
/// We check the validity of this data structure when constructing.
5353
pub struct TableMetadata {
5454
/// Integer Version for the format.
55-
format_version: FormatVersion,
55+
pub(crate) format_version: FormatVersion,
5656
/// A UUID that identifies the table
57-
table_uuid: Uuid,
57+
pub(crate) table_uuid: Uuid,
5858
/// Location tables base location
59-
location: String,
59+
pub(crate) location: String,
6060
/// The tables highest sequence number
61-
last_sequence_number: i64,
61+
pub(crate) last_sequence_number: i64,
6262
/// Timestamp in milliseconds from the unix epoch when the table was last updated.
63-
last_updated_ms: i64,
63+
pub(crate) last_updated_ms: i64,
6464
/// An integer; the highest assigned column ID for the table.
65-
last_column_id: i32,
65+
pub(crate) last_column_id: i32,
6666
/// A list of schemas, stored as objects with schema-id.
67-
schemas: HashMap<i32, SchemaRef>,
67+
pub(crate) schemas: HashMap<i32, SchemaRef>,
6868
/// ID of the table’s current schema.
69-
current_schema_id: i32,
69+
pub(crate) current_schema_id: i32,
7070
/// A list of partition specs, stored as full partition spec objects.
71-
partition_specs: HashMap<i32, PartitionSpecRef>,
71+
pub(crate) partition_specs: HashMap<i32, PartitionSpecRef>,
7272
/// ID of the “current” spec that writers should use by default.
73-
default_spec_id: i32,
73+
pub(crate) default_spec_id: i32,
7474
/// An integer; the highest assigned partition field ID across all partition specs for the table.
75-
last_partition_id: i32,
75+
pub(crate) last_partition_id: i32,
7676
///A string to string map of table properties. This is used to control settings that
7777
/// affect reading and writing and is not intended to be used for arbitrary metadata.
7878
/// For example, commit.retry.num-retries is used to control the number of commit retries.
79-
properties: HashMap<String, String>,
79+
pub(crate) properties: HashMap<String, String>,
8080
/// long ID of the current table snapshot; must be the same as the current
8181
/// ID of the main branch in refs.
82-
current_snapshot_id: Option<i64>,
82+
pub(crate) current_snapshot_id: Option<i64>,
8383
///A list of valid snapshots. Valid snapshots are snapshots for which all
8484
/// data files exist in the file system. A data file must not be deleted
8585
/// from the file system until the last snapshot in which it was listed is
8686
/// garbage collected.
87-
snapshots: HashMap<i64, SnapshotRef>,
87+
pub(crate) snapshots: HashMap<i64, SnapshotRef>,
8888
/// A list (optional) of timestamp and snapshot ID pairs that encodes changes
8989
/// to the current snapshot for the table. Each time the current-snapshot-id
9090
/// is changed, a new entry should be added with the last-updated-ms
9191
/// and the new current-snapshot-id. When snapshots are expired from
9292
/// the list of valid snapshots, all entries before a snapshot that has
9393
/// expired should be removed.
94-
snapshot_log: Vec<SnapshotLog>,
94+
pub(crate) snapshot_log: Vec<SnapshotLog>,
9595

9696
/// A list (optional) of timestamp and metadata file location pairs
9797
/// that encodes changes to the previous metadata files for the table.
9898
/// Each time a new metadata file is created, a new entry of the
9999
/// previous metadata file location should be added to the list.
100100
/// Tables can be configured to remove oldest metadata log entries and
101101
/// keep a fixed-size log of the most recent entries after a commit.
102-
metadata_log: Vec<MetadataLog>,
102+
pub(crate) metadata_log: Vec<MetadataLog>,
103103

104104
/// A list of sort orders, stored as full sort order objects.
105-
sort_orders: HashMap<i64, SortOrderRef>,
105+
pub(crate) sort_orders: HashMap<i64, SortOrderRef>,
106106
/// Default sort order id of the table. Note that this could be used by
107107
/// writers, but is not used when reading because reads use the specs
108108
/// stored in manifest files.
109-
default_sort_order_id: i64,
109+
pub(crate) default_sort_order_id: i64,
110110
///A map of snapshot references. The map keys are the unique snapshot reference
111111
/// names in the table, and the map values are snapshot reference objects.
112112
/// There is always a main branch reference pointing to the current-snapshot-id
113113
/// even if the refs map is null.
114-
refs: HashMap<String, SnapshotReference>,
114+
pub(crate) refs: HashMap<String, SnapshotReference>,
115115
}
116116

117117
impl TableMetadata {

0 commit comments

Comments
 (0)