Skip to content

Commit 1fd115c

Browse files
committed
Extract Delta Lake deletion vectors
This change extracts deletion vectors, represented as roaring bitmaps in Delta Lake files, and converts them into the XTable intermediate representation. Previously, XTable only detected table changes that added or removed data files. Now the detected table change also includes any deletion vector files added in the commit. Note that in Delta Lake, deletion vectors are stored in a compressed binary format. However, once extracted by XTable, the deleted-row ordinals are currently materialized as a list of long values. This representation is not the most efficient for large datasets. Optimizing it is deferred so that completing the end-to-end conversion can be prioritized first.
1 parent fc1ba78 commit 1fd115c

File tree

8 files changed

+476
-57
lines changed

8 files changed

+476
-57
lines changed

xtable-api/src/main/java/org/apache/xtable/model/TableChange.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
* @since 0.1
3131
*/
3232
@Value
33-
@Builder(toBuilder = true)
33+
@Builder(toBuilder = true, builderClassName = "Builder")
3434
public class TableChange {
3535
// Change in files at the specified instant
3636
InternalFilesDiff filesDiff;
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.xtable.model.storage;
20+
21+
import java.util.Iterator;
22+
import java.util.function.Supplier;
23+
24+
import lombok.AccessLevel;
25+
import lombok.EqualsAndHashCode;
26+
import lombok.Getter;
27+
import lombok.NonNull;
28+
import lombok.ToString;
29+
import lombok.experimental.Accessors;
30+
import lombok.experimental.FieldDefaults;
31+
import lombok.experimental.SuperBuilder;
32+
33+
@Accessors(fluent = true)
34+
@SuperBuilder(toBuilder = true)
35+
@FieldDefaults(makeFinal = true, level = lombok.AccessLevel.PRIVATE)
36+
@Getter
37+
@ToString(callSuper = true)
38+
@EqualsAndHashCode(callSuper = true)
39+
public class InternalDeletionVector extends InternalFile {
40+
// path (absolute with scheme) of data file to which this deletion vector belongs
41+
@NonNull String dataFilePath;
42+
43+
// super.getFileSizeBytes() is the size of the deletion vector file
44+
// super.getPhysicalPath() is the absolute path (with scheme) of the deletion vector file
45+
// super.getRecordCount() is the count of records marked as deleted by this deletion vector
46+
47+
// offset of deletion vector start in a deletion vector file
48+
int offset;
49+
50+
/**
51+
* Binary representation of the deletion vector. The consumer can use {@link
52+
* #ordinalsIterator()} to extract the ordinals represented in the binary format.
53+
*/
54+
byte[] binaryRepresentation;
55+
56+
/**
57+
* Supplier for an iterator that returns the ordinals of records deleted by this deletion vector
58+
* in the linked data file, identified by {@link #dataFilePath}.
59+
*
60+
* <p>The {@link InternalDeletionVector} instance does not guarantee that a new or distinct result
61+
* will be returned each time the supplier is invoked. However, the supplier is expected to return
62+
* a new iterator for each call.
63+
*/
64+
@Getter(AccessLevel.NONE)
65+
Supplier<Iterator<Long>> ordinalsSupplier;
66+
67+
/**
68+
* @return An iterator that returns the ordinals of records deleted by this deletion vector in the
69+
* linked data file. There is no guarantee that a new or distinct iterator will be returned
70+
* each time this method is invoked.
71+
*/
72+
public Iterator<Long> ordinalsIterator() {
73+
return ordinalsSupplier.get();
74+
}
75+
}

xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java

Lines changed: 63 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,26 @@
1818

1919
package org.apache.xtable.delta;
2020

21+
import java.util.Arrays;
2122
import java.util.Collections;
23+
import java.util.Iterator;
2224
import java.util.List;
2325

2426
import lombok.AccessLevel;
2527
import lombok.NoArgsConstructor;
2628

29+
import org.apache.hadoop.conf.Configuration;
2730
import org.apache.hadoop.fs.Path;
2831

2932
import org.apache.spark.sql.delta.Snapshot;
3033
import org.apache.spark.sql.delta.actions.AddFile;
3134
import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor;
3235
import org.apache.spark.sql.delta.actions.RemoveFile;
36+
import org.apache.spark.sql.delta.deletionvectors.RoaringBitmapArray;
37+
import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore;
38+
import org.apache.spark.sql.delta.storage.dv.HadoopFileSystemDVStore;
39+
40+
import com.google.common.annotations.VisibleForTesting;
3341

3442
import org.apache.xtable.exception.NotSupportedException;
3543
import org.apache.xtable.model.schema.InternalField;
@@ -38,6 +46,7 @@
3846
import org.apache.xtable.model.stat.FileStats;
3947
import org.apache.xtable.model.storage.FileFormat;
4048
import org.apache.xtable.model.storage.InternalDataFile;
49+
import org.apache.xtable.model.storage.InternalDeletionVector;
4150

4251
@NoArgsConstructor(access = AccessLevel.PRIVATE)
4352
public class DeltaActionsConverter {
@@ -113,16 +122,66 @@ static String getFullPathToFile(Snapshot snapshot, String dataFilePath) {
113122
*
114123
* @param snapshot the commit snapshot
115124
* @param addFile the add file action
116-
* @return the deletion vector representation (path of data file), or null if no deletion vector
117-
* is present
125+
* @return the deletion vector representation, or null if no deletion vector is present
118126
*/
119-
public String extractDeletionVectorFile(Snapshot snapshot, AddFile addFile) {
127+
public InternalDeletionVector extractDeletionVector(Snapshot snapshot, AddFile addFile) {
120128
DeletionVectorDescriptor deletionVector = addFile.deletionVector();
121129
if (deletionVector == null) {
122130
return null;
123131
}
124132

125133
String dataFilePath = addFile.path();
126-
return getFullPathToFile(snapshot, dataFilePath);
134+
dataFilePath = getFullPathToFile(snapshot, dataFilePath);
135+
136+
InternalDeletionVector.InternalDeletionVectorBuilder<?, ?> deleteVectorBuilder =
137+
InternalDeletionVector.builder()
138+
.recordCount(deletionVector.cardinality())
139+
.fileSizeBytes(deletionVector.sizeInBytes())
140+
.dataFilePath(dataFilePath);
141+
142+
if (deletionVector.isInline()) {
143+
deleteVectorBuilder
144+
.binaryRepresentation(deletionVector.inlineData())
145+
.physicalPath("")
146+
.ordinalsSupplier(() -> ordinalsIterator(deletionVector.inlineData()));
147+
} else {
148+
Path deletionVectorFilePath = deletionVector.absolutePath(snapshot.deltaLog().dataPath());
149+
deleteVectorBuilder
150+
.offset(getOffset(deletionVector))
151+
.physicalPath(deletionVectorFilePath.toString())
152+
.ordinalsSupplier(() -> ordinalsIterator(snapshot, deletionVector));
153+
}
154+
155+
return deleteVectorBuilder.build();
156+
}
157+
158+
private Iterator<Long> ordinalsIterator(byte[] bytes) {
159+
RoaringBitmapArray rbm = RoaringBitmapArray.readFrom(bytes);
160+
long[] ordinals = rbm.values();
161+
return Arrays.stream(ordinals).iterator();
162+
}
163+
164+
private Iterator<Long> ordinalsIterator(
165+
Snapshot snapshot, DeletionVectorDescriptor deleteVector) {
166+
Path deletionVectorFilePath = deleteVector.absolutePath(snapshot.deltaLog().dataPath());
167+
int offset = getOffset(deleteVector);
168+
long[] ordinals =
169+
parseOrdinalFile(
170+
snapshot.deltaLog().newDeltaHadoopConf(),
171+
deletionVectorFilePath,
172+
deleteVector.sizeInBytes(),
173+
offset);
174+
return Arrays.stream(ordinals).iterator();
175+
}
176+
177+
private static int getOffset(DeletionVectorDescriptor deleteVector) {
178+
return deleteVector.offset().isDefined() ? (int) deleteVector.offset().get() : 1;
179+
}
180+
181+
@VisibleForTesting
182+
long[] parseOrdinalFile(Configuration conf, Path filePath, int size, int offset) {
183+
DeletionVectorStore dvStore = new HadoopFileSystemDVStore(conf);
184+
RoaringBitmapArray rbm = dvStore.read(filePath, offset, size);
185+
return rbm.values();
127186
}
128187
}

xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@
2222
import java.time.Instant;
2323
import java.util.ArrayList;
2424
import java.util.HashMap;
25-
import java.util.HashSet;
2625
import java.util.List;
2726
import java.util.Map;
2827
import java.util.Optional;
29-
import java.util.Set;
28+
import java.util.stream.Collectors;
29+
import java.util.stream.Stream;
3030

3131
import lombok.Builder;
3232
import lombok.extern.log4j.Log4j2;
@@ -53,6 +53,8 @@
5353
import org.apache.xtable.model.schema.InternalSchema;
5454
import org.apache.xtable.model.storage.FileFormat;
5555
import org.apache.xtable.model.storage.InternalDataFile;
56+
import org.apache.xtable.model.storage.InternalDeletionVector;
57+
import org.apache.xtable.model.storage.InternalFile;
5658
import org.apache.xtable.model.storage.InternalFilesDiff;
5759
import org.apache.xtable.model.storage.PartitionFileGroup;
5860
import org.apache.xtable.spi.extractor.ConversionSource;
@@ -113,8 +115,8 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
113115
// All 3 of the following data structures use the data file's absolute path as the key
114116
Map<String, InternalDataFile> addedFiles = new HashMap<>();
115117
Map<String, InternalDataFile> removedFiles = new HashMap<>();
116-
// Set of data file paths for which deletion vectors exists.
117-
Set<String> deletionVectors = new HashSet<>();
118+
// Map from a data file's path to the deletion vector that applies to that data file.
119+
Map<String, InternalDeletionVector> deletionVectors = new HashMap<>();
118120

119121
for (Action action : actionsForVersion) {
120122
if (action instanceof AddFile) {
@@ -129,10 +131,10 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
129131
DeltaPartitionExtractor.getInstance(),
130132
DeltaStatsExtractor.getInstance());
131133
addedFiles.put(dataFile.getPhysicalPath(), dataFile);
132-
String deleteVectorPath =
133-
actionsConverter.extractDeletionVectorFile(snapshotAtVersion, (AddFile) action);
134-
if (deleteVectorPath != null) {
135-
deletionVectors.add(deleteVectorPath);
134+
InternalDeletionVector deletionVector =
135+
actionsConverter.extractDeletionVector(snapshotAtVersion, (AddFile) action);
136+
if (deletionVector != null) {
137+
deletionVectors.put(deletionVector.dataFilePath(), deletionVector);
136138
}
137139
} else if (action instanceof RemoveFile) {
138140
InternalDataFile dataFile =
@@ -151,7 +153,7 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
151153
// entry which is replaced by a new entry, AddFile with delete vector information. Since the
152154
// same data file is removed and added, we need to remove it from the added and removed file
153155
// maps which are used to track actual added and removed data files.
154-
for (String deletionVector : deletionVectors) {
156+
for (String deletionVector : deletionVectors.keySet()) {
155157
// validate that a Remove action is also added for the data file
156158
if (removedFiles.containsKey(deletionVector)) {
157159
addedFiles.remove(deletionVector);
@@ -163,11 +165,15 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
163165
}
164166
}
165167

168+
List<InternalFile> allAddedFiles =
169+
Stream.concat(addedFiles.values().stream(), deletionVectors.values().stream())
170+
.collect(Collectors.toList());
166171
InternalFilesDiff internalFilesDiff =
167172
InternalFilesDiff.builder()
168-
.filesAdded(addedFiles.values())
173+
.filesAdded(allAddedFiles)
169174
.filesRemoved(removedFiles.values())
170175
.build();
176+
171177
return TableChange.builder()
172178
.tableAsOfChange(tableAtVersion)
173179
.filesDiff(internalFilesDiff)

xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.spark.sql.functions;
4040

4141
import org.apache.spark.sql.delta.DeltaLog;
42+
import org.apache.spark.sql.delta.actions.AddFile;
4243

4344
import com.google.common.base.Preconditions;
4445

@@ -212,11 +213,15 @@ private String initBasePath(Path tempDir, String tableName) throws IOException {
212213
}
213214

214215
public List<String> getAllActiveFiles() {
215-
return deltaLog.snapshot().allFiles().collectAsList().stream()
216+
return getAllActiveFilesInfo().stream()
216217
.map(addFile -> addSlashToBasePath(basePath) + addFile.path())
217218
.collect(Collectors.toList());
218219
}
219220

221+
public List<AddFile> getAllActiveFilesInfo() {
222+
return deltaLog.snapshot().allFiles().collectAsList();
223+
}
224+
220225
private String addSlashToBasePath(String basePath) {
221226
if (basePath.endsWith("/")) {
222227
return basePath;

xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.xtable.model.InternalSnapshot;
3131
import org.apache.xtable.model.TableChange;
3232
import org.apache.xtable.model.storage.InternalDataFile;
33+
import org.apache.xtable.model.storage.InternalFile;
3334

3435
public class ValidationTestHelper {
3536

@@ -96,7 +97,7 @@ public static List<String> getAllFilePaths(InternalSnapshot internalSnapshot) {
9697
}
9798

9899
private static Set<String> extractPathsFromDataFile(Set<InternalDataFile> dataFiles) {
99-
return dataFiles.stream().map(InternalDataFile::getPhysicalPath).collect(Collectors.toSet());
100+
return dataFiles.stream().map(InternalFile::getPhysicalPath).collect(Collectors.toSet());
100101
}
101102

102103
private static void replaceFileScheme(List<String> filePaths) {

0 commit comments

Comments
 (0)