From 9f1a269b0f06a70350e59406a4eb058dcc09b037 Mon Sep 17 00:00:00 2001 From: Vinish Reddy Date: Mon, 31 Mar 2025 23:39:27 -0700 Subject: [PATCH] Upgrade to spark version 3.5 --- pom.xml | 12 +++--- xtable-core/pom.xml | 2 +- .../delta/DeltaDataFileUpdatesExtractor.java | 5 ++- .../xtable/delta/DeltaSchemaExtractor.java | 15 ++++--- .../xtable/hudi/BaseFileUpdatesExtractor.java | 7 +++- .../hudi/HudiConversionSourceProvider.java | 4 +- .../xtable/hudi/HudiConversionTarget.java | 17 +++++--- .../xtable/hudi/HudiDataFileExtractor.java | 18 ++++---- .../xtable/hudi/HudiFileStatsExtractor.java | 16 +++++--- .../apache/xtable/hudi/HudiTableManager.java | 6 ++- .../catalog/HudiCatalogPartitionSyncTool.java | 38 ++++++++++------- .../xtable/schema/SparkSchemaExtractor.java | 3 +- .../apache/xtable/TestAbstractHudiTable.java | 12 ++++-- .../org/apache/xtable/TestJavaHudiTable.java | 4 +- .../delta/TestDeltaActionsConverter.java | 24 ++++++++++- .../xtable/delta/TestDeltaStatsExtractor.java | 41 +++++++++++++++++-- .../apache/xtable/delta/TestDeltaSync.java | 12 +++++- .../org/apache/xtable/hudi/HudiTestUtil.java | 3 +- .../xtable/hudi/ITHudiConversionSource.java | 3 +- .../xtable/hudi/ITHudiConversionTarget.java | 36 ++++++++++++---- .../hudi/TestBaseFileUpdatesExtractor.java | 7 ++-- .../hudi/TestHudiFileStatsExtractor.java | 14 ++++--- .../xtable/hudi/TestHudiTableManager.java | 3 +- .../TestHudiCatalogPartitionSyncTool.java | 25 +++++++---- .../schema/TestSparkSchemaExtractor.java | 4 +- 25 files changed, 238 insertions(+), 93 deletions(-) diff --git a/pom.xml b/pom.xml index a30a4c985..aaf33a95a 100644 --- a/pom.xml +++ b/pom.xml @@ -64,7 +64,7 @@ 1.18.36 1.18.20.0 3.4.1 - 0.14.0 + 0.15.0 2.29.40 2.3.9 3.3.1 @@ -79,15 +79,15 @@ 2.13.15 ${scala12.version} 2.12 - 3.4.2 - 3.4 + 3.5.2 + 3.5 1.4.2 - 2.4.0 + 3.0.0 2.18.2 2.43.0 0.16.1 1.8 - 0.5.0 + 3.3.0 3.0.0 UTF-8 **/target/** @@ -290,7 +290,7 @@ io.delta - delta-core_${scala.binary.version} + delta-spark_${scala.binary.version} ${delta.version} diff --git a/xtable-core/pom.xml b/xtable-core/pom.xml index 24bc31df5..958c4dee5 100644 --- a/xtable-core/pom.xml +++ b/xtable-core/pom.xml @@ -102,7 +102,7 @@ io.delta - delta-core_${scala.binary.version} + delta-spark_${scala.binary.version} io.delta diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java index caee22f6a..192eeda56 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java @@ -31,6 +31,7 @@ import org.apache.spark.sql.delta.actions.Action; import org.apache.spark.sql.delta.actions.AddFile; +import scala.Option; import scala.collection.JavaConverters; import scala.collection.Seq; @@ -122,7 +123,9 @@ private Stream createAddFileAction( true, getColumnStats(schema, dataFile.getRecordCount(), dataFile.getColumnStats()), null, - null)); + null, + Option.empty(), + Option.empty())); } private String getColumnStats( diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java index d1303e842..1376f884e 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java @@ -56,6 +56,11 @@ public class DeltaSchemaExtractor { private static final String DELTA_COLUMN_MAPPING_ID = "delta.columnMapping.id"; private static final DeltaSchemaExtractor INSTANCE = new DeltaSchemaExtractor(); + // Timestamps in Delta are microsecond precision by default + private static final Map + DEFAULT_TIMESTAMP_PRECISION_METADATA = + Collections.singletonMap( + InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS); public static DeltaSchemaExtractor getInstance() { return INSTANCE; @@ -110,11 +115,11 @@ private InternalSchema toInternalSchema( break; case "timestamp": type = InternalType.TIMESTAMP; - // Timestamps in Delta are microsecond precision by default - metadata = - Collections.singletonMap( - InternalSchema.MetadataKey.TIMESTAMP_PRECISION, - InternalSchema.MetadataValue.MICROS); + metadata = DEFAULT_TIMESTAMP_PRECISION_METADATA; + break; + case "timestamp_ntz": + type = InternalType.TIMESTAMP_NTZ; + metadata = DEFAULT_TIMESTAMP_PRECISION_METADATA; break; case "struct": StructType structType = (StructType) dataType; diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java index ba1f9f9d2..e430d2fdf 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java @@ -48,7 +48,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.ExternalFilePathUtil; -import org.apache.hudi.hadoop.CachingPath; +import org.apache.hudi.hadoop.fs.CachingPath; import org.apache.hudi.metadata.HoodieMetadataFileSystemView; import org.apache.xtable.collectors.CustomCollectors; @@ -91,7 +91,10 @@ ReplaceMetadata extractSnapshotChanges( Set partitionPathsToDrop = new HashSet<>( FSUtils.getAllPartitionPaths( - engineContext, metadataConfig, metaClient.getBasePathV2().toString())); + engineContext, + metaClient.getStorage(), + metadataConfig, + metaClient.getBasePathV2().toString())); ReplaceMetadata replaceMetadata = partitionedDataFiles.stream() .map( diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java index 0ddbbcb76..50632e34a 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java @@ -18,6 +18,8 @@ package org.apache.xtable.hudi; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getStorageConf; + import lombok.extern.log4j.Log4j2; import org.apache.hudi.common.model.HoodieTableType; @@ -35,7 +37,7 @@ public class HudiConversionSourceProvider extends ConversionSourceProvider writeClient = new HoodieJavaWriteClient<>(engineContext, writeConfig)) { writeClient.startCommitWithTime(instantTime, HoodieTimeline.REPLACE_COMMIT_ACTION); @@ -518,7 +521,8 @@ private void markInstantsAsCleaned( Collections.emptyMap(), CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanInfoPerPartition, - Collections.emptyList()); + Collections.emptyList(), + Collections.emptyMap()); // create a clean instant and mark it as requested with the clean plan HoodieInstant requestedCleanInstant = new HoodieInstant( @@ -548,7 +552,8 @@ private void markInstantsAsCleaned( }) .collect(Collectors.toList()); HoodieCleanMetadata cleanMetadata = - CleanerUtils.convertCleanMetadata(cleanTime, Option.empty(), cleanStats); + CleanerUtils.convertCleanMetadata( + cleanTime, Option.empty(), cleanStats, Collections.emptyMap()); // update the metadata table with the clean metadata so the files' metadata are marked for // deletion hoodieTableMetadataWriter.performTableServices(Option.empty()); diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java index 77c0ca98c..8c9a86ffc 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java @@ -56,6 +56,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.common.table.view.SyncableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.xtable.collectors.CustomCollectors; @@ -85,20 +86,20 @@ public HudiDataFileExtractor( HoodieTableMetaClient metaClient, HudiPartitionValuesExtractor hudiPartitionValuesExtractor, HudiFileStatsExtractor hudiFileStatsExtractor) { - this.engineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf()); + this.engineContext = new HoodieLocalEngineContext(metaClient.getStorageConf()); metadataConfig = HoodieMetadataConfig.newBuilder() .enable(metaClient.getTableConfig().isMetadataTableAvailable()) .build(); - this.basePath = metaClient.getBasePathV2(); + this.basePath = HadoopFSUtils.convertToHadoopPath(metaClient.getBasePathV2()); this.tableMetadata = - metadataConfig.enabled() - ? HoodieTableMetadata.create(engineContext, metadataConfig, basePath.toString(), true) + metadataConfig.isEnabled() + ? HoodieTableMetadata.create( + engineContext, metaClient.getStorage(), metadataConfig, basePath.toString(), true) : null; this.fileSystemViewManager = FileSystemViewManager.createViewManager( engineContext, - metadataConfig, FileSystemViewStorageConfig.newBuilder() .withStorageType(FileSystemViewStorageType.MEMORY) .build(), @@ -114,7 +115,8 @@ public List getFilesCurrentState(InternalTable table) { List allPartitionPaths = tableMetadata != null ? tableMetadata.getAllPartitionPaths() - : FSUtils.getAllPartitionPaths(engineContext, metadataConfig, basePath.toString()); + : FSUtils.getAllPartitionPaths( + engineContext, metaClient.getStorage(), metadataConfig, basePath.toString()); return getInternalDataFilesForPartitions(allPartitionPaths, table); } catch (IOException ex) { throw new ReadException( @@ -402,9 +404,9 @@ private InternalDataFile buildFileWithoutStats( .recordCount(rowCount) .columnStats(Collections.emptyList()) .lastModified( - hoodieBaseFile.getFileStatus() == null + hoodieBaseFile.getPathInfo() == null ? 0L - : hoodieBaseFile.getFileStatus().getModificationTime()) + : hoodieBaseFile.getPathInfo().getModificationTime()) .build(); } diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java index 82a094938..0905ab8b2 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java @@ -44,7 +44,8 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.hadoop.CachingPath; +import org.apache.hudi.hadoop.fs.CachingPath; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.metadata.MetadataPartitionType; @@ -117,7 +118,9 @@ private Stream computeColumnStatsFromParquetFooters( private Pair getPartitionAndFileName(String path) { Path filePath = new CachingPath(path); - String partitionPath = HudiPathUtils.getPartitionPath(metaClient.getBasePathV2(), filePath); + String partitionPath = + HudiPathUtils.getPartitionPath( + HadoopFSUtils.convertToHadoopPath(metaClient.getBasePathV2()), filePath); return Pair.of(partitionPath, filePath.getName()); } @@ -178,8 +181,10 @@ private Optional getMaxFromColumnStats(List columnStats) { private HudiFileStats computeColumnStatsForFile( Path filePath, Map nameFieldMap) { List> columnRanges = - UTILS.readRangeFromParquetMetadata( - metaClient.getHadoopConf(), filePath, new ArrayList<>(nameFieldMap.keySet())); + UTILS.readColumnStatsFromMetadata( + metaClient.getStorage(), + HadoopFSUtils.convertToStoragePath(filePath), + new ArrayList<>(nameFieldMap.keySet())); List columnStats = columnRanges.stream() .map( @@ -188,7 +193,8 @@ private HudiFileStats computeColumnStatsForFile( .collect(CustomCollectors.toList(columnRanges.size())); Long rowCount = getMaxFromColumnStats(columnStats).orElse(null); if (rowCount == null) { - rowCount = UTILS.getRowCount(metaClient.getHadoopConf(), filePath); + rowCount = + UTILS.getRowCount(metaClient.getStorage(), HadoopFSUtils.convertToStoragePath(filePath)); } return new HudiFileStats(columnStats, rowCount); } diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiTableManager.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiTableManager.java index c6ac35fb3..1b4c445b9 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiTableManager.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiTableManager.java @@ -18,6 +18,8 @@ package org.apache.xtable.hudi; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getStorageConf; + import java.io.IOException; import java.util.List; import java.util.Optional; @@ -68,7 +70,7 @@ public Optional loadTableMetaClientIfExists(String tableD return Optional.of( HoodieTableMetaClient.builder() .setBasePath(tableDataPath) - .setConf(configuration) + .setConf(getStorageConf(configuration)) .setLoadActiveTimelineOnLoad(false) .build()); } catch (TableNotFoundException ex) { @@ -117,7 +119,7 @@ HoodieTableMetaClient initializeHudiTable(String tableDataPath, InternalTable ta .map(InternalPartitionField::getSourceField) .map(InternalField::getPath) .collect(Collectors.joining(","))) - .initTable(configuration, tableDataPath); + .initTable(getStorageConf(configuration), tableDataPath); } catch (IOException ex) { throw new UpdateException("Unable to initialize Hudi table", ex); } diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogPartitionSyncTool.java b/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogPartitionSyncTool.java index 792c70635..ea7ac1e4d 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogPartitionSyncTool.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogPartitionSyncTool.java @@ -18,6 +18,8 @@ package org.apache.xtable.hudi.catalog; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getStorageConf; + import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -41,7 +43,8 @@ import org.apache.hudi.common.table.timeline.TimelineUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.VisibleForTesting; -import org.apache.hudi.hadoop.CachingPath; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.storage.StoragePath; import org.apache.hudi.sync.common.model.PartitionValueExtractor; import org.apache.xtable.catalog.CatalogPartition; @@ -211,9 +214,15 @@ private void updateLastCommitTimeSynced( * @return All relative partitions paths. */ public List getAllPartitionPathsOnStorage(String basePath) { - HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(configuration); + HoodieLocalEngineContext engineContext = + new HoodieLocalEngineContext(getStorageConf(configuration)); // ToDo - if we need to config to validate assumeDatePartitioning - return FSUtils.getAllPartitionPaths(engineContext, basePath, true, false); + return FSUtils.getAllPartitionPaths( + engineContext, + hudiTableManager.loadTableMetaClientIfExists(basePath).get().getStorage(), + basePath, + true, + false); } public List getWrittenPartitionsSince( @@ -244,12 +253,9 @@ private Set getDroppedPartitionsSince( HoodieTableMetaClient metaClient, Option lastCommitTimeSynced, Option lastCommitCompletionTimeSynced) { - HoodieTimeline timeline = - lastCommitTimeSynced.isPresent() - ? TimelineUtils.getCommitsTimelineAfter( - metaClient, lastCommitTimeSynced.get(), lastCommitCompletionTimeSynced) - : metaClient.getActiveTimeline(); - return new HashSet<>(TimelineUtils.getDroppedPartitions(timeline)); + return new HashSet<>( + TimelineUtils.getDroppedPartitions( + metaClient, lastCommitTimeSynced, lastCommitCompletionTimeSynced)); } /** @@ -266,7 +272,7 @@ private boolean syncPartitions( List partitionEventList) { List newPartitions = filterPartitions( - metaClient.getBasePathV2(), + HadoopFSUtils.convertToHadoopPath(metaClient.getBasePathV2()), partitionEventList, CatalogPartitionEvent.PartitionEventType.ADD); if (!newPartitions.isEmpty()) { @@ -276,7 +282,7 @@ private boolean syncPartitions( List updatePartitions = filterPartitions( - metaClient.getBasePathV2(), + HadoopFSUtils.convertToHadoopPath(metaClient.getBasePathV2()), partitionEventList, CatalogPartitionEvent.PartitionEventType.UPDATE); if (!updatePartitions.isEmpty()) { @@ -286,7 +292,7 @@ private boolean syncPartitions( List dropPartitions = filterPartitions( - metaClient.getBasePathV2(), + HadoopFSUtils.convertToHadoopPath(metaClient.getBasePathV2()), partitionEventList, CatalogPartitionEvent.PartitionEventType.DROP); if (!dropPartitions.isEmpty()) { @@ -373,7 +379,8 @@ private List getPartitionEvents( List events = new ArrayList<>(); for (String storagePartition : allPartitionsOnStorage) { Path storagePartitionPath = - FSUtils.getPartitionPath(metaClient.getBasePathV2(), storagePartition); + HadoopFSUtils.convertToHadoopPath( + FSUtils.constructAbsolutePath(metaClient.getBasePath(), storagePartition)); String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath(); List storagePartitionValues = @@ -398,7 +405,7 @@ private List getPartitionEvents( try { String relativePath = FSUtils.getRelativePartitionPath( - metaClient.getBasePathV2(), new CachingPath(storagePath)); + metaClient.getBasePathV2(), new StoragePath(storagePath)); events.add(CatalogPartitionEvent.newPartitionDropEvent(relativePath)); } catch (IllegalArgumentException e) { log.error( @@ -426,7 +433,8 @@ public List getPartitionEvents( List events = new ArrayList<>(); for (String storagePartition : writtenPartitionsOnStorage) { Path storagePartitionPath = - FSUtils.getPartitionPath(metaClient.getBasePathV2(), storagePartition); + HadoopFSUtils.convertToHadoopPath( + FSUtils.constructAbsolutePath(metaClient.getBasePath(), storagePartition)); String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath(); List storagePartitionValues = diff --git a/xtable-core/src/main/java/org/apache/xtable/schema/SparkSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/schema/SparkSchemaExtractor.java index cda649415..8db9f6109 100644 --- a/xtable-core/src/main/java/org/apache/xtable/schema/SparkSchemaExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/schema/SparkSchemaExtractor.java @@ -62,7 +62,6 @@ private DataType convertFieldType(InternalField field) { case INT: return DataTypes.IntegerType; case LONG: - case TIMESTAMP_NTZ: return DataTypes.LongType; case BYTES: case FIXED: @@ -76,6 +75,8 @@ private DataType convertFieldType(InternalField field) { return DataTypes.DateType; case TIMESTAMP: return DataTypes.TimestampType; + case TIMESTAMP_NTZ: + return DataTypes.TimestampNTZType; case DOUBLE: return DataTypes.DoubleType; case DECIMAL: diff --git a/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java b/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java index 3e9a133a2..059202e1a 100644 --- a/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java +++ b/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java @@ -18,6 +18,7 @@ package org.apache.xtable; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getStorageConf; import static org.apache.hudi.keygen.constant.KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME; import static org.apache.xtable.hudi.HudiTestUtil.getHoodieWriteConfig; import static org.junit.jupiter.api.Assertions.assertAll; @@ -48,6 +49,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import lombok.Getter; import lombok.SneakyThrows; import org.apache.avro.LogicalType; @@ -94,6 +96,7 @@ import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieLockConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.keygen.CustomKeyGenerator; import org.apache.hudi.keygen.KeyGenerator; import org.apache.hudi.keygen.NonpartitionedKeyGenerator; @@ -131,7 +134,7 @@ public abstract class TestAbstractHudiTable protected String tableName; // Base path for the table protected String basePath; - protected HoodieTableMetaClient metaClient; + @Getter protected HoodieTableMetaClient metaClient; protected TypedProperties typedProperties; protected KeyGenerator keyGenerator; protected Schema schema; @@ -586,14 +589,14 @@ protected static Schema addTopLevelField(Schema schema) { @SneakyThrows protected HoodieTableMetaClient getMetaClient( TypedProperties keyGenProperties, HoodieTableType hoodieTableType, Configuration conf) { - LocalFileSystem fs = (LocalFileSystem) FSUtils.getFs(basePath, conf); + LocalFileSystem fs = (LocalFileSystem) HadoopFSUtils.getFs(basePath, conf); // Enforce checksum such that fs.open() is consistent to DFS fs.setVerifyChecksum(true); fs.mkdirs(new org.apache.hadoop.fs.Path(basePath)); if (fs.exists(new org.apache.hadoop.fs.Path(basePath + "/.hoodie"))) { return HoodieTableMetaClient.builder() - .setConf(conf) + .setConf(getStorageConf(conf)) .setBasePath(basePath) .setLoadActiveTimelineOnLoad(true) .build(); @@ -610,7 +613,8 @@ protected HoodieTableMetaClient getMetaClient( .setCommitTimezone(HoodieTimelineTimeZone.UTC) .setBaseFileFormat(HoodieFileFormat.PARQUET.toString()) .build(); - return HoodieTableMetaClient.initTableAndGetMetaClient(conf, this.basePath, properties); + return HoodieTableMetaClient.initTableAndGetMetaClient( + getStorageConf(conf), this.basePath, properties); } private static Schema.Field copyField(Schema.Field input) { diff --git a/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java b/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java index ce3b25bda..e888821bb 100644 --- a/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java +++ b/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java @@ -18,6 +18,8 @@ package org.apache.xtable; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getStorageConf; + import java.io.IOException; import java.io.UncheckedIOException; import java.nio.file.Path; @@ -320,7 +322,7 @@ private HoodieJavaWriteClient initJavaWriteClient( .withArchivalConfig(archivalConfig) .build(); } - HoodieEngineContext context = new HoodieJavaEngineContext(conf); + HoodieEngineContext context = new HoodieJavaEngineContext(getStorageConf(conf)); return new HoodieJavaWriteClient<>(context, writeConfig); } } diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java index e62e93414..b698ac47a 100644 --- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java +++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java @@ -48,7 +48,17 @@ void extractDeletionVector() throws URISyntaxException { DeletionVectorDescriptor deletionVector = null; AddFile addFileAction = - new AddFile(filePath, null, size, time, dataChange, stats, null, deletionVector); + new AddFile( + filePath, + null, + size, + time, + dataChange, + stats, + null, + deletionVector, + Option.empty(), + Option.empty()); Assertions.assertNull(actionsConverter.extractDeletionVectorFile(snapshot, addFileAction)); deletionVector = @@ -56,7 +66,17 @@ void extractDeletionVector() throws URISyntaxException { filePath, size, 42, Option.empty(), Option.empty()); addFileAction = - new AddFile(filePath, null, size, time, dataChange, stats, null, deletionVector); + new AddFile( + filePath, + null, + size, + time, + dataChange, + stats, + null, + deletionVector, + Option.empty(), + Option.empty()); Mockito.when(snapshot.deltaLog()).thenReturn(deltaLog); Mockito.when(deltaLog.dataPath()) diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaStatsExtractor.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaStatsExtractor.java index d4d35e7ff..06cb474d0 100644 --- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaStatsExtractor.java +++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaStatsExtractor.java @@ -35,6 +35,8 @@ import org.apache.spark.sql.delta.actions.AddFile; +import scala.Option; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -125,7 +127,18 @@ void roundTripStatsConversion() throws IOException { String stats = DeltaStatsExtractor.getInstance() .convertStatsToDeltaFormat(schema, numRecords, columnStats); - AddFile addFile = new AddFile("file://path/to/file", null, 0, 0, true, stats, null, null); + AddFile addFile = + new AddFile( + "file://path/to/file", + null, + 0, + 0, + true, + stats, + null, + null, + Option.empty(), + Option.empty()); DeltaStatsExtractor extractor = DeltaStatsExtractor.getInstance(); FileStats actual = extractor.getColumnStatsForFile(addFile, fields); List actualColumStats = actual.getColumnStats(); @@ -161,7 +174,18 @@ void convertStatsToInternalRepresentation() throws IOException { deltaStats.put("tightBounds", Boolean.TRUE); deltaStats.put("nonExisting", minValues); String stats = MAPPER.writeValueAsString(deltaStats); - AddFile addFile = new AddFile("file://path/to/file", null, 0, 0, true, stats, null, null); + AddFile addFile = + new AddFile( + "file://path/to/file", + null, + 0, + 0, + true, + stats, + null, + null, + Option.empty(), + Option.empty()); DeltaStatsExtractor extractor = DeltaStatsExtractor.getInstance(); FileStats actual = extractor.getColumnStatsForFile(addFile, fields); List actualColumStats = actual.getColumnStats(); @@ -204,7 +228,18 @@ void convertStatsToInternalRepresentation() throws IOException { @Test void convertNullStatsToInternalRepresentation() { List fields = getSchemaFields(); - AddFile addFile = new AddFile("file://path/to/file", null, 0, 0, true, null, null, null); + AddFile addFile = + new AddFile( + "file://path/to/file", + null, + 0, + 0, + true, + null, + null, + null, + Option.empty(), + Option.empty()); DeltaStatsExtractor extractor = DeltaStatsExtractor.getInstance(); FileStats actual = extractor.getColumnStatsForFile(addFile, fields); List actualColumStats = actual.getColumnStats(); diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java index 7e921efe5..99d5800da 100644 --- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java +++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java @@ -516,7 +516,7 @@ private InternalDataFile getDataFile( private InternalSchema getInternalSchema() { Map timestampMetadata = new HashMap<>(); timestampMetadata.put( - InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MILLIS); + InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS); return InternalSchema.builder() .dataType(InternalType.RECORD) .name("top_level_schema") @@ -558,6 +558,16 @@ private InternalSchema getInternalSchema() { .isNullable(true) .metadata(timestampMetadata) .build()) + .build(), + InternalField.builder() + .name("timestamp_ntz_field") + .schema( + InternalSchema.builder() + .name("time_ntz") + .dataType(InternalType.TIMESTAMP_NTZ) + .isNullable(true) + .metadata(timestampMetadata) + .build()) .build())) .isNullable(false) .build(); diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/HudiTestUtil.java b/xtable-core/src/test/java/org/apache/xtable/hudi/HudiTestUtil.java index c701a1d54..4835d6e1c 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/HudiTestUtil.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/HudiTestUtil.java @@ -18,6 +18,7 @@ package org.apache.xtable.hudi; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getStorageConf; import static org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY; import java.nio.file.Path; @@ -60,7 +61,7 @@ static HoodieTableMetaClient initTableAndGetMetaClient( .setTableName("test_table") .setPayloadClass(HoodieAvroPayload.class) .setPartitionFields(partitionFields) - .initTable(new Configuration(), tableBasePath); + .initTable(getStorageConf(new Configuration()), tableBasePath); } public static HoodieWriteConfig getHoodieWriteConfig(HoodieTableMetaClient metaClient) { diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java index 376ccedae..9f4987fdd 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java @@ -19,6 +19,7 @@ package org.apache.xtable.hudi; import static java.util.stream.Collectors.groupingBy; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getStorageConf; import static org.apache.xtable.testutil.ITTestUtils.validateTable; import static org.junit.jupiter.api.Assertions.*; @@ -688,7 +689,7 @@ private HudiConversionSource getHudiSourceClient( Configuration conf, String basePath, String xTablePartitionConfig) { HoodieTableMetaClient hoodieTableMetaClient = HoodieTableMetaClient.builder() - .setConf(conf) + .setConf(getStorageConf(conf)) .setBasePath(basePath) .setLoadActiveTimelineOnLoad(true) .build(); diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java index 99965f1fc..f2ebc76d1 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java @@ -18,6 +18,7 @@ package org.apache.xtable.hudi; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getStorageConf; import static org.apache.xtable.hudi.HudiTestUtil.createWriteStatus; import static org.apache.xtable.hudi.HudiTestUtil.getHoodieWriteConfig; import static org.apache.xtable.hudi.HudiTestUtil.initTableAndGetMetaClient; @@ -72,6 +73,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.metadata.HoodieBackedTableMetadata; import org.apache.hudi.metadata.HoodieMetadataFileSystemView; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.xtable.conversion.TargetTable; import org.apache.xtable.model.InternalTable; @@ -98,7 +100,7 @@ */ public class ITHudiConversionTarget { @TempDir public static Path tempDir; - private static final Configuration CONFIGURATION = new Configuration(); + private static final StorageConfiguration CONFIGURATION = getStorageConf(new Configuration()); private static final HoodieEngineContext CONTEXT = new HoodieJavaEngineContext(CONFIGURATION); private static final String TABLE_NAME = "test_table"; @@ -218,7 +220,11 @@ void syncForExistingTable() { metaClient, partitionPath, Collections.singletonList(Pair.of(fileName, filePath))); try (HoodieBackedTableMetadata hoodieBackedTableMetadata = new HoodieBackedTableMetadata( - CONTEXT, writeConfig.getMetadataConfig(), tableBasePath, true)) { + CONTEXT, + metaClient.getStorage(), + writeConfig.getMetadataConfig(), + tableBasePath, + true)) { assertColStats(hoodieBackedTableMetadata, partitionPath, fileName); } // include meta fields since the table was created with meta fields enabled @@ -259,7 +265,11 @@ void syncForNewTable() { metaClient, partitionPath, Collections.singletonList(Pair.of(fileName, filePath))); try (HoodieBackedTableMetadata hoodieBackedTableMetadata = new HoodieBackedTableMetadata( - CONTEXT, getHoodieWriteConfig(metaClient).getMetadataConfig(), tableBasePath, true)) { + CONTEXT, + metaClient.getStorage(), + getHoodieWriteConfig(metaClient).getMetadataConfig(), + tableBasePath, + true)) { assertColStats(hoodieBackedTableMetadata, partitionPath, fileName); } assertSchema(metaClient, false); @@ -306,7 +316,11 @@ void archiveTimelineAndCleanMetadataTableAfterMultipleCommits(String partitionPa metaClient, partitionPath, Arrays.asList(file0Pair, Pair.of(fileName1, filePath1))); try (HoodieBackedTableMetadata hoodieBackedTableMetadata = new HoodieBackedTableMetadata( - CONTEXT, getHoodieWriteConfig(metaClient).getMetadataConfig(), tableBasePath, true)) { + CONTEXT, + metaClient.getStorage(), + getHoodieWriteConfig(metaClient).getMetadataConfig(), + tableBasePath, + true)) { assertColStats(hoodieBackedTableMetadata, partitionPath, fileName1); } @@ -324,7 +338,11 @@ CONTEXT, getHoodieWriteConfig(metaClient).getMetadataConfig(), tableBasePath, tr metaClient, partitionPath, Arrays.asList(file0Pair, Pair.of(fileName2, filePath2))); try (HoodieBackedTableMetadata hoodieBackedTableMetadata = new HoodieBackedTableMetadata( - CONTEXT, getHoodieWriteConfig(metaClient).getMetadataConfig(), tableBasePath, true)) { + CONTEXT, + metaClient.getStorage(), + getHoodieWriteConfig(metaClient).getMetadataConfig(), + tableBasePath, + true)) { // the metadata for fileName1 should still be present until the cleaner kicks in assertColStats(hoodieBackedTableMetadata, partitionPath, fileName1); // new file stats should be present @@ -375,7 +393,11 @@ CONTEXT, getHoodieWriteConfig(metaClient).getMetadataConfig(), tableBasePath, tr // col stats should be cleaned up for fileName1 but present for fileName2 and fileName3 try (HoodieBackedTableMetadata hoodieBackedTableMetadata = new HoodieBackedTableMetadata( - CONTEXT, getHoodieWriteConfig(metaClient).getMetadataConfig(), tableBasePath, true)) { + CONTEXT, + metaClient.getStorage(), + getHoodieWriteConfig(metaClient).getMetadataConfig(), + tableBasePath, + true)) { // assertEmptyColStats(hoodieBackedTableMetadata, partitionPath, fileName1); assertColStats(hoodieBackedTableMetadata, partitionPath, fileName3); assertColStats(hoodieBackedTableMetadata, partitionPath, fileName4); @@ -731,7 +753,7 @@ private HudiConversionTarget getTargetClient() { .name("test_table") .metadataRetention(Duration.of(4, ChronoUnit.HOURS)) .build(), - CONFIGURATION, + (Configuration) CONFIGURATION.unwrapCopy(), 3); } } diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java b/xtable-core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java index 5695319aa..bdb62ffca 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java @@ -18,6 +18,7 @@ package org.apache.xtable.hudi; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getStorageConf; import static org.apache.xtable.hudi.HudiTestUtil.createWriteStatus; import static org.apache.xtable.hudi.HudiTestUtil.getHoodieWriteConfig; import static org.apache.xtable.hudi.HudiTestUtil.initTableAndGetMetaClient; @@ -50,7 +51,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.hadoop.CachingPath; +import org.apache.hudi.hadoop.fs.CachingPath; import org.apache.xtable.model.schema.InternalField; import org.apache.xtable.model.schema.InternalPartitionField; @@ -73,7 +74,7 @@ public class TestBaseFileUpdatesExtractor { private static final long RECORD_COUNT = 200L; private static final long LAST_MODIFIED = System.currentTimeMillis(); private static final HoodieEngineContext CONTEXT = - new HoodieJavaEngineContext(new Configuration()); + new HoodieJavaEngineContext(getStorageConf(new Configuration())); private static final InternalPartitionField PARTITION_FIELD = InternalPartitionField.builder() .sourceField( @@ -166,7 +167,7 @@ void extractSnapshotChanges_emptyTargetTable() throws IOException { .setTableName("test_table") .setPayloadClass(HoodieAvroPayload.class) .setPartitionFields("partition_field") - .initTable(new Configuration(), tableBasePath); + .initTable(getStorageConf(new Configuration()), tableBasePath); String partitionPath1 = "partition1"; String fileName1 = "file1.parquet"; diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java index a18bb743d..d4918c38d 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java @@ -18,6 +18,7 @@ package org.apache.xtable.hudi; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getStorageConf; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; @@ -63,6 +64,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.Option; import org.apache.hudi.metadata.HoodieTableMetadata; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.xtable.GenericTable; import org.apache.xtable.TestJavaHudiTable; @@ -81,7 +83,7 @@ public class TestHudiFileStatsExtractor { private static final Schema NESTED_SCHEMA = AVRO_SCHEMA.getField("nested_record").schema().getTypes().get(1); - private final Configuration configuration = new Configuration(); + private final StorageConfiguration configuration = getStorageConf(new Configuration()); private final InternalField nestedIntBase = getNestedIntBase(); private final InternalSchema nestedSchema = getNestedSchema(nestedIntBase, "nested_record"); private final InternalField longField = getLongField(); @@ -124,6 +126,7 @@ public class TestHudiFileStatsExtractor { void columnStatsWithMetadataTable(@TempDir Path tempDir) throws Exception { String tableName = GenericTable.getTableName(); String basePath; + HoodieTableMetaClient metaClient; try (TestJavaHudiTable table = TestJavaHudiTable.withSchema( tableName, tempDir, "long_field:SIMPLE", HoodieTableType.COPY_ON_WRITE, AVRO_SCHEMA)) { @@ -131,10 +134,12 @@ void columnStatsWithMetadataTable(@TempDir Path tempDir) throws Exception { getRecords().stream().map(this::buildRecord).collect(Collectors.toList()); table.insertRecords(true, records); basePath = table.getBasePath(); + metaClient = table.getMetaClient(); } HoodieTableMetadata tableMetadata = HoodieTableMetadata.create( new HoodieJavaEngineContext(configuration), + metaClient.getStorage(), HoodieMetadataConfig.newBuilder().enable(true).build(), basePath, true); @@ -152,8 +157,6 @@ void columnStatsWithMetadataTable(@TempDir Path tempDir) throws Exception { .fileSizeBytes(4321L) .recordCount(0) .build(); - HoodieTableMetaClient metaClient = - HoodieTableMetaClient.builder().setBasePath(basePath).setConf(configuration).build(); HudiFileStatsExtractor fileStatsExtractor = new HudiFileStatsExtractor(metaClient); List output = fileStatsExtractor @@ -170,7 +173,8 @@ void columnStatsWithoutMetadataTable(@TempDir Path tempDir) throws IOException { try (ParquetWriter writer = AvroParquetWriter.builder( HadoopOutputFile.fromPath( - new org.apache.hadoop.fs.Path(file.toUri()), configuration)) + new org.apache.hadoop.fs.Path(file.toUri()), + (Configuration) configuration.unwrapCopy())) .withSchema(AVRO_SCHEMA) .withDataModel(genericData) .build()) { @@ -190,7 +194,7 @@ void columnStatsWithoutMetadataTable(@TempDir Path tempDir) throws IOException { .build(); HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class); - when(mockMetaClient.getHadoopConf()).thenReturn(configuration); + when(mockMetaClient.getStorageConf()).thenReturn(configuration); HudiFileStatsExtractor fileStatsExtractor = new HudiFileStatsExtractor(mockMetaClient); List output = fileStatsExtractor diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiTableManager.java b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiTableManager.java index c0d5e6d4e..3ee7a168c 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiTableManager.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiTableManager.java @@ -18,6 +18,7 @@ package org.apache.xtable.hudi; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getStorageConf; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -98,7 +99,7 @@ void validateTableInitializedCorrectly( HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() .setBasePath(tableBasePath) - .setConf(CONFIGURATION) + .setConf(getStorageConf(CONFIGURATION)) .setLoadActiveTimelineOnLoad(false) .build(); assertFalse(metaClient.getTableConfig().populateMetaFields()); diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogPartitionSyncTool.java b/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogPartitionSyncTool.java index 0c33013a5..7abd43bdd 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogPartitionSyncTool.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogPartitionSyncTool.java @@ -60,6 +60,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.TimelineUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.storage.StoragePath; import org.apache.hudi.sync.common.model.PartitionValueExtractor; import org.apache.xtable.avro.AvroSchemaConverter; @@ -134,17 +135,20 @@ void testSyncAllPartitions() { mockZonedDateTime.when(ZonedDateTime::now).thenReturn(zonedDateTime); List mockedPartitions = Arrays.asList(partitionKey1, partitionKey2); mockFSUtils - .when(() -> FSUtils.getAllPartitionPaths(any(), eq(TEST_BASE_PATH), eq(true), eq(false))) + .when( + () -> + FSUtils.getAllPartitionPaths( + any(), any(), eq(TEST_BASE_PATH), eq(true), eq(false))) .thenReturn(mockedPartitions); mockFSUtils - .when(() -> FSUtils.getPartitionPath(new Path(TEST_BASE_PATH), partitionKey1)) + .when(() -> FSUtils.constructAbsolutePath(new StoragePath(TEST_BASE_PATH), partitionKey1)) .thenReturn(new Path(TEST_BASE_PATH + "/" + partitionKey1)); mockFSUtils - .when(() -> FSUtils.getPartitionPath(new Path(TEST_BASE_PATH), partitionKey2)) + .when(() -> FSUtils.constructAbsolutePath(new StoragePath(TEST_BASE_PATH), partitionKey2)) .thenReturn(new Path(TEST_BASE_PATH + "/" + partitionKey2)); when(mockHudiTableManager.loadTableMetaClientIfExists(TEST_BASE_PATH)) .thenReturn(Optional.of(mockMetaClient)); - when(mockMetaClient.getBasePathV2()).thenReturn(new Path(TEST_BASE_PATH)); + when(mockMetaClient.getBasePathV2()).thenReturn(new StoragePath(TEST_BASE_PATH)); when(mockPartitionValueExtractor.extractPartitionValuesInPath(partitionKey1)) .thenReturn(Collections.singletonList(partitionKey1)); when(mockPartitionValueExtractor.extractPartitionValuesInPath(partitionKey2)) @@ -209,17 +213,20 @@ void testSyncPartitionsSinceLastSyncTime() { mockZonedDateTime.when(ZonedDateTime::now).thenReturn(zonedDateTime); List mockedPartitions = Arrays.asList(partitionKey1, partitionKey2); mockFSUtils - .when(() -> FSUtils.getAllPartitionPaths(any(), eq(TEST_BASE_PATH), eq(true), eq(false))) + .when( + () -> + FSUtils.getAllPartitionPaths( + any(), any(), eq(TEST_BASE_PATH), eq(true), eq(false))) .thenReturn(mockedPartitions); mockFSUtils - .when(() -> FSUtils.getPartitionPath(new Path(TEST_BASE_PATH), partitionKey2)) + .when(() -> FSUtils.constructAbsolutePath(new StoragePath(TEST_BASE_PATH), partitionKey2)) .thenReturn(new Path(TEST_BASE_PATH + "/" + partitionKey2)); mockFSUtils - .when(() -> FSUtils.getPartitionPath(new Path(TEST_BASE_PATH), partitionKey3)) + .when(() -> FSUtils.constructAbsolutePath(new StoragePath(TEST_BASE_PATH), partitionKey3)) .thenReturn(new Path(TEST_BASE_PATH + "/" + partitionKey3)); when(mockHudiTableManager.loadTableMetaClientIfExists(TEST_BASE_PATH)) .thenReturn(Optional.of(mockMetaClient)); - when(mockMetaClient.getBasePathV2()).thenReturn(new Path(TEST_BASE_PATH)); + when(mockMetaClient.getBasePathV2()).thenReturn(new StoragePath(TEST_BASE_PATH)); when(mockPartitionValueExtractor.extractPartitionValuesInPath(partitionKey2)) .thenReturn(Collections.singletonList(partitionKey2)); when(mockPartitionValueExtractor.extractPartitionValuesInPath(partitionKey3)) @@ -255,7 +262,7 @@ void testSyncPartitionsSinceLastSyncTime() { () -> TimelineUtils.getCommitsTimelineAfter(mockMetaClient, "100", Option.of("1000"))) .thenReturn(mockTimeline); mockedTimelineUtils - .when(() -> TimelineUtils.getDroppedPartitions(mockTimeline)) + .when(() -> TimelineUtils.getDroppedPartitions(eq(mockMetaClient), any(), any())) .thenReturn(Collections.singletonList(partitionKey2)); CatalogPartition p1 = diff --git a/xtable-core/src/test/java/org/apache/xtable/schema/TestSparkSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/schema/TestSparkSchemaExtractor.java index 59385f055..1a5d71b15 100644 --- a/xtable-core/src/test/java/org/apache/xtable/schema/TestSparkSchemaExtractor.java +++ b/xtable-core/src/test/java/org/apache/xtable/schema/TestSparkSchemaExtractor.java @@ -385,8 +385,8 @@ public void testTimestamps() { StructType structRepresentationTimestampNtz = new StructType() - .add("requiredTimestampNtz", DataTypes.LongType, false) - .add("optionalTimestampNtz", DataTypes.LongType, true); + .add("requiredTimestampNtz", DataTypes.TimestampNTZType, false) + .add("optionalTimestampNtz", DataTypes.TimestampNTZType, true); Assertions.assertEquals( structRepresentationTimestamp,