Skip to content

Commit 680cf9c

Browse files
committed
Handle timestamp_ntz in delta and iceberg
1 parent dafe24a commit 680cf9c

11 files changed, with 144 additions and 23 deletions.

pom.xml

+1-1
Original file line number | Diff line number | Diff line change
@@ -87,7 +87,7 @@
8787
<spotless.version>2.43.0</spotless.version>
8888
<apache.rat.version>0.16.1</apache.rat.version>
8989
<google.java.format.version>1.8</google.java.format.version>
90-
<delta.standalone.version>0.5.0</delta.standalone.version>
90+
<delta.standalone.version>3.3.0</delta.standalone.version>
9191
<delta.hive.version>3.0.0</delta.hive.version>
9292
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
9393
<target.dir.pattern>**/target/**</target.dir.pattern>

xtable-core/src/main/java/org/apache/xtable/avro/AvroSchemaConverter.java

+22-2
Original file line number | Diff line number | Diff line change
@@ -160,11 +160,15 @@ private InternalSchema toInternalSchema(
160160
metadata.put(
161161
InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS);
162162
} else if (logicalType instanceof LogicalTypes.LocalTimestampMillis) {
163-
newDataType = InternalType.TIMESTAMP_NTZ;
163+
// TODO: https://github.com/apache/incubator-xtable/issues/672
164+
// Hudi 0.x writes INT64 in parquet, TimestampNTZType support added in 1.x
165+
newDataType = InternalType.LONG;
164166
metadata.put(
165167
InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MILLIS);
166168
} else if (logicalType instanceof LogicalTypes.LocalTimestampMicros) {
167-
newDataType = InternalType.TIMESTAMP_NTZ;
169+
// TODO: https://github.com/apache/incubator-xtable/issues/672
170+
// Hudi 0.x writes INT64 in parquet, TimestampNTZType support added in 1.x
171+
newDataType = InternalType.LONG;
168172
metadata.put(
169173
InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS);
170174
} else {
@@ -350,6 +354,22 @@ private Schema fromInternalSchema(InternalSchema internalSchema, String currentP
350354
case INT:
351355
return finalizeSchema(Schema.create(Schema.Type.INT), internalSchema);
352356
case LONG:
357+
if (internalSchema.getMetadata() != null
358+
&& internalSchema
359+
.getMetadata()
360+
.containsKey(InternalSchema.MetadataKey.TIMESTAMP_PRECISION)) {
361+
if (internalSchema.getMetadata().get(InternalSchema.MetadataKey.TIMESTAMP_PRECISION)
362+
== InternalSchema.MetadataValue.MILLIS) {
363+
return finalizeSchema(
364+
LogicalTypes.localTimestampMillis().addToSchema(Schema.create(Schema.Type.LONG)),
365+
internalSchema);
366+
}
367+
{
368+
return finalizeSchema(
369+
LogicalTypes.localTimestampMicros().addToSchema(Schema.create(Schema.Type.LONG)),
370+
internalSchema);
371+
}
372+
}
353373
return finalizeSchema(Schema.create(Schema.Type.LONG), internalSchema);
354374
case STRING:
355375
return finalizeSchema(Schema.create(Schema.Type.STRING), internalSchema);

xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java

+10-4
Original file line number | Diff line number | Diff line change
@@ -73,9 +73,9 @@
7373

7474
@Log4j2
7575
public class DeltaConversionTarget implements ConversionTarget {
76-
private static final String MIN_READER_VERSION = String.valueOf(1);
76+
private static final int MIN_READER_VERSION = 1;
7777
// gets access to generated columns.
78-
private static final String MIN_WRITER_VERSION = String.valueOf(4);
78+
private static final int MIN_WRITER_VERSION = 4;
7979

8080
private DeltaLog deltaLog;
8181
private DeltaSchemaExtractor schemaExtractor;
@@ -329,8 +329,14 @@ private void commitTransaction() {
329329

330330
private Map<String, String> getConfigurationsForDeltaSync() {
331331
Map<String, String> configMap = new HashMap<>();
332-
configMap.put(DeltaConfigs.MIN_READER_VERSION().key(), MIN_READER_VERSION);
333-
configMap.put(DeltaConfigs.MIN_WRITER_VERSION().key(), MIN_WRITER_VERSION);
332+
configMap.put(
333+
DeltaConfigs.MIN_READER_VERSION().key(),
334+
String.valueOf(
335+
Math.max(deltaLog.snapshot().protocol().minReaderVersion(), MIN_READER_VERSION)));
336+
configMap.put(
337+
DeltaConfigs.MIN_WRITER_VERSION().key(),
338+
String.valueOf(
339+
Math.max(deltaLog.snapshot().protocol().minWriterVersion(), MIN_WRITER_VERSION)));
334340
configMap.put(TableSyncMetadata.XTABLE_METADATA, metadata.toJson());
335341
// Sets retention for the Delta Log, does not impact underlying files in the table
336342
configMap.put(

xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java

+10-5
Original file line number | Diff line number | Diff line change
@@ -56,6 +56,11 @@
5656
public class DeltaSchemaExtractor {
5757
private static final String DELTA_COLUMN_MAPPING_ID = "delta.columnMapping.id";
5858
private static final DeltaSchemaExtractor INSTANCE = new DeltaSchemaExtractor();
59+
// Timestamps in Delta are microsecond precision by default
60+
private static final Map<InternalSchema.MetadataKey, Object>
61+
DEFAULT_TIMESTAMP_PRECISION_METADATA =
62+
Collections.singletonMap(
63+
InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS);
5964

6065
public static DeltaSchemaExtractor getInstance() {
6166
return INSTANCE;
@@ -110,11 +115,11 @@ private InternalSchema toInternalSchema(
110115
break;
111116
case "timestamp":
112117
type = InternalType.TIMESTAMP;
113-
// Timestamps in Delta are microsecond precision by default
114-
metadata =
115-
Collections.singletonMap(
116-
InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
117-
InternalSchema.MetadataValue.MICROS);
118+
metadata = DEFAULT_TIMESTAMP_PRECISION_METADATA;
119+
break;
120+
case "timestamp_ntz":
121+
type = InternalType.TIMESTAMP_NTZ;
122+
metadata = DEFAULT_TIMESTAMP_PRECISION_METADATA;
118123
break;
119124
case "struct":
120125
StructType structType = (StructType) dataType;

xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java

+2-1
Original file line number | Diff line number | Diff line change
@@ -173,7 +173,6 @@ Type toIcebergType(InternalField field, AtomicInteger fieldIdTracker) {
173173
case INT:
174174
return Types.IntegerType.get();
175175
case LONG:
176-
case TIMESTAMP_NTZ: // TODO - revisit this
177176
return Types.LongType.get();
178177
case BYTES:
179178
return Types.BinaryType.get();
@@ -189,6 +188,8 @@ Type toIcebergType(InternalField field, AtomicInteger fieldIdTracker) {
189188
return Types.DateType.get();
190189
case TIMESTAMP:
191190
return Types.TimestampType.withZone();
191+
case TIMESTAMP_NTZ:
192+
return Types.TimestampType.withoutZone();
192193
case DOUBLE:
193194
return Types.DoubleType.get();
194195
case DECIMAL:

xtable-core/src/main/java/org/apache/xtable/schema/SparkSchemaExtractor.java

+2-1
Original file line number | Diff line number | Diff line change
@@ -62,7 +62,6 @@ private DataType convertFieldType(InternalField field) {
6262
case INT:
6363
return DataTypes.IntegerType;
6464
case LONG:
65-
case TIMESTAMP_NTZ:
6665
return DataTypes.LongType;
6766
case BYTES:
6867
case FIXED:
@@ -76,6 +75,8 @@ private DataType convertFieldType(InternalField field) {
7675
return DataTypes.DateType;
7776
case TIMESTAMP:
7877
return DataTypes.TimestampType;
78+
case TIMESTAMP_NTZ:
79+
return DataTypes.TimestampNTZType;
7980
case DOUBLE:
8081
return DataTypes.DoubleType;
8182
case DECIMAL:

xtable-core/src/test/java/org/apache/xtable/avro/TestAvroSchemaConverter.java

+2-2
Original file line number | Diff line number | Diff line change
@@ -576,7 +576,7 @@ public void testAvroLogicalTypes() {
576576
.schema(
577577
InternalSchema.builder()
578578
.name("long")
579-
.dataType(InternalType.TIMESTAMP_NTZ)
579+
.dataType(InternalType.LONG)
580580
.isNullable(false)
581581
.metadata(millisMetadata)
582582
.build())
@@ -586,7 +586,7 @@ public void testAvroLogicalTypes() {
586586
.schema(
587587
InternalSchema.builder()
588588
.name("long")
589-
.dataType(InternalType.TIMESTAMP_NTZ)
589+
.dataType(InternalType.LONG)
590590
.isNullable(false)
591591
.metadata(microsMetadata)
592592
.build())

xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java

+24-1
Original file line number | Diff line number | Diff line change
@@ -322,13 +322,36 @@ public void testTimestamps() {
322322
.metadata(metadata)
323323
.build())
324324
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
325+
.build(),
326+
InternalField.builder()
327+
.name("requiredTimestampNtz")
328+
.schema(
329+
InternalSchema.builder()
330+
.name("timestamp_ntz")
331+
.dataType(InternalType.TIMESTAMP_NTZ)
332+
.isNullable(false)
333+
.metadata(metadata)
334+
.build())
335+
.build(),
336+
InternalField.builder()
337+
.name("optionalTimestampNtz")
338+
.schema(
339+
InternalSchema.builder()
340+
.name("timestamp_ntz")
341+
.dataType(InternalType.TIMESTAMP_NTZ)
342+
.isNullable(true)
343+
.metadata(metadata)
344+
.build())
345+
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
325346
.build()))
326347
.build();
327348

328349
StructType structRepresentationTimestamp =
329350
new StructType()
330351
.add("requiredTimestamp", DataTypes.TimestampType, false)
331-
.add("optionalTimestamp", DataTypes.TimestampType, true);
352+
.add("optionalTimestamp", DataTypes.TimestampType, true)
353+
.add("requiredTimestampNtz", DataTypes.TimestampNTZType, false)
354+
.add("optionalTimestampNtz", DataTypes.TimestampNTZType, true);
332355

333356
Assertions.assertEquals(
334357
internalSchemaTimestamp,

xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java

+61
Original file line number | Diff line number | Diff line change
@@ -53,6 +53,7 @@
5353
import org.apache.spark.sql.SparkSession;
5454
import org.apache.spark.sql.types.DataTypes;
5555
import org.junit.jupiter.api.AfterAll;
56+
import org.junit.jupiter.api.Assertions;
5657
import org.junit.jupiter.api.BeforeAll;
5758
import org.junit.jupiter.api.BeforeEach;
5859
import org.junit.jupiter.api.Test;
@@ -94,6 +95,7 @@
9495
import org.apache.xtable.model.storage.DataLayoutStrategy;
9596
import org.apache.xtable.model.storage.FileFormat;
9697
import org.apache.xtable.model.storage.InternalDataFile;
98+
import org.apache.xtable.model.storage.InternalFile;
9799
import org.apache.xtable.model.storage.PartitionFileGroup;
98100
import org.apache.xtable.model.storage.TableFormat;
99101
import org.apache.xtable.schema.SchemaFieldFinder;
@@ -431,6 +433,39 @@ public void testGetTargetCommitIdentifierWithNullSourceIdentifier() throws Excep
431433
assertFalse(unmappedTargetId.isPresent());
432434
}
433435

436+
@Test
437+
public void testTimestampNtz() {
438+
InternalSchema schema1 = getInternalSchemaWithTimestampNtz();
439+
List<InternalField> fields2 = new ArrayList<>(schema1.getFields());
440+
fields2.add(
441+
InternalField.builder()
442+
.name("float_field")
443+
.schema(
444+
InternalSchema.builder()
445+
.name("float")
446+
.dataType(InternalType.FLOAT)
447+
.isNullable(true)
448+
.build())
449+
.build());
450+
InternalSchema schema2 = getInternalSchema().toBuilder().fields(fields2).build();
451+
InternalTable table1 = getInternalTable(tableName, basePath, schema1, null, LAST_COMMIT_TIME);
452+
InternalTable table2 = getInternalTable(tableName, basePath, schema2, null, LAST_COMMIT_TIME);
453+
454+
InternalDataFile dataFile1 = getDataFile(1, Collections.emptyList(), basePath);
455+
InternalDataFile dataFile2 = getDataFile(2, Collections.emptyList(), basePath);
456+
InternalDataFile dataFile3 = getDataFile(3, Collections.emptyList(), basePath);
457+
458+
InternalSnapshot snapshot1 = buildSnapshot(table1, "0", dataFile1, dataFile2);
459+
InternalSnapshot snapshot2 = buildSnapshot(table2, "1", dataFile2, dataFile3);
460+
461+
TableFormatSync.getInstance()
462+
.syncSnapshot(Collections.singletonList(conversionTarget), snapshot1);
463+
validateDeltaTableUsingSpark(basePath, new HashSet<>(Arrays.asList(dataFile1, dataFile2)));
464+
TableFormatSync.getInstance()
465+
.syncSnapshot(Collections.singletonList(conversionTarget), snapshot2);
466+
validateDeltaTableUsingSpark(basePath, new HashSet<>(Arrays.asList(dataFile2, dataFile3)));
467+
}
468+
434469
private static Stream<Arguments> timestampPartitionTestingArgs() {
435470
return Stream.of(
436471
Arguments.of(PartitionTransformType.YEAR),
@@ -472,6 +507,13 @@ private void validateDeltaTable(
472507
internalDataFiles.size(), count, "Number of files from DeltaScan don't match expectation");
473508
}
474509

510+
private void validateDeltaTableUsingSpark(
511+
Path basePath, Set<InternalDataFile> internalDataFiles) {
512+
Dataset<Row> dataset = sparkSession.read().format("delta").load(basePath.toString());
513+
long countFromFiles = internalDataFiles.stream().mapToLong(InternalFile::getRecordCount).sum();
514+
Assertions.assertEquals(countFromFiles, dataset.count());
515+
}
516+
475517
private InternalSnapshot buildSnapshot(
476518
InternalTable table, String sourceIdentifier, InternalDataFile... dataFiles) {
477519
return InternalSnapshot.builder()
@@ -563,6 +605,25 @@ private InternalSchema getInternalSchema() {
563605
.build();
564606
}
565607

608+
private InternalSchema getInternalSchemaWithTimestampNtz() {
609+
Map<InternalSchema.MetadataKey, Object> timestampMetadata = new HashMap<>();
610+
timestampMetadata.put(
611+
InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS);
612+
List<InternalField> fields = new ArrayList<>(getInternalSchema().getFields());
613+
fields.add(
614+
InternalField.builder()
615+
.name("timestamp_ntz_field")
616+
.schema(
617+
InternalSchema.builder()
618+
.name("time_ntz")
619+
.dataType(InternalType.TIMESTAMP_NTZ)
620+
.isNullable(true)
621+
.metadata(timestampMetadata)
622+
.build())
623+
.build());
624+
return getInternalSchema().toBuilder().fields(fields).build();
625+
}
626+
566627
private static SparkSession buildSparkSession() {
567628
SparkConf sparkConf =
568629
new SparkConf()

xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java

+8-4
Original file line number | Diff line number | Diff line change
@@ -501,14 +501,18 @@ public void testTimestamps() {
501501
1, "requiredTimestampMillis", Types.TimestampType.withZone()),
502502
Types.NestedField.optional(
503503
2, "optionalTimestampMillis", Types.TimestampType.withZone()),
504-
Types.NestedField.required(3, "requiredTimestampNtzMillis", Types.LongType.get()),
505-
Types.NestedField.optional(4, "optionalTimestampNtzMillis", Types.LongType.get()),
504+
Types.NestedField.required(
505+
3, "requiredTimestampNtzMillis", Types.TimestampType.withoutZone()),
506+
Types.NestedField.optional(
507+
4, "optionalTimestampNtzMillis", Types.TimestampType.withoutZone()),
506508
Types.NestedField.required(
507509
5, "requiredTimestampMicros", Types.TimestampType.withZone()),
508510
Types.NestedField.optional(
509511
6, "optionalTimestampMicros", Types.TimestampType.withZone()),
510-
Types.NestedField.required(7, "requiredTimestampNtzMicros", Types.LongType.get()),
511-
Types.NestedField.optional(8, "optionalTimestampNtzMicros", Types.LongType.get()));
512+
Types.NestedField.required(
513+
7, "requiredTimestampNtzMicros", Types.TimestampType.withoutZone()),
514+
Types.NestedField.optional(
515+
8, "optionalTimestampNtzMicros", Types.TimestampType.withoutZone()));
512516
assertTrue(expectedTargetSchema.sameSchema(SCHEMA_EXTRACTOR.toIceberg(irSchema)));
513517

514518
Schema sourceSchema =

xtable-core/src/test/java/org/apache/xtable/schema/TestSparkSchemaExtractor.java

+2-2
Original file line number | Diff line number | Diff line change
@@ -385,8 +385,8 @@ public void testTimestamps() {
385385

386386
StructType structRepresentationTimestampNtz =
387387
new StructType()
388-
.add("requiredTimestampNtz", DataTypes.LongType, false)
389-
.add("optionalTimestampNtz", DataTypes.LongType, true);
388+
.add("requiredTimestampNtz", DataTypes.TimestampNTZType, false)
389+
.add("optionalTimestampNtz", DataTypes.TimestampNTZType, true);
390390

391391
Assertions.assertEquals(
392392
structRepresentationTimestamp,

0 commit comments

Comments
 (0)