Skip to content

Commit f2a65d1

Browse files
authored
feat: Implement Index Manifest (#76)
1 parent 357bdb4 commit f2a65d1

9 files changed

+318
-23
lines changed

crates/paimon/Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ typed-builder = "^0.19"
5050
opendal = { version = "0.49", features = ["services-fs"] }
5151
pretty_assertions = "1"
5252
apache-avro = { version = "0.17", features = ["snappy"] }
53+
indexmap = "2.5.0"
5354

5455
[dev-dependencies]
5556
rand = "0.8.5"
57+
serde_avro_fast = { version = "1.1.2", features = ["snappy"] }

crates/paimon/src/spec/data_file.rs

-12
Original file line numberDiff line numberDiff line change
@@ -50,18 +50,6 @@ impl BinaryRow {
5050
}
5151
}
5252

53-
/// The Source of a file.
54-
/// TODO: move me to the manifest module.
55-
///
56-
/// Impl References: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/manifest/FileSource.java>
57-
#[repr(u8)]
58-
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
59-
#[serde(rename_all = "camelCase")]
60-
pub enum FileSource {
61-
Append = 0,
62-
Compact = 1,
63-
}
64-
6553
/// Metadata of a data file.
6654
///
6755
/// Impl References: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java>
+108
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use serde::{Deserialize, Serialize};
19+
use std::fmt::{Display, Formatter};
20+
21+
use indexmap::IndexMap;
22+
23+
/// Metadata of index file.
24+
///
25+
/// Impl Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java>
26+
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
27+
pub struct IndexFileMeta {
28+
#[serde(rename = "_INDEX_TYPE")]
29+
pub index_type: String,
30+
31+
#[serde(rename = "_FILE_NAME")]
32+
pub file_name: String,
33+
34+
#[serde(rename = "_FILE_SIZE")]
35+
pub file_size: i32,
36+
37+
#[serde(rename = "_ROW_COUNT")]
38+
pub row_count: i32,
39+
40+
// use Indexmap to ensure the order of deletion_vectors_ranges is consistent.
41+
#[serde(
42+
default,
43+
with = "map_serde",
44+
rename = "_DELETIONS_VECTORS_RANGES",
45+
alias = "_DELETION_VECTORS_RANGES"
46+
)]
47+
pub deletion_vectors_ranges: Option<IndexMap<String, (i32, i32)>>,
48+
}
49+
50+
impl Display for IndexFileMeta {
51+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
52+
write!(
53+
f,
54+
"IndexFileMeta{{index_type={}, fileName={}, fileSize={}, rowCount={}, deletion_vectors_ranges={:?}}}",
55+
self.index_type,
56+
self.file_name,
57+
self.file_size,
58+
self.row_count,
59+
self.deletion_vectors_ranges,
60+
)
61+
}
62+
}
63+
64+
mod map_serde {
65+
use indexmap::IndexMap;
66+
use serde::{Deserialize, Deserializer, Serialize, Serializer};
67+
68+
#[derive(Deserialize, Serialize)]
69+
struct Temp {
70+
f0: String,
71+
f1: i32,
72+
f2: i32,
73+
}
74+
75+
pub fn serialize<S>(
76+
date: &Option<IndexMap<String, (i32, i32)>>,
77+
s: S,
78+
) -> Result<S::Ok, S::Error>
79+
where
80+
S: Serializer,
81+
{
82+
match *date {
83+
None => s.serialize_none(),
84+
Some(ref d) => s.collect_seq(d.iter().map(|(s, (i1, i2))| Temp {
85+
f0: s.into(),
86+
f1: *i1,
87+
f2: *i2,
88+
})),
89+
}
90+
}
91+
92+
#[allow(clippy::type_complexity)]
93+
pub fn deserialize<'de, D>(
94+
deserializer: D,
95+
) -> Result<Option<IndexMap<String, (i32, i32)>>, D::Error>
96+
where
97+
D: Deserializer<'de>,
98+
{
99+
match Option::deserialize(deserializer)? {
100+
None => Ok(None),
101+
Some::<Vec<Temp>>(s) => Ok(Some(
102+
s.into_iter()
103+
.map(|t| (t.f0, (t.f1, t.f2)))
104+
.collect::<IndexMap<_, _>>(),
105+
)),
106+
}
107+
}
108+
}
+164
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::spec::manifest_common::FileKind;
19+
use crate::spec::IndexFileMeta;
20+
use serde::{Deserialize, Serialize};
21+
use std::fmt::{Display, Formatter};
22+
23+
/// Manifest entry for index file.
24+
///
25+
/// Impl Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java>
26+
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
27+
pub struct IndexManifestEntry {
28+
#[serde(rename = "_KIND")]
29+
pub kind: FileKind,
30+
31+
#[serde(rename = "_PARTITION", with = "serde_bytes")]
32+
pub partition: Vec<u8>,
33+
34+
#[serde(rename = "_BUCKET")]
35+
pub bucket: i32,
36+
37+
#[serde(flatten)]
38+
pub index_file: IndexFileMeta,
39+
40+
#[serde(rename = "_VERSION")]
41+
pub version: i32,
42+
}
43+
44+
impl Display for IndexManifestEntry {
45+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
46+
write!(
47+
f,
48+
"IndexManifestEntry{{kind={:?}, partition={:?}, bucket={}, index_file={}}}",
49+
self.kind, self.partition, self.bucket, self.index_file,
50+
)
51+
}
52+
}
53+
54+
#[cfg(test)]
55+
mod tests {
56+
use indexmap::IndexMap;
57+
58+
use super::*;
59+
60+
#[test]
61+
fn test_read_index_manifest_file() {
62+
let workdir =
63+
std::env::current_dir().unwrap_or_else(|err| panic!("current_dir must exist: {err}"));
64+
let path = workdir
65+
.join("tests/fixtures/manifest/index-manifest-85cc6729-f5af-431a-a1c3-ef45319328fb-0");
66+
let source = std::fs::read(path.to_str().unwrap()).unwrap();
67+
let mut reader =
68+
serde_avro_fast::object_container_file_encoding::Reader::from_slice(source.as_slice())
69+
.unwrap();
70+
let res: Vec<_> = reader
71+
.deserialize::<IndexManifestEntry>()
72+
.collect::<Result<_, _>>()
73+
.unwrap();
74+
assert_eq!(
75+
res,
76+
vec![
77+
IndexManifestEntry {
78+
version: 1,
79+
kind: FileKind::Add,
80+
partition: vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
81+
bucket: 0,
82+
index_file: IndexFileMeta {
83+
index_type: "HASH".into(),
84+
file_name: "index-a984b43a-c3fb-40b4-ad29-536343c239a6-0".into(),
85+
file_size: 16,
86+
row_count: 4,
87+
deletion_vectors_ranges: None,
88+
}
89+
},
90+
IndexManifestEntry {
91+
version: 1,
92+
kind: FileKind::Add,
93+
partition: vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
94+
bucket: 0,
95+
index_file: IndexFileMeta {
96+
index_type: "DELETION_VECTORS".into(),
97+
file_name: "index-3f0986c5-4398-449b-be82-95f019d7a748-0".into(),
98+
file_size: 33,
99+
row_count: 1,
100+
deletion_vectors_ranges: Some(IndexMap::from([(
101+
"data-9b76122c-6bb5-4952-a946-b5bce29694a1-0.orc".into(),
102+
(1, 24)
103+
)])),
104+
}
105+
}
106+
]
107+
);
108+
}
109+
110+
#[test]
111+
fn test_single_object_serde() {
112+
let sample = IndexManifestEntry {
113+
version: 1,
114+
kind: FileKind::Delete,
115+
partition: vec![0, 1, 0, 2, 0, 3, 0, 4, 0, 5, 0, 6],
116+
bucket: 0,
117+
index_file: IndexFileMeta {
118+
index_type: "DELETION_VECTORS".into(),
119+
file_name: "test1".into(),
120+
file_size: 33,
121+
row_count: 1,
122+
deletion_vectors_ranges: Some(IndexMap::from([("test1".into(), (1, 24))])),
123+
},
124+
};
125+
126+
let schema: serde_avro_fast::Schema = r#"["null", {
127+
"type": "record",
128+
"name": "org.apache.paimon.avro.generated.record",
129+
"fields": [
130+
{"name": "_VERSION", "type": "int"},
131+
{"name": "_KIND", "type": "int"},
132+
{"name": "_PARTITION", "type": "bytes"},
133+
{"name": "_BUCKET", "type": "int"},
134+
{"name": "_INDEX_TYPE", "type": "string"},
135+
{"name": "_FILE_NAME", "type": "string"},
136+
{"name": "_FILE_SIZE", "type": "long"},
137+
{"name": "_ROW_COUNT", "type": "long"},
138+
{
139+
"default": null,
140+
"name": "_DELETIONS_VECTORS_RANGES",
141+
"type": ["null", {
142+
"type": "array",
143+
"items": ["null", {
144+
"type": "record",
145+
"name": "org.apache.paimon.avro.generated.record__DELETIONS_VECTORS_RANGES",
146+
"fields": [
147+
{"name": "f0", "type": "string"},
148+
{"name": "f1", "type": "int"},
149+
{"name": "f2", "type": "int"}
150+
]
151+
}]
152+
}]
153+
}
154+
]
155+
}]"#
156+
.parse().unwrap();
157+
158+
let serializer_config = &mut serde_avro_fast::ser::SerializerConfig::new(&schema);
159+
let encoded = serde_avro_fast::to_single_object_vec(&sample, serializer_config).unwrap();
160+
let decoded: IndexManifestEntry =
161+
serde_avro_fast::from_single_object_slice(encoded.as_slice(), &schema).unwrap();
162+
assert_eq!(sample, decoded);
163+
}
164+
}
+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use serde_repr::{Deserialize_repr, Serialize_repr};
19+
20+
/// Kind of a file.
21+
/// Impl Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/manifest/FileKind.java>
22+
#[derive(PartialEq, Eq, Debug, Clone, Serialize_repr, Deserialize_repr)]
23+
#[repr(u8)]
24+
pub enum FileKind {
25+
Add = 0,
26+
Delete = 1,
27+
}
28+
29+
/// The Source of a file.
30+
/// Impl References: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/manifest/FileSource.java>
31+
#[derive(PartialEq, Eq, Debug, Clone, Serialize_repr, Deserialize_repr)]
32+
#[repr(u8)]
33+
pub enum FileSource {
34+
Append = 0,
35+
Compact = 1,
36+
}

crates/paimon/src/spec/manifest_entry.rs

+1-10
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use crate::spec::manifest_common::FileKind;
1819
use crate::spec::DataFileMeta;
1920
use serde::Deserialize;
20-
use serde_repr::{Deserialize_repr, Serialize_repr};
2121
use serde_with::serde_derive::Serialize;
2222

2323
/// The same {@link Identifier} indicates that the {@link ManifestEntry} refers to the same data file.
@@ -31,15 +31,6 @@ pub struct Identifier {
3131
pub file_name: String,
3232
}
3333

34-
/// Kind of a file.
35-
/// Impl Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/manifest/FileKind.java>
36-
#[derive(PartialEq, Eq, Debug, Clone, Serialize_repr, Deserialize_repr)]
37-
#[repr(u8)]
38-
pub enum FileKind {
39-
Add = 0,
40-
Delete = 1,
41-
}
42-
4334
/// Entry of a manifest file, representing an addition / deletion of a data file.
4435
/// Impl Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java>
4536
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]

crates/paimon/src/spec/mod.rs

+5
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@ pub use snapshot::*;
3434
mod manifest_file_meta;
3535
pub use manifest_file_meta::*;
3636

37+
mod index_file_meta;
38+
pub use index_file_meta::*;
39+
40+
mod index_manifest;
41+
mod manifest_common;
3742
mod manifest_entry;
3843
mod objects_file;
3944
mod stats;

crates/paimon/src/spec/objects_file.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ pub fn from_avro_bytes<T: DeserializeOwned>(bytes: &[u8]) -> crate::Result<Vec<T
3232

3333
#[cfg(test)]
3434
mod tests {
35-
use crate::spec::manifest_entry::{FileKind, ManifestEntry};
35+
use crate::spec::manifest_common::FileKind;
36+
use crate::spec::manifest_entry::ManifestEntry;
3637
use crate::spec::objects_file::from_avro_bytes;
3738
use crate::spec::stats::BinaryTableStats;
3839
use crate::spec::{DataFileMeta, ManifestFileMeta};

0 commit comments

Comments
 (0)