
Upgrade to Spark version 3.5 #671


Open
wants to merge 1 commit into base: main
12 changes: 6 additions & 6 deletions pom.xml
@@ -64,7 +64,7 @@
<lombok.version>1.18.36</lombok.version>
<lombok-maven-plugin.version>1.18.20.0</lombok-maven-plugin.version>
<hadoop.version>3.4.1</hadoop.version>
- <hudi.version>0.14.0</hudi.version>
+ <hudi.version>0.15.0</hudi.version>
<aws.version>2.29.40</aws.version>
<hive.version>2.3.9</hive.version>
<maven-source-plugin.version>3.3.1</maven-source-plugin.version>
@@ -79,15 +79,15 @@
<scala13.version>2.13.15</scala13.version>
<scala.version>${scala12.version}</scala.version>
<scala.binary.version>2.12</scala.binary.version>
- <spark.version>3.4.2</spark.version>
- <spark.version.prefix>3.4</spark.version.prefix>
+ <spark.version>3.5.2</spark.version>
+ <spark.version.prefix>3.5</spark.version.prefix>
<iceberg.version>1.4.2</iceberg.version>
- <delta.version>2.4.0</delta.version>
+ <delta.version>3.0.0</delta.version>
<jackson.version>2.18.2</jackson.version>
<spotless.version>2.43.0</spotless.version>
<apache.rat.version>0.16.1</apache.rat.version>
<google.java.format.version>1.8</google.java.format.version>
- <delta.standalone.version>0.5.0</delta.standalone.version>
+ <delta.standalone.version>3.3.0</delta.standalone.version>
<delta.hive.version>3.0.0</delta.hive.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<target.dir.pattern>**/target/**</target.dir.pattern>
@@ -290,7 +290,7 @@
<!-- Delta -->
<dependency>
<groupId>io.delta</groupId>
- <artifactId>delta-core_${scala.binary.version}</artifactId>
+ <artifactId>delta-spark_${scala.binary.version}</artifactId>
<version>${delta.version}</version>
</dependency>
<dependency>
2 changes: 1 addition & 1 deletion xtable-core/pom.xml
@@ -102,7 +102,7 @@
<!-- Delta dependencies -->
<dependency>
<groupId>io.delta</groupId>
- <artifactId>delta-core_${scala.binary.version}</artifactId>
+ <artifactId>delta-spark_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>io.delta</groupId>
@@ -31,6 +31,7 @@
import org.apache.spark.sql.delta.actions.Action;
import org.apache.spark.sql.delta.actions.AddFile;

+ import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Seq;

@@ -122,7 +123,9 @@ private Stream<AddFile> createAddFileAction(
true,
getColumnStats(schema, dataFile.getRecordCount(), dataFile.getColumnStats()),
null,
- null));
+ null,
+ Option.empty(),
+ Option.empty()));
}

private String getColumnStats(
@@ -56,6 +56,11 @@
public class DeltaSchemaExtractor {
private static final String DELTA_COLUMN_MAPPING_ID = "delta.columnMapping.id";
private static final DeltaSchemaExtractor INSTANCE = new DeltaSchemaExtractor();
+ // Timestamps in Delta are microsecond precision by default
+ private static final Map<InternalSchema.MetadataKey, Object>
+     DEFAULT_TIMESTAMP_PRECISION_METADATA =
+         Collections.singletonMap(
+             InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS);

public static DeltaSchemaExtractor getInstance() {
return INSTANCE;
@@ -110,11 +115,11 @@ private InternalSchema toInternalSchema(
break;
case "timestamp":
type = InternalType.TIMESTAMP;
- // Timestamps in Delta are microsecond precision by default
- metadata =
-     Collections.singletonMap(
-         InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
-         InternalSchema.MetadataValue.MICROS);
+ metadata = DEFAULT_TIMESTAMP_PRECISION_METADATA;
break;
case "timestamp_ntz":
type = InternalType.TIMESTAMP_NTZ;
metadata = DEFAULT_TIMESTAMP_PRECISION_METADATA;
break;
case "struct":
StructType structType = (StructType) dataType;
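
Note on the hunk above: it folds the inline microsecond-precision metadata into the shared DEFAULT_TIMESTAMP_PRECISION_METADATA constant and adds a branch for Spark's timestamp_ntz type. A minimal, illustrative sketch (assuming Spark 3.5's type API; the class and field names below are made up, not part of this PR) of how the two timestamp types report the simple type names this switch matches on:

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class TimestampTypeNames {
  public static void main(String[] args) {
    StructType schema =
        new StructType()
            .add("event_time", DataTypes.TimestampType) // typeName() is "timestamp"
            .add("event_time_local", DataTypes.TimestampNTZType); // typeName() is "timestamp_ntz"
    for (StructField field : schema.fields()) {
      // DeltaSchemaExtractor switches on these names; both branches now attach
      // the shared microsecond-precision metadata.
      System.out.println(field.name() + " -> " + field.dataType().typeName());
    }
  }
}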
@@ -48,7 +48,7 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.ExternalFilePathUtil;
- import org.apache.hudi.hadoop.CachingPath;
+ import org.apache.hudi.hadoop.fs.CachingPath;
import org.apache.hudi.metadata.HoodieMetadataFileSystemView;

import org.apache.xtable.collectors.CustomCollectors;
@@ -91,7 +91,10 @@ ReplaceMetadata extractSnapshotChanges(
Set<String> partitionPathsToDrop =
new HashSet<>(
FSUtils.getAllPartitionPaths(
- engineContext, metadataConfig, metaClient.getBasePathV2().toString()));
+ engineContext,
+ metaClient.getStorage(),
+ metadataConfig,
+ metaClient.getBasePathV2().toString()));
ReplaceMetadata replaceMetadata =
partitionedDataFiles.stream()
.map(
@@ -18,6 +18,8 @@

package org.apache.xtable.hudi;

+ import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getStorageConf;

import lombok.extern.log4j.Log4j2;

import org.apache.hudi.common.model.HoodieTableType;
@@ -35,7 +37,7 @@ public class HudiConversionSourceProvider extends ConversionSourceProvider<Hoodi
public HudiConversionSource getConversionSourceInstance(SourceTable sourceTable) {
HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder()
- .setConf(hadoopConf)
+ .setConf(getStorageConf(hadoopConf))
.setBasePath(sourceTable.getBasePath())
.setLoadActiveTimelineOnLoad(true)
.build();
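
Hudi 0.15 moves filesystem access behind its own storage abstraction, which is why this PR wraps the Hadoop Configuration with HadoopFSUtils.getStorageConf before handing it to the meta client builder and the engine contexts. A minimal, self-contained sketch of that pattern (the class name and table path below are illustrative, not part of this PR):

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;

public class StorageConfExample {
  public static void main(String[] args) {
    Configuration hadoopConf = new Configuration();
    // Hudi 0.15 expects a StorageConfiguration, so wrap the Hadoop Configuration first.
    HoodieTableMetaClient metaClient =
        HoodieTableMetaClient.builder()
            .setConf(HadoopFSUtils.getStorageConf(hadoopConf))
            .setBasePath("file:///tmp/example-hudi-table") // illustrative path
            .setLoadActiveTimelineOnLoad(true)
            .build();
    System.out.println(metaClient.getBasePathV2());
  }
}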
@@ -18,6 +18,7 @@

package org.apache.xtable.hudi;

+ import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getStorageConf;
import static org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY;

import java.io.IOException;
@@ -70,7 +71,7 @@
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
- import org.apache.hudi.hadoop.CachingPath;
+ import org.apache.hudi.hadoop.fs.CachingPath;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.table.HoodieJavaTable;
import org.apache.hudi.table.action.clean.CleanPlanner;
@@ -116,7 +117,8 @@ public HudiConversionTarget() {}
(int) targetTable.getMetadataRetention().toHours(),
maxNumDeltaCommitsBeforeCompaction,
BaseFileUpdatesExtractor.of(
- new HoodieJavaEngineContext(configuration), new CachingPath(targetTable.getBasePath())),
+ new HoodieJavaEngineContext(getStorageConf(configuration)),
+ new CachingPath(targetTable.getBasePath())),
AvroSchemaConverter.getInstance(),
HudiTableManager.of(configuration),
CommitState::new);
@@ -168,7 +170,8 @@ public void init(TargetTable targetTable, Configuration configuration) {
(int) targetTable.getMetadataRetention().toHours(),
HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.defaultValue(),
BaseFileUpdatesExtractor.of(
- new HoodieJavaEngineContext(configuration), new CachingPath(targetTable.getBasePath())),
+ new HoodieJavaEngineContext(getStorageConf(configuration)),
+ new CachingPath(targetTable.getBasePath())),
AvroSchemaConverter.getInstance(),
HudiTableManager.of(configuration),
CommitState::new);
@@ -393,7 +396,7 @@ public void commit() {
getNumInstantsToRetain(),
maxNumDeltaCommitsBeforeCompaction,
timelineRetentionInHours);
- HoodieEngineContext engineContext = new HoodieJavaEngineContext(metaClient.getHadoopConf());
+ HoodieEngineContext engineContext = new HoodieJavaEngineContext(metaClient.getStorageConf());
try (HoodieJavaWriteClient<?> writeClient =
new HoodieJavaWriteClient<>(engineContext, writeConfig)) {
writeClient.startCommitWithTime(instantTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
@@ -518,7 +521,8 @@ private void markInstantsAsCleaned(
Collections.emptyMap(),
CleanPlanner.LATEST_CLEAN_PLAN_VERSION,
cleanInfoPerPartition,
- Collections.emptyList());
+ Collections.emptyList(),
+ Collections.emptyMap());
// create a clean instant and mark it as requested with the clean plan
HoodieInstant requestedCleanInstant =
new HoodieInstant(
@@ -548,7 +552,8 @@ private void markInstantsAsCleaned(
})
.collect(Collectors.toList());
HoodieCleanMetadata cleanMetadata =
- CleanerUtils.convertCleanMetadata(cleanTime, Option.empty(), cleanStats);
+ CleanerUtils.convertCleanMetadata(
+     cleanTime, Option.empty(), cleanStats, Collections.emptyMap());
// update the metadata table with the clean metadata so the files' metadata are marked for
// deletion
hoodieTableMetadataWriter.performTableServices(Option.empty());
@@ -56,6 +56,7 @@
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
+ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.metadata.HoodieTableMetadata;

import org.apache.xtable.collectors.CustomCollectors;
@@ -85,20 +86,20 @@ public HudiDataFileExtractor(
HoodieTableMetaClient metaClient,
HudiPartitionValuesExtractor hudiPartitionValuesExtractor,
HudiFileStatsExtractor hudiFileStatsExtractor) {
- this.engineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf());
+ this.engineContext = new HoodieLocalEngineContext(metaClient.getStorageConf());
metadataConfig =
HoodieMetadataConfig.newBuilder()
.enable(metaClient.getTableConfig().isMetadataTableAvailable())
.build();
- this.basePath = metaClient.getBasePathV2();
+ this.basePath = HadoopFSUtils.convertToHadoopPath(metaClient.getBasePathV2());
this.tableMetadata =
- metadataConfig.enabled()
- ? HoodieTableMetadata.create(engineContext, metadataConfig, basePath.toString(), true)
+ metadataConfig.isEnabled()
+ ? HoodieTableMetadata.create(
+     engineContext, metaClient.getStorage(), metadataConfig, basePath.toString(), true)
: null;
this.fileSystemViewManager =
FileSystemViewManager.createViewManager(
engineContext,
- metadataConfig,
FileSystemViewStorageConfig.newBuilder()
.withStorageType(FileSystemViewStorageType.MEMORY)
.build(),
@@ -114,7 +115,8 @@ public List<PartitionFileGroup> getFilesCurrentState(InternalTable table) {
List<String> allPartitionPaths =
tableMetadata != null
? tableMetadata.getAllPartitionPaths()
- : FSUtils.getAllPartitionPaths(engineContext, metadataConfig, basePath.toString());
+ : FSUtils.getAllPartitionPaths(
+     engineContext, metaClient.getStorage(), metadataConfig, basePath.toString());
return getInternalDataFilesForPartitions(allPartitionPaths, table);
} catch (IOException ex) {
throw new ReadException(
@@ -402,9 +404,9 @@ private InternalDataFile buildFileWithoutStats(
.recordCount(rowCount)
.columnStats(Collections.emptyList())
.lastModified(
- hoodieBaseFile.getFileStatus() == null
+ hoodieBaseFile.getPathInfo() == null
? 0L
- : hoodieBaseFile.getFileStatus().getModificationTime())
+ : hoodieBaseFile.getPathInfo().getModificationTime())
.build();
}

@@ -44,7 +44,8 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.Pair;
- import org.apache.hudi.hadoop.CachingPath;
+ import org.apache.hudi.hadoop.fs.CachingPath;
+ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.MetadataPartitionType;

@@ -117,7 +118,9 @@ private Stream<InternalDataFile> computeColumnStatsFromParquetFooters(

private Pair<String, String> getPartitionAndFileName(String path) {
Path filePath = new CachingPath(path);
- String partitionPath = HudiPathUtils.getPartitionPath(metaClient.getBasePathV2(), filePath);
+ String partitionPath =
+     HudiPathUtils.getPartitionPath(
+         HadoopFSUtils.convertToHadoopPath(metaClient.getBasePathV2()), filePath);
return Pair.of(partitionPath, filePath.getName());
}

@@ -178,8 +181,10 @@ private Optional<Long> getMaxFromColumnStats(List<ColumnStat> columnStats) {
private HudiFileStats computeColumnStatsForFile(
Path filePath, Map<String, InternalField> nameFieldMap) {
List<HoodieColumnRangeMetadata<Comparable>> columnRanges =
- UTILS.readRangeFromParquetMetadata(
-     metaClient.getHadoopConf(), filePath, new ArrayList<>(nameFieldMap.keySet()));
+ UTILS.readColumnStatsFromMetadata(
+     metaClient.getStorage(),
+     HadoopFSUtils.convertToStoragePath(filePath),
+     new ArrayList<>(nameFieldMap.keySet()));
List<ColumnStat> columnStats =
columnRanges.stream()
.map(
@@ -188,7 +193,8 @@ private HudiFileStats computeColumnStatsForFile(
.collect(CustomCollectors.toList(columnRanges.size()));
Long rowCount = getMaxFromColumnStats(columnStats).orElse(null);
if (rowCount == null) {
- rowCount = UTILS.getRowCount(metaClient.getHadoopConf(), filePath);
+ rowCount =
+     UTILS.getRowCount(metaClient.getStorage(), HadoopFSUtils.convertToStoragePath(filePath));
}
return new HudiFileStats(columnStats, rowCount);
}
@@ -18,6 +18,8 @@

package org.apache.xtable.hudi;

+ import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getStorageConf;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
@@ -68,7 +70,7 @@ public Optional<HoodieTableMetaClient> loadTableMetaClientIfExists(String tableD
return Optional.of(
HoodieTableMetaClient.builder()
.setBasePath(tableDataPath)
- .setConf(configuration)
+ .setConf(getStorageConf(configuration))
.setLoadActiveTimelineOnLoad(false)
.build());
} catch (TableNotFoundException ex) {
@@ -117,7 +119,7 @@ HoodieTableMetaClient initializeHudiTable(String tableDataPath, InternalTable ta
.map(InternalPartitionField::getSourceField)
.map(InternalField::getPath)
.collect(Collectors.joining(",")))
- .initTable(configuration, tableDataPath);
+ .initTable(getStorageConf(configuration), tableDataPath);
} catch (IOException ex) {
throw new UpdateException("Unable to initialize Hudi table", ex);
}