
Handle timestamp_ntz in delta and iceberg #647


Merged: 1 commit merged into main from timestamp-ntz on Apr 1, 2025

Conversation

@vinishjail97 (Contributor) commented Feb 12, 2025


What is the purpose of the pull request

  1. Handle timestamp_ntz in the Delta target; backwards compatibility is ensured by using the min and max writer versions from deltaLog.snapshot() (see the sketch after this list).
  2. Handle timestamp_ntz in the Iceberg target as well.
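A minimal sketch of the idea, assuming Spark 3.4+ (where DataTypes.TimestampNTZType is exposed); the enum and helper names here are hypothetical stand-ins rather than xtable's actual code:

import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;

// Hedged sketch; InternalKind and requiredProtocol are illustrative, not
// the real xtable model. Delta's timestampNtz table feature needs
// reader/writer protocol (3,7).
final class DeltaTimestampNtzSketch {
  enum InternalKind { TIMESTAMP, TIMESTAMP_NTZ }

  // timestamp_ntz maps to Spark's TimestampNTZType; the UTC-adjusted
  // timestamp keeps the regular TimestampType.
  static DataType toSparkType(InternalKind kind) {
    return kind == InternalKind.TIMESTAMP_NTZ
        ? DataTypes.TimestampNTZType // requires Delta protocol (3,7)
        : DataTypes.TimestampType;
  }

  // Only bump the protocol when an NTZ column is actually present;
  // otherwise keep the versions read from deltaLog.snapshot() so existing
  // readers of older tables are unaffected.
  static int[] requiredProtocol(boolean hasNtzColumn, int minReader, int minWriter) {
    return hasNtzColumn
        ? new int[] {Math.max(minReader, 3), Math.max(minWriter, 7)}
        : new int[] {minReader, minWriter};
  }
}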

Brief change log

  • Handle timestamp_ntz in the Delta target
  • Handle timestamp_ntz in the Iceberg target as well

Verify this pull request

This change added tests and can be verified as follows:

  • testTimestampNtz

@emilie-wang

Hi, I just came across this PR. If you plan to continue this fix, the conversion targets also need fixes so they know how to handle the timestamp_ntz type correctly, for example Iceberg: https://github.com/apache/incubator-xtable/blob/main/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java#L176
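For reference, Iceberg models the two flavors on the same type via an adjust-to-UTC flag; a small hedged sketch of that mapping (simplified relative to the extractor linked above):

import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

// Hedged sketch: Iceberg distinguishes timestamptz from timestamp_ntz
// purely by the withZone()/withoutZone() factory on Types.TimestampType.
final class IcebergTimestampSketch {
  static Type forTimestamp(boolean adjustedToUtc) {
    return adjustedToUtc
        ? Types.TimestampType.withZone()     // timestamp, UTC-adjusted
        : Types.TimestampType.withoutZone(); // timestamp_ntz
  }
}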

@vinishjail97 marked this pull request as ready for review on April 1, 2025, 08:21
@vinishjail97 (Contributor, Author)

> Hi, I just came across this PR. If you plan to continue this fix, the conversion targets also need fixes so they know how to handle the timestamp_ntz type correctly, for example Iceberg: https://github.com/apache/incubator-xtable/blob/main/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java#L176

I will pick this up in a follow-up PR.


TableFormatSync.getInstance()
.syncSnapshot(Collections.singletonList(conversionTarget), snapshot1);
// Delta standalone library can't read versions (3,7) and needs delta kernel dependency.
@vinishjail97 (Contributor, Author)

This needs a delta-kernel upgrade, as delta-standalone doesn't support the (3,7) protocol versions in Delta.
#671

@vinishjail97 (Contributor, Author)

io.delta.standalone.internal.exception.DeltaErrors$InvalidProtocolVersionException: 
Delta protocol version (3,7) is too new for this version of Delta
Standalone Reader/Writer (1,2). Please upgrade to a newer release.


	at io.delta.standalone.internal.DeltaLogImpl.assertProtocolRead(DeltaLogImpl.scala:214)
	at io.delta.standalone.internal.SnapshotImpl.<init>(SnapshotImpl.scala:244)
	at io.delta.standalone.internal.SnapshotManagement.createSnapshot(SnapshotManagement.scala:257)
	at io.delta.standalone.internal.SnapshotManagement.getSnapshotAtInit(SnapshotManagement.scala:224)

This is the exception we hit using the latest version of the delta-standalone library, 3.3.0.

@vinishjail97 (Contributor, Author)

Handled the validation using Spark for now; upgrading to delta-kernel will be handled separately.
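A rough sketch of what that Spark-based validation could look like; the session setup and basePath argument are hypothetical, and delta-spark (unlike delta-standalone) understands protocol (3,7):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Hedged sketch: read the synced table through delta-spark, which supports
// the (3,7) protocol that makes delta-standalone throw
// InvalidProtocolVersionException.
final class SparkNtzValidationSketch {
  static void validate(String basePath) { // hypothetical table path
    SparkSession spark = SparkSession.builder()
        .appName("ntz-validation")
        .master("local[*]")
        .getOrCreate();
    Dataset<Row> rows = spark.read().format("delta").load(basePath);
    rows.printSchema(); // NTZ columns should surface as TimestampNTZType
  }
}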

@vinishjail97 (Contributor, Author)

The integration tests with HUDI as the source format and DELTA as the target format are failing because of the following exception. Hudi writes parquet files with physicalType INT64 and logicalType TimestampType, but reading the Delta table fails because it expects a TimestampNTZType column.

Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException: column: [timestamp_local_millis_nullable_field], physicalType: INT64, logicalType: timestamp_ntz
	at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.constructConvertNotSupportedException(ParquetVectorUpdaterFactory.java:1129)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.getUpdater(ParquetVectorUpdaterFactory.java:191)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:175)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:328)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:219)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:297)
	... 19 more

The test fails with Hudi 0.x and will pass once Hudi is upgraded to 1.x.
https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala#L152
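A rough standalone repro of that mismatch, under stated assumptions (the local path and session config are hypothetical; Spark 3.4+ DDL accepts TIMESTAMP_NTZ): write a parquet column as a regular UTC-adjusted timestamp in INT64 micros, then read it back with a schema that declares timestamp_ntz.

import static org.apache.spark.sql.functions.current_timestamp;

import org.apache.spark.sql.SparkSession;

// Hedged repro sketch, not code from this PR: mimics Hudi writing INT64
// timestamp-micros while the Delta schema expects timestamp_ntz.
final class NtzMismatchRepro {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("ntz-mismatch")
        .master("local[*]")
        // write INT64 micros instead of Spark's default INT96
        .config("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
        .getOrCreate();

    String path = "/tmp/ntz_mismatch"; // hypothetical scratch path
    spark.range(1).withColumn("ts", current_timestamp())
        .write().mode("overwrite").parquet(path);

    // Declaring the column as TIMESTAMP_NTZ on read trips the vectorized
    // reader with SchemaColumnConvertNotSupportedException, as in the logs.
    spark.read().schema("id BIGINT, ts TIMESTAMP_NTZ").parquet(path).show();
  }
}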

@vinishjail97 (Contributor, Author)

> Hi, I just came across this PR. If you plan to continue this fix, the conversion targets also need fixes so they know how to handle the timestamp_ntz type correctly, for example Iceberg: https://github.com/apache/incubator-xtable/blob/main/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java#L176
>
> I will pick this up in a follow-up PR.

@emilie-wang I have fixed it for Iceberg as well in this PR.

public void testTimestampNtz() {
InternalSchema schema1 = getInternalSchemaWithTimestampNtz();
List<InternalField> fields2 = new ArrayList<>(schema1.getFields());
fields2.add(
Contributor

Have you tested whether the write works if the initial schema contains timestamp_ntz?

@vinishjail97 (Contributor, Author)

Yes, in this test schema1 contains an ntz column and the second commit adds a nullable float column.

@vinishjail97 changed the title from "Handle timestamp_ntz in delta conversion target" to "Handle timestamp_ntz in delta and iceberg" on Apr 1, 2025
@vinishjail97 merged commit 680cf9c into main on Apr 1, 2025
2 checks passed
@vinishjail97 deleted the timestamp-ntz branch on April 1, 2025 22:07