From 1fd115c79304e251aec598f0ad269ed4dfb8d88a Mon Sep 17 00:00:00 2001 From: Ashvin Agrawal Date: Tue, 21 Jan 2025 21:00:09 -0800 Subject: [PATCH] Extract Delta Lake deletion vectors This change extracts deletion vectors represented as roaring bitmaps in delta lake files and converts them into the XTable intermediate representation. Previously, XTable only detected table changes that included adding or removing of data files. Now the detected table change also includes any deletion vector files added in the commit. Note that, in Delta Lake, the deletion vectors are represented in a compressed binary format. However, once extracted by XTable, the offsets are currently extracted into a list of long offsets. This representation is not the most efficient for large datasets. Optimization is pending to prioritize end-to-end conversion completion. --- .../org/apache/xtable/model/TableChange.java | 2 +- .../model/storage/InternalDeletionVector.java | 75 +++++++ .../xtable/delta/DeltaActionsConverter.java | 67 ++++++- .../xtable/delta/DeltaConversionSource.java | 26 ++- .../apache/xtable/TestSparkDeltaTable.java | 7 +- .../apache/xtable/ValidationTestHelper.java | 3 +- .../delta/ITDeltaDeleteVectorConvert.java | 189 ++++++++++++++++-- .../delta/TestDeltaActionsConverter.java | 164 +++++++++++++-- 8 files changed, 476 insertions(+), 57 deletions(-) create mode 100644 xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDeletionVector.java diff --git a/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java b/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java index b425fd018..287b8f38a 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java @@ -30,7 +30,7 @@ * @since 0.1 */ @Value -@Builder(toBuilder = true) +@Builder(toBuilder = true, builderClassName = "Builder") public class TableChange { // Change in files at the specified instant 
InternalFilesDiff filesDiff; diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDeletionVector.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDeletionVector.java new file mode 100644 index 000000000..f9480366c --- /dev/null +++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDeletionVector.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.model.storage; + +import java.util.Iterator; +import java.util.function.Supplier; + +import lombok.AccessLevel; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; +import lombok.ToString; +import lombok.experimental.Accessors; +import lombok.experimental.FieldDefaults; +import lombok.experimental.SuperBuilder; + +@Accessors(fluent = true) +@SuperBuilder(toBuilder = true) +@FieldDefaults(makeFinal = true, level = lombok.AccessLevel.PRIVATE) +@Getter +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +public class InternalDeletionVector extends InternalFile { + // path (absolute with scheme) of data file to which this deletion vector belongs + @NonNull String dataFilePath; + + // super.getFileSizeBytes() is the size of the deletion vector file + // super.getPhysicalPath() is the absolute path (with scheme) of the deletion vector file + // super.getRecordCount() is the count of records in the deletion vector file + + // offset of deletion vector start in a deletion vector file + int offset; + + /** + * binary representation of the deletion vector. The consumer can use the {@link + * #ordinalsIterator()} to extract the ordinals represented in the binary format. + */ + byte[] binaryRepresentation; + + /** + * Supplier for an iterator that returns the ordinals of records deleted by this deletion vector + * in the linked data file, identified by {@link #dataFilePath}. + * + *

The {@link InternalDeletionVector} instance does not guarantee that a new or distinct result + * will be returned each time the supplier is invoked. However, the supplier is expected to return + * a new iterator for each call. + */ + @Getter(AccessLevel.NONE) + Supplier> ordinalsSupplier; + + /** + * @return An iterator that returns the ordinals of records deleted by this deletion vector in the + * linked data file. There is no guarantee that a new or distinct iterator will be returned + * each time the iterator is invoked. + */ + public Iterator ordinalsIterator() { + return ordinalsSupplier.get(); + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java index 16a320f12..9e3b33cfd 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java @@ -18,18 +18,26 @@ package org.apache.xtable.delta; +import java.util.Arrays; import java.util.Collections; +import java.util.Iterator; import java.util.List; import lombok.AccessLevel; import lombok.NoArgsConstructor; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.spark.sql.delta.Snapshot; import org.apache.spark.sql.delta.actions.AddFile; import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor; import org.apache.spark.sql.delta.actions.RemoveFile; +import org.apache.spark.sql.delta.deletionvectors.RoaringBitmapArray; +import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore; +import org.apache.spark.sql.delta.storage.dv.HadoopFileSystemDVStore; + +import com.google.common.annotations.VisibleForTesting; import org.apache.xtable.exception.NotSupportedException; import org.apache.xtable.model.schema.InternalField; @@ -38,6 +46,7 @@ import org.apache.xtable.model.stat.FileStats; import org.apache.xtable.model.storage.FileFormat; 
import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalDeletionVector; @NoArgsConstructor(access = AccessLevel.PRIVATE) public class DeltaActionsConverter { @@ -113,16 +122,66 @@ static String getFullPathToFile(Snapshot snapshot, String dataFilePath) { * * @param snapshot the commit snapshot * @param addFile the add file action - * @return the deletion vector representation (path of data file), or null if no deletion vector - * is present + * @return the deletion vector representation, or null if no deletion vector is present */ - public String extractDeletionVectorFile(Snapshot snapshot, AddFile addFile) { + public InternalDeletionVector extractDeletionVector(Snapshot snapshot, AddFile addFile) { DeletionVectorDescriptor deletionVector = addFile.deletionVector(); if (deletionVector == null) { return null; } String dataFilePath = addFile.path(); - return getFullPathToFile(snapshot, dataFilePath); + dataFilePath = getFullPathToFile(snapshot, dataFilePath); + + InternalDeletionVector.InternalDeletionVectorBuilder deleteVectorBuilder = + InternalDeletionVector.builder() + .recordCount(deletionVector.cardinality()) + .fileSizeBytes(deletionVector.sizeInBytes()) + .dataFilePath(dataFilePath); + + if (deletionVector.isInline()) { + deleteVectorBuilder + .binaryRepresentation(deletionVector.inlineData()) + .physicalPath("") + .ordinalsSupplier(() -> ordinalsIterator(deletionVector.inlineData())); + } else { + Path deletionVectorFilePath = deletionVector.absolutePath(snapshot.deltaLog().dataPath()); + deleteVectorBuilder + .offset(getOffset(deletionVector)) + .physicalPath(deletionVectorFilePath.toString()) + .ordinalsSupplier(() -> ordinalsIterator(snapshot, deletionVector)); + } + + return deleteVectorBuilder.build(); + } + + private Iterator ordinalsIterator(byte[] bytes) { + RoaringBitmapArray rbm = RoaringBitmapArray.readFrom(bytes); + long[] ordinals = rbm.values(); + return Arrays.stream(ordinals).iterator(); + } + + 
private Iterator ordinalsIterator( + Snapshot snapshot, DeletionVectorDescriptor deleteVector) { + Path deletionVectorFilePath = deleteVector.absolutePath(snapshot.deltaLog().dataPath()); + int offset = getOffset(deleteVector); + long[] ordinals = + parseOrdinalFile( + snapshot.deltaLog().newDeltaHadoopConf(), + deletionVectorFilePath, + deleteVector.sizeInBytes(), + offset); + return Arrays.stream(ordinals).iterator(); + } + + private static int getOffset(DeletionVectorDescriptor deleteVector) { + return deleteVector.offset().isDefined() ? (int) deleteVector.offset().get() : 1; + } + + @VisibleForTesting + long[] parseOrdinalFile(Configuration conf, Path filePath, int size, int offset) { + DeletionVectorStore dvStore = new HadoopFileSystemDVStore(conf); + RoaringBitmapArray rbm = dvStore.read(filePath, offset, size); + return rbm.values(); } } diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java index 97804d5f2..3524fb8fd 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java @@ -22,11 +22,11 @@ import java.time.Instant; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import lombok.Builder; import lombok.extern.log4j.Log4j2; @@ -53,6 +53,8 @@ import org.apache.xtable.model.schema.InternalSchema; import org.apache.xtable.model.storage.FileFormat; import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalDeletionVector; +import org.apache.xtable.model.storage.InternalFile; import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; import 
org.apache.xtable.spi.extractor.ConversionSource; @@ -113,8 +115,8 @@ public TableChange getTableChangeForCommit(Long versionNumber) { // All 3 of the following data structures use data file's absolute path as the key Map addedFiles = new HashMap<>(); Map removedFiles = new HashMap<>(); - // Set of data file paths for which deletion vectors exists. - Set deletionVectors = new HashSet<>(); + // Map of data file paths for which deletion vectors exists. + Map deletionVectors = new HashMap<>(); for (Action action : actionsForVersion) { if (action instanceof AddFile) { @@ -129,10 +131,10 @@ public TableChange getTableChangeForCommit(Long versionNumber) { DeltaPartitionExtractor.getInstance(), DeltaStatsExtractor.getInstance()); addedFiles.put(dataFile.getPhysicalPath(), dataFile); - String deleteVectorPath = - actionsConverter.extractDeletionVectorFile(snapshotAtVersion, (AddFile) action); - if (deleteVectorPath != null) { - deletionVectors.add(deleteVectorPath); + InternalDeletionVector deletionVector = + actionsConverter.extractDeletionVector(snapshotAtVersion, (AddFile) action); + if (deletionVector != null) { + deletionVectors.put(deletionVector.dataFilePath(), deletionVector); } } else if (action instanceof RemoveFile) { InternalDataFile dataFile = @@ -151,7 +153,7 @@ public TableChange getTableChangeForCommit(Long versionNumber) { // entry which is replaced by a new entry, AddFile with delete vector information. Since the // same data file is removed and added, we need to remove it from the added and removed file // maps which are used to track actual added and removed data files. 
- for (String deletionVector : deletionVectors) { + for (String deletionVector : deletionVectors.keySet()) { // validate that a Remove action is also added for the data file if (removedFiles.containsKey(deletionVector)) { addedFiles.remove(deletionVector); @@ -163,11 +165,15 @@ public TableChange getTableChangeForCommit(Long versionNumber) { } } + List allAddedFiles = + Stream.concat(addedFiles.values().stream(), deletionVectors.values().stream()) + .collect(Collectors.toList()); InternalFilesDiff internalFilesDiff = InternalFilesDiff.builder() - .filesAdded(addedFiles.values()) + .filesAdded(allAddedFiles) .filesRemoved(removedFiles.values()) .build(); + return TableChange.builder() .tableAsOfChange(tableAtVersion) .filesDiff(internalFilesDiff) diff --git a/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java b/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java index ee5b1ccdd..909b1b790 100644 --- a/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java +++ b/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java @@ -39,6 +39,7 @@ import org.apache.spark.sql.functions; import org.apache.spark.sql.delta.DeltaLog; +import org.apache.spark.sql.delta.actions.AddFile; import com.google.common.base.Preconditions; @@ -212,11 +213,15 @@ private String initBasePath(Path tempDir, String tableName) throws IOException { } public List getAllActiveFiles() { - return deltaLog.snapshot().allFiles().collectAsList().stream() + return getAllActiveFilesInfo().stream() .map(addFile -> addSlashToBasePath(basePath) + addFile.path()) .collect(Collectors.toList()); } + public List getAllActiveFilesInfo() { + return deltaLog.snapshot().allFiles().collectAsList(); + } + private String addSlashToBasePath(String basePath) { if (basePath.endsWith("/")) { return basePath; diff --git a/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java b/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java index 
9e95f2795..2c330edcb 100644 --- a/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java +++ b/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java @@ -30,6 +30,7 @@ import org.apache.xtable.model.InternalSnapshot; import org.apache.xtable.model.TableChange; import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalFile; public class ValidationTestHelper { @@ -96,7 +97,7 @@ public static List getAllFilePaths(InternalSnapshot internalSnapshot) { } private static Set extractPathsFromDataFile(Set dataFiles) { - return dataFiles.stream().map(InternalDataFile::getPhysicalPath).collect(Collectors.toSet()); + return dataFiles.stream().map(InternalFile::getPhysicalPath).collect(Collectors.toSet()); } private static void replaceFileScheme(List filePaths) { diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java index ed02893e3..dcfb5f80d 100644 --- a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java +++ b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java @@ -19,11 +19,16 @@ package org.apache.xtable.delta; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import java.nio.file.Path; +import java.nio.file.Paths; import java.time.Instant; import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; @@ -38,6 +43,7 @@ import org.apache.spark.sql.delta.DeltaLog; import org.apache.spark.sql.delta.actions.AddFile; +import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor; import scala.Option; @@ -49,6 +55,7 @@ import org.apache.xtable.model.InstantsForIncrementalSync; import 
org.apache.xtable.model.InternalSnapshot; import org.apache.xtable.model.TableChange; +import org.apache.xtable.model.storage.InternalDeletionVector; import org.apache.xtable.model.storage.TableFormat; public class ITDeltaDeleteVectorConvert { @@ -56,6 +63,7 @@ public class ITDeltaDeleteVectorConvert { private static SparkSession sparkSession; private DeltaConversionSourceProvider conversionSourceProvider; + private TestSparkDeltaTable testSparkDeltaTable; @BeforeAll public static void setupOnce() { @@ -91,11 +99,24 @@ void setUp() { conversionSourceProvider.init(hadoopConf); } + private static class TableState { + Map activeFiles; + List rowsToDelete; + + TableState(Map activeFiles) { + this(activeFiles, Collections.emptyList()); + } + + TableState(Map activeFiles, List rowsToDelete) { + this.activeFiles = activeFiles; + this.rowsToDelete = rowsToDelete; + } + } + @Test public void testInsertsUpsertsAndDeletes() { String tableName = GenericTable.getTableName(); - TestSparkDeltaTable testSparkDeltaTable = - new TestSparkDeltaTable(tableName, tempDir, sparkSession, null, false); + testSparkDeltaTable = new TestSparkDeltaTable(tableName, tempDir, sparkSession, null, false); // enable deletion vectors for the test table testSparkDeltaTable @@ -105,25 +126,30 @@ public void testInsertsUpsertsAndDeletes() { + tableName + " SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)"); - List> allActiveFiles = new ArrayList<>(); + List testTableStates = new ArrayList<>(); List allTableChanges = new ArrayList<>(); List rows = testSparkDeltaTable.insertRows(50); Long timestamp1 = testSparkDeltaTable.getLastCommitTimestamp(); - allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); + Map tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable); + testTableStates.add(new TableState(tableFiles, Collections.emptyList())); List rows1 = testSparkDeltaTable.insertRows(50); - allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); + tableFiles = 
collectActiveFilesAfterCommit(testSparkDeltaTable); + testTableStates.add(new TableState(tableFiles)); + validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 0, 0); assertEquals(100L, testSparkDeltaTable.getNumRows()); - validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 0, 0); // upsert does not create delete vectors testSparkDeltaTable.upsertRows(rows.subList(0, 20)); - allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); + tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable); + testTableStates.add(new TableState(tableFiles)); + validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 0, 0); assertEquals(100L, testSparkDeltaTable.getNumRows()); - validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 0, 0); testSparkDeltaTable.insertRows(50); - allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); + tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable); + testTableStates.add(new TableState(tableFiles)); + validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 0, 0); assertEquals(150L, testSparkDeltaTable.getNumRows()); // delete a few rows with gaps in ids @@ -133,12 +159,15 @@ public void testInsertsUpsertsAndDeletes() { .collect(Collectors.toList()); rowsToDelete.addAll(rows.subList(35, 45)); testSparkDeltaTable.deleteRows(rowsToDelete); - allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); + tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable); + testTableStates.add(new TableState(tableFiles, rowsToDelete)); + validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 2, 15); assertEquals(135L, testSparkDeltaTable.getNumRows()); - validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 2, 15); testSparkDeltaTable.insertRows(50); - allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); + tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable); + 
testTableStates.add(new TableState(tableFiles)); + validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 2, 15); assertEquals(185L, testSparkDeltaTable.getNumRows()); // delete a few rows from a file which already has a deletion vector, this should generate a @@ -146,18 +175,22 @@ public void testInsertsUpsertsAndDeletes() { // This deletion step intentionally deletes the same rows again to test the merge. rowsToDelete = rows1.subList(5, 15); testSparkDeltaTable.deleteRows(rowsToDelete); - allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); + tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable); + testTableStates.add(new TableState(tableFiles, rowsToDelete)); + validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 2, 22); assertEquals(178L, testSparkDeltaTable.getNumRows()); - validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 2, 22); testSparkDeltaTable.insertRows(50); - allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); + tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable); + testTableStates.add(new TableState(tableFiles)); + validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 2, 22); assertEquals(228L, testSparkDeltaTable.getNumRows()); + String tableBasePath = testSparkDeltaTable.getBasePath(); SourceTable tableConfig = SourceTable.builder() .name(testSparkDeltaTable.getTableName()) - .basePath(testSparkDeltaTable.getBasePath()) + .basePath(tableBasePath) .formatName(TableFormat.DELTA) .build(); DeltaConversionSource conversionSource = @@ -165,8 +198,9 @@ public void testInsertsUpsertsAndDeletes() { InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot(); // validateDeltaPartitioning(internalSnapshot); - ValidationTestHelper.validateSnapshot( - internalSnapshot, allActiveFiles.get(allActiveFiles.size() - 1)); + List activeDataFilePaths = + new ArrayList<>(testTableStates.get(testTableStates.size() - 1).activeFiles.keySet()); + 
ValidationTestHelper.validateSnapshot(internalSnapshot, activeDataFilePaths); // Get changes in incremental format. InstantsForIncrementalSync instantsForIncrementalSync = @@ -179,13 +213,126 @@ public void testInsertsUpsertsAndDeletes() { TableChange tableChange = conversionSource.getTableChangeForCommit(version); allTableChanges.add(tableChange); } - ValidationTestHelper.validateTableChanges(allActiveFiles, allTableChanges); + + List> allActiveDataFilePaths = + testTableStates.stream() + .map(s -> s.activeFiles) + .map(Map::keySet) + .map(ArrayList::new) + .collect(Collectors.toList()); + ValidationTestHelper.validateTableChanges(allActiveDataFilePaths, allTableChanges); + + validateDeletionInfo(testTableStates, allTableChanges); + } + + // collects active files in the current snapshot as a map and adds it to the list + private Map collectActiveFilesAfterCommit( + TestSparkDeltaTable testSparkDeltaTable) { + Map allFiles = + testSparkDeltaTable.getAllActiveFilesInfo().stream() + .collect( + Collectors.toMap( + file -> getAddFileAbsolutePath(file, testSparkDeltaTable.getBasePath()), + file -> file)); + return allFiles; + } + + private void validateDeletionInfo( + List testTableStates, List allTableChanges) { + if (allTableChanges.isEmpty() && testTableStates.size() <= 1) { + return; + } + + assertEquals( + allTableChanges.size(), + testTableStates.size() - 1, + "Number of table changes should be equal to number of commits - 1"); + + for (int i = 0; i < allTableChanges.size() - 1; i++) { + Map activeFileAfterCommit = testTableStates.get(i + 1).activeFiles; + Map activeFileBeforeCommit = testTableStates.get(i).activeFiles; + + Map activeFilesWithUpdatedDeleteInfo = + activeFileAfterCommit.entrySet().stream() + .filter(e -> e.getValue().deletionVector() != null) + .filter( + entry -> { + if (activeFileBeforeCommit.get(entry.getKey()) == null) { + return true; + } + if (activeFileBeforeCommit.get(entry.getKey()).deletionVector() == null) { + return true; + } + 
DeletionVectorDescriptor deletionVectorDescriptor = + activeFileBeforeCommit.get(entry.getKey()).deletionVector(); + return !deletionVectorDescriptor.equals(entry.getValue().deletionVector()); + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + if (activeFilesWithUpdatedDeleteInfo.isEmpty()) { + continue; + } + + // validate all new delete vectors are correctly detected + validateDeletionInfoForCommit( + testTableStates.get(i + 1), activeFilesWithUpdatedDeleteInfo, allTableChanges.get(i)); + } + } + + private void validateDeletionInfoForCommit( + TableState tableState, + Map activeFilesAfterCommit, + TableChange changeDetectedForCommit) { + Map detectedDeleteInfos = + changeDetectedForCommit.getFilesDiff().getFilesAdded().stream() + .filter(file -> file instanceof InternalDeletionVector) + .map(file -> (InternalDeletionVector) file) + .collect(Collectors.toMap(InternalDeletionVector::dataFilePath, file -> file)); + + Map filesWithDeleteVectors = + activeFilesAfterCommit.entrySet().stream() + .filter(file -> file.getValue().deletionVector() != null) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + assertEquals(filesWithDeleteVectors.size(), detectedDeleteInfos.size()); + + for (Map.Entry fileWithDeleteVector : filesWithDeleteVectors.entrySet()) { + InternalDeletionVector deleteInfo = detectedDeleteInfos.get(fileWithDeleteVector.getKey()); + assertNotNull(deleteInfo); + DeletionVectorDescriptor deletionVectorDescriptor = + fileWithDeleteVector.getValue().deletionVector(); + assertEquals(deletionVectorDescriptor.cardinality(), deleteInfo.getRecordCount()); + assertEquals(deletionVectorDescriptor.sizeInBytes(), deleteInfo.getFileSizeBytes()); + assertEquals(deletionVectorDescriptor.offset().get(), deleteInfo.offset()); + + String deletionFilePath = + deletionVectorDescriptor + .absolutePath(new org.apache.hadoop.fs.Path(testSparkDeltaTable.getBasePath())) + .toString(); + assertEquals(deletionFilePath, 
deleteInfo.getPhysicalPath()); + + Iterator iterator = deleteInfo.ordinalsIterator(); + List deletes = new ArrayList<>(); + iterator.forEachRemaining(deletes::add); + assertEquals(deletes.size(), deleteInfo.getRecordCount()); + } + } + + private static String getAddFileAbsolutePath(AddFile file, String tableBasePath) { + String filePath = file.path(); + if (filePath.startsWith(tableBasePath)) { + return filePath; + } + return Paths.get(tableBasePath, file.path()).toString(); } private void validateDeletedRecordCount( - DeltaLog deltaLog, int version, int deleteVectorFileCount, int deletionRecordCount) { + DeltaLog deltaLog, int deleteVectorFileCount, int deletionRecordCount) { List allFiles = - deltaLog.getSnapshotAt(version, Option.empty()).allFiles().collectAsList(); + deltaLog + .getSnapshotAt(deltaLog.snapshot().version(), Option.empty()) + .allFiles() + .collectAsList(); List filesWithDeletionVectors = allFiles.stream().filter(f -> f.deletionVector() != null).collect(Collectors.toList()); diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java index e62e93414..117b3fc79 100644 --- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java +++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java @@ -18,10 +18,20 @@ package org.apache.xtable.delta; -import java.net.URISyntaxException; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; +import java.util.Arrays; +import java.util.Iterator; +import java.util.UUID; + +import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -29,39 +39,155 @@ import org.apache.spark.sql.delta.Snapshot; import org.apache.spark.sql.delta.actions.AddFile; import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor; +import org.apache.spark.sql.delta.deletionvectors.RoaringBitmapArray; +import org.apache.spark.sql.delta.deletionvectors.RoaringBitmapArrayFormat; import scala.Option; +import org.apache.xtable.model.storage.InternalDeletionVector; + class TestDeltaActionsConverter { + private final String basePath = "https://container.blob.core.windows.net/tablepath/"; + private final int size = 372; + private final long time = 376; + private final boolean dataChange = true; + private final String stats = ""; + private final int cardinality = 42; + private final int offset = 634; + @Test - void extractDeletionVector() throws URISyntaxException { + void extractMissingDeletionVector() { DeltaActionsConverter actionsConverter = DeltaActionsConverter.getInstance(); - int size = 123; - long time = 234L; - boolean dataChange = true; - String stats = ""; - String filePath = "https://container.blob.core.windows.net/tablepath/file_path"; + String filePath = basePath + "file_path"; Snapshot snapshot = Mockito.mock(Snapshot.class); - DeltaLog deltaLog = Mockito.mock(DeltaLog.class); DeletionVectorDescriptor deletionVector = null; AddFile addFileAction = new AddFile(filePath, null, size, time, dataChange, stats, null, deletionVector); - Assertions.assertNull(actionsConverter.extractDeletionVectorFile(snapshot, addFileAction)); + InternalDeletionVector internaldeletionVector = + actionsConverter.extractDeletionVector(snapshot, addFileAction); + assertNull(internaldeletionVector); + } - deletionVector = + @Test + void extractDeletionVectorInFileAbsolutePath() { + DeltaActionsConverter actionsConverter = 
spy(DeltaActionsConverter.getInstance()); + + String dataFilePath = "data_file"; + String deleteFilePath = "https://container.blob.core.windows.net/tablepath/delete_file"; + Snapshot snapshot = Mockito.mock(Snapshot.class); + + DeletionVectorDescriptor deletionVector = DeletionVectorDescriptor.onDiskWithAbsolutePath( - filePath, size, 42, Option.empty(), Option.empty()); + deleteFilePath, size, cardinality, Option.apply(offset), Option.empty()); - addFileAction = - new AddFile(filePath, null, size, time, dataChange, stats, null, deletionVector); + AddFile addFileAction = + new AddFile(dataFilePath, null, size, time, dataChange, stats, null, deletionVector); + + Configuration conf = new Configuration(); + DeltaLog deltaLog = Mockito.mock(DeltaLog.class); + when(snapshot.deltaLog()).thenReturn(deltaLog); + when(deltaLog.dataPath()).thenReturn(new Path(basePath)); + when(deltaLog.newDeltaHadoopConf()).thenReturn(conf); + + long[] ordinals = {45, 78, 98}; + Mockito.doReturn(ordinals) + .when(actionsConverter) + .parseOrdinalFile(conf, new Path(deleteFilePath), size, offset); + + InternalDeletionVector internaldeletionVector = + actionsConverter.extractDeletionVector(snapshot, addFileAction); + assertNotNull(internaldeletionVector); + assertEquals(basePath + dataFilePath, internaldeletionVector.dataFilePath()); + assertEquals(deleteFilePath, internaldeletionVector.getPhysicalPath()); + assertEquals(offset, internaldeletionVector.offset()); + assertEquals(cardinality, internaldeletionVector.getRecordCount()); + assertEquals(size, internaldeletionVector.getFileSizeBytes()); + assertNull(internaldeletionVector.binaryRepresentation()); + + Iterator iterator = internaldeletionVector.ordinalsIterator(); + Arrays.stream(ordinals).forEach(o -> assertEquals(o, iterator.next())); + assertFalse(iterator.hasNext()); + } + + @Test + void extractDeletionVectorInFileRelativePath() { + DeltaActionsConverter actionsConverter = spy(DeltaActionsConverter.getInstance()); + + String 
dataFilePath = "data_file"; + UUID deleteFileId = UUID.randomUUID(); + String deleteFilePath = basePath + "deletion_vector_" + deleteFileId + ".bin"; + Snapshot snapshot = Mockito.mock(Snapshot.class); + + DeletionVectorDescriptor deletionVector = + DeletionVectorDescriptor.onDiskWithRelativePath( + deleteFileId, "", size, cardinality, Option.apply(offset), Option.empty()); + + AddFile addFileAction = + new AddFile(dataFilePath, null, size, time, dataChange, stats, null, deletionVector); + + Configuration conf = new Configuration(); + DeltaLog deltaLog = Mockito.mock(DeltaLog.class); + when(snapshot.deltaLog()).thenReturn(deltaLog); + when(deltaLog.dataPath()).thenReturn(new Path(basePath)); + when(deltaLog.newDeltaHadoopConf()).thenReturn(conf); + + long[] ordinals = {45, 78, 98}; + Mockito.doReturn(ordinals) + .when(actionsConverter) + .parseOrdinalFile(conf, new Path(deleteFilePath), size, offset); + + InternalDeletionVector internaldeletionVector = + actionsConverter.extractDeletionVector(snapshot, addFileAction); + assertNotNull(internaldeletionVector); + assertEquals(basePath + dataFilePath, internaldeletionVector.dataFilePath()); + assertEquals(deleteFilePath, internaldeletionVector.getPhysicalPath()); + assertEquals(offset, internaldeletionVector.offset()); + assertEquals(cardinality, internaldeletionVector.getRecordCount()); + assertEquals(size, internaldeletionVector.getFileSizeBytes()); + assertNull(internaldeletionVector.binaryRepresentation()); + + Iterator iterator = internaldeletionVector.ordinalsIterator(); + Arrays.stream(ordinals).forEach(o -> assertEquals(o, iterator.next())); + assertFalse(iterator.hasNext()); + } + + @Test + void extractInlineDeletionVector() { + DeltaActionsConverter actionsConverter = spy(DeltaActionsConverter.getInstance()); + + String dataFilePath = "data_file"; + Snapshot snapshot = Mockito.mock(Snapshot.class); + + long[] ordinals = {45, 78, 98}; + RoaringBitmapArray rbm = new RoaringBitmapArray(); + 
Arrays.stream(ordinals).forEach(rbm::add); + byte[] bytes = rbm.serializeAsByteArray(RoaringBitmapArrayFormat.Portable()); + + DeletionVectorDescriptor deletionVector = + DeletionVectorDescriptor.inlineInLog(bytes, cardinality); + + AddFile addFileAction = + new AddFile(dataFilePath, null, size, time, dataChange, stats, null, deletionVector); + + DeltaLog deltaLog = Mockito.mock(DeltaLog.class); + when(snapshot.deltaLog()).thenReturn(deltaLog); + when(deltaLog.dataPath()).thenReturn(new Path(basePath)); + + InternalDeletionVector internaldeletionVector = + actionsConverter.extractDeletionVector(snapshot, addFileAction); + assertNotNull(internaldeletionVector); + assertEquals(basePath + dataFilePath, internaldeletionVector.dataFilePath()); + assertArrayEquals(bytes, internaldeletionVector.binaryRepresentation()); + assertEquals(cardinality, internaldeletionVector.getRecordCount()); + assertEquals(bytes.length, internaldeletionVector.getFileSizeBytes()); + assertEquals("", internaldeletionVector.getPhysicalPath()); + assertEquals(0, internaldeletionVector.offset()); - Mockito.when(snapshot.deltaLog()).thenReturn(deltaLog); - Mockito.when(deltaLog.dataPath()) - .thenReturn(new Path("https://container.blob.core.windows.net/tablepath")); - Assertions.assertEquals( - filePath, actionsConverter.extractDeletionVectorFile(snapshot, addFileAction)); + Iterator iterator = internaldeletionVector.ordinalsIterator(); + Arrays.stream(ordinals).forEach(o -> assertEquals(o, iterator.next())); + assertFalse(iterator.hasNext()); } }