Skip to content

Commit dafe24a

Browse files
Add support for the bucket partition transform; ignore void partitions in the Iceberg source
1 parent 23d870e commit dafe24a

12 files changed

+271
-54
lines changed

xtable-api/src/main/java/org/apache/xtable/model/schema/InternalPartitionField.java

+4
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.util.Collections;
2222
import java.util.List;
23+
import java.util.Map;
2324

2425
import lombok.Builder;
2526
import lombok.Value;
@@ -32,6 +33,7 @@
3233
@Value
3334
@Builder
3435
public class InternalPartitionField {
36+
public static final String NUM_BUCKETS = "NUM_BUCKETS";
3537
// Source field the partition is based on
3638
InternalField sourceField;
3739
/*
@@ -47,4 +49,6 @@ public class InternalPartitionField {
4749
@Builder.Default List<String> partitionFieldNames = Collections.emptyList();
4850
// An enum describing how the source data was transformed into the partition value
4951
PartitionTransformType transformType;
52+
// Transform options such as number of buckets in the BUCKET transform type
53+
@Builder.Default Map<String, Object> transformOptions = Collections.emptyMap();
5054
}

xtable-api/src/main/java/org/apache/xtable/model/schema/PartitionTransformType.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ public enum PartitionTransformType {
3030
MONTH,
3131
DAY,
3232
HOUR,
33-
VALUE;
33+
VALUE,
34+
BUCKET;
3435

3536
public boolean isTimeBased() {
3637
return this == YEAR || this == MONTH || this == DAY || this == HOUR;

xtable-core/src/main/java/org/apache/xtable/delta/DeltaPartitionExtractor.java

+20-3
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public class DeltaPartitionExtractor {
7575
private static final String DATE_FORMAT_FOR_DAY = "yyyy-MM-dd";
7676
private static final String DATE_FORMAT_FOR_MONTH = "yyyy-MM";
7777
private static final String DATE_FORMAT_FOR_YEAR = "yyyy";
78+
private static final String BUCKET_FUNCTION = "MOD((HASH(%s) & %d), %d)";
7879
// For timestamp partition fields, actual partition column names in delta format will be of type
7980
// generated & and with a name like `delta_partition_col_{transform_type}_{source_field_name}`.
8081
private static final String DELTA_PARTITION_COL_NAME_FORMAT = "xtable_partition_col_%s_%s";
@@ -242,7 +243,7 @@ public Map<String, StructField> convertToDeltaPartitionFormat(
242243
currPartitionColumnName = internalPartitionField.getSourceField().getName();
243244
field = null;
244245
} else {
245-
// Since partition field of timestamp type, create new field in schema.
246+
// Since partition field of timestamp or bucket type, create new field in schema.
246247
field = getGeneratedField(internalPartitionField);
247248
currPartitionColumnName = field.name();
248249
}
@@ -270,6 +271,10 @@ public Map<String, String> partitionValueSerialization(InternalDataFile internal
270271
"");
271272
partitionValuesSerialized.put(
272273
partitionField.getSourceField().getName(), partitionValueSerialized);
274+
} else if (transformType == PartitionTransformType.BUCKET) {
275+
partitionValueSerialized = partitionValue.getRange().getMaxValue().toString();
276+
partitionValuesSerialized.put(
277+
getGeneratedColumnName(partitionField), partitionValueSerialized);
273278
} else {
274279
// use appropriate date formatter for value serialization.
275280
partitionValueSerialized =
@@ -352,7 +357,6 @@ private StructField getGeneratedField(InternalPartitionField internalPartitionFi
352357
String generatedExpression;
353358
DataType dataType;
354359
String currPartitionColumnName = getGeneratedColumnName(internalPartitionField);
355-
Map<String, String> generatedExpressionMetadata = new HashMap<>();
356360
switch (internalPartitionField.getTransformType()) {
357361
case YEAR:
358362
generatedExpression =
@@ -373,10 +377,23 @@ private StructField getGeneratedField(InternalPartitionField internalPartitionFi
373377
String.format(CAST_FUNCTION, internalPartitionField.getSourceField().getPath());
374378
dataType = DataTypes.DateType;
375379
break;
380+
case BUCKET:
381+
generatedExpression =
382+
String.format(
383+
BUCKET_FUNCTION,
384+
internalPartitionField.getSourceField().getPath(),
385+
Integer.MAX_VALUE,
386+
(int)
387+
internalPartitionField
388+
.getTransformOptions()
389+
.get(InternalPartitionField.NUM_BUCKETS));
390+
dataType = DataTypes.IntegerType;
391+
break;
376392
default:
377393
throw new PartitionSpecException("Invalid transform type");
378394
}
379-
generatedExpressionMetadata.put(DELTA_GENERATION_EXPRESSION, generatedExpression);
395+
Map<String, String> generatedExpressionMetadata =
396+
Collections.singletonMap(DELTA_GENERATION_EXPRESSION, generatedExpression);
380397
Metadata partitionFieldMetadata =
381398
new Metadata(ScalaUtils.convertJavaMapToScala(generatedExpressionMetadata));
382399
return new StructField(currPartitionColumnName, dataType, true, partitionFieldMetadata);

xtable-core/src/main/java/org/apache/xtable/hudi/HudiTableManager.java

+8
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,12 @@
3535
import org.apache.hudi.common.util.VisibleForTesting;
3636
import org.apache.hudi.exception.TableNotFoundException;
3737

38+
import org.apache.xtable.exception.PartitionSpecException;
3839
import org.apache.xtable.exception.UpdateException;
3940
import org.apache.xtable.model.InternalTable;
4041
import org.apache.xtable.model.schema.InternalField;
4142
import org.apache.xtable.model.schema.InternalPartitionField;
43+
import org.apache.xtable.model.schema.PartitionTransformType;
4244
import org.apache.xtable.model.storage.DataLayoutStrategy;
4345

4446
/** A class used to initialize new Hudi tables and load the metadata of existing tables. */
@@ -124,6 +126,12 @@ HoodieTableMetaClient initializeHudiTable(String tableDataPath, InternalTable ta
124126
@VisibleForTesting
125127
static String getKeyGeneratorClass(
126128
List<InternalPartitionField> partitionFields, List<InternalField> recordKeyFields) {
129+
if (partitionFields.stream()
130+
.anyMatch(
131+
internalPartitionField ->
132+
internalPartitionField.getTransformType() == PartitionTransformType.BUCKET)) {
133+
throw new PartitionSpecException("Bucket partition is not yet supported by Hudi targets");
134+
}
127135
boolean multipleRecordKeyFields = recordKeyFields.size() > 1;
128136
boolean multiplePartitionFields = partitionFields.size() > 1;
129137
String keyGeneratorClass;

xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergPartitionSpecExtractor.java

+34-2
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,14 @@
1818

1919
package org.apache.xtable.iceberg;
2020

21+
import static org.apache.xtable.iceberg.IcebergPartitionValueConverter.BUCKET;
22+
2123
import java.util.ArrayList;
2224
import java.util.Collections;
2325
import java.util.List;
26+
import java.util.Map;
27+
import java.util.regex.Matcher;
28+
import java.util.regex.Pattern;
2429

2530
import lombok.AccessLevel;
2631
import lombok.NoArgsConstructor;
@@ -32,6 +37,7 @@
3237
import org.apache.iceberg.types.Types;
3338

3439
import org.apache.xtable.exception.NotSupportedException;
40+
import org.apache.xtable.exception.PartitionSpecException;
3541
import org.apache.xtable.model.schema.InternalField;
3642
import org.apache.xtable.model.schema.InternalPartitionField;
3743
import org.apache.xtable.model.schema.InternalSchema;
@@ -41,6 +47,7 @@
4147
/** Partition spec builder and extractor for Iceberg. */
4248
@NoArgsConstructor(access = AccessLevel.PRIVATE)
4349
public class IcebergPartitionSpecExtractor {
50+
private static final Pattern NUM_BUCKETS_MATCHER = Pattern.compile("bucket\\[(\\d+)\\]");
4451
private static final IcebergPartitionSpecExtractor INSTANCE = new IcebergPartitionSpecExtractor();
4552

4653
public static IcebergPartitionSpecExtractor getInstance() {
@@ -70,6 +77,12 @@ public PartitionSpec toIceberg(List<InternalPartitionField> partitionFields, Sch
7077
case VALUE:
7178
partitionSpecBuilder.identity(fieldPath);
7279
break;
80+
case BUCKET:
81+
partitionSpecBuilder.bucket(
82+
fieldPath,
83+
(int)
84+
partitioningField.getTransformOptions().get(InternalPartitionField.NUM_BUCKETS));
85+
break;
7386
default:
7487
throw new IllegalArgumentException(
7588
"Unsupported type: " + partitioningField.getTransformType());
@@ -99,13 +112,27 @@ PartitionTransformType fromIcebergTransform(Transform<?, ?> transform) {
99112
throw new NotSupportedException(transformName);
100113
}
101114

102-
if (transformName.startsWith("bucket")) {
103-
throw new NotSupportedException(transformName);
115+
if (transformName.startsWith(BUCKET)) {
116+
return PartitionTransformType.BUCKET;
104117
}
105118

106119
throw new NotSupportedException(transform.toString());
107120
}
108121

122+
private Map<String, Object> getPartitionTransformOptions(Transform<?, ?> transform) {
123+
if (transform.toString().startsWith(BUCKET)) {
124+
Matcher matcher = NUM_BUCKETS_MATCHER.matcher(transform.toString());
125+
if (matcher.matches()) {
126+
return Collections.singletonMap(
127+
InternalPartitionField.NUM_BUCKETS, Integer.parseInt(matcher.group(1)));
128+
} else {
129+
throw new PartitionSpecException(
130+
"Cannot parse number of buckets from partition transform: " + transform);
131+
}
132+
}
133+
return Collections.emptyMap();
134+
}
135+
109136
/**
110137
* Generates internal representation of the Iceberg partition spec.
111138
*
@@ -121,6 +148,10 @@ public List<InternalPartitionField> fromIceberg(
121148

122149
List<InternalPartitionField> irPartitionFields = new ArrayList<>(iceSpec.fields().size());
123150
for (PartitionField iceField : iceSpec.fields()) {
151+
// skip void transform
152+
if (iceField.transform().isVoid()) {
153+
continue;
154+
}
124155
// fetch the ice field from the schema to properly handle hidden partition fields
125156
int sourceColumnId = iceField.sourceId();
126157
Types.NestedField iceSchemaField = iceSchema.findField(sourceColumnId);
@@ -131,6 +162,7 @@ public List<InternalPartitionField> fromIceberg(
131162
InternalPartitionField.builder()
132163
.sourceField(irField)
133164
.transformType(fromIcebergTransform(iceField.transform()))
165+
.transformOptions(getPartitionTransformOptions(iceField.transform()))
134166
.build();
135167
irPartitionFields.add(irPartitionField);
136168
}

xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergPartitionValueConverter.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public class IcebergPartitionValueConverter {
6666
private static final String DAY = "day";
6767
private static final String HOUR = "hour";
6868
private static final String IDENTITY = "identity";
69+
static final String BUCKET = "bucket";
6970

7071
public static IcebergPartitionValueConverter getInstance() {
7172
return INSTANCE;
@@ -124,8 +125,16 @@ public List<PartitionValue> toXTable(
124125
transformType = PartitionTransformType.VALUE;
125126
break;
126127
default:
127-
throw new NotSupportedException(
128-
"Partition transform not supported: " + partitionField.transform().toString());
128+
if (partitionField.transform().toString().startsWith(BUCKET)) {
129+
value = structLike.get(fieldPosition, Integer.class);
130+
transformType = PartitionTransformType.BUCKET;
131+
} else if (partitionField.transform().isVoid()) {
132+
// skip void type
133+
continue;
134+
} else {
135+
throw new NotSupportedException(
136+
"Partition transform not supported: " + partitionField.transform().toString());
137+
}
129138
}
130139
Types.NestedField partitionSourceField =
131140
partitionSpec.schema().findField(partitionField.sourceId());

xtable-core/src/test/java/org/apache/xtable/ITConversionController.java

+29
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@
100100
import org.apache.xtable.hudi.HudiConversionSourceProvider;
101101
import org.apache.xtable.hudi.HudiTestUtil;
102102
import org.apache.xtable.iceberg.IcebergConversionSourceProvider;
103+
import org.apache.xtable.iceberg.TestIcebergDataHelper;
103104
import org.apache.xtable.model.storage.TableFormat;
104105
import org.apache.xtable.model.sync.SyncMode;
105106

@@ -772,6 +773,34 @@ public void testMetadataRetention() throws Exception {
772773
}
773774
}
774775

776+
@Test
777+
void otherIcebergPartitionTypes() {
778+
String tableName = getTableName();
779+
ConversionController conversionController = new ConversionController(jsc.hadoopConfiguration());
780+
List<String> targetTableFormats = Collections.singletonList(DELTA);
781+
782+
ConversionSourceProvider<?> conversionSourceProvider = getConversionSourceProvider(ICEBERG);
783+
try (TestIcebergTable table =
784+
new TestIcebergTable(
785+
tableName,
786+
tempDir,
787+
jsc.hadoopConfiguration(),
788+
"id",
789+
Arrays.asList("level", "string_field"),
790+
TestIcebergDataHelper.SchemaType.COMMON)) {
791+
table.insertRows(100);
792+
793+
ConversionConfig conversionConfig =
794+
getTableSyncConfig(
795+
ICEBERG, SyncMode.FULL, tableName, table, targetTableFormats, null, null);
796+
conversionController.sync(conversionConfig, conversionSourceProvider);
797+
checkDatasetEquivalence(ICEBERG, table, targetTableFormats, 100);
798+
// Query with filter to assert partition does not impact ability to query
799+
checkDatasetEquivalenceWithFilter(
800+
ICEBERG, table, targetTableFormats, "level == 'INFO' AND string_field > 'abc'");
801+
}
802+
}
803+
775804
private Map<String, String> getTimeTravelOption(String tableFormat, Instant time) {
776805
Map<String, String> options = new HashMap<>();
777806
switch (tableFormat) {

xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java

+1-11
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,7 @@ private DataFile writeAndGetDataFile(List<Record> records, StructLike partitionK
352352
Path baseDataPath = Paths.get(icebergTable.location(), "data");
353353
String filePath;
354354
if (icebergDataHelper.getPartitionSpec().isPartitioned()) {
355-
String partitionPath = getPartitionPath(partitionKey.get(0, String.class));
355+
String partitionPath = ((PartitionKey) partitionKey).toPath();
356356
filePath =
357357
baseDataPath.resolve(partitionPath).resolve(UUID.randomUUID() + ".parquet").toString();
358358
} else {
@@ -434,14 +434,4 @@ private List<DataFile> writeAllDataFiles(Map<StructLike, List<Record>> recordsBy
434434
.map(entry -> writeAndGetDataFile(entry.getValue(), entry.getKey()))
435435
.collect(Collectors.toList());
436436
}
437-
438-
private String getPartitionPath(Object partitionValue) {
439-
Preconditions.checkArgument(
440-
icebergDataHelper.getPartitionFieldNames().size() == 1,
441-
"Only single partition field is supported for grouping records by partition");
442-
Preconditions.checkArgument(
443-
icebergDataHelper.getPartitionFieldNames().get(0).equals("level"),
444-
"Only level partition field is supported for grouping records by partition");
445-
return "level=" + partitionValue;
446-
}
447437
}

xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaPartitionExtractor.java

+37
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.xtable.delta;
2020

21+
import static org.apache.xtable.delta.DeltaPartitionExtractor.DELTA_GENERATION_EXPRESSION;
2122
import static org.junit.jupiter.api.Assertions.*;
2223

2324
import java.util.Arrays;
@@ -494,6 +495,42 @@ public void testYearMonthDayHourGeneratedPartitionValueExtraction() {
494495
assertEquals(expectedPartitionValues, partitionValues);
495496
}
496497

498+
@Test
499+
void convertBucketPartition() {
500+
InternalPartitionField internalPartitionField =
501+
InternalPartitionField.builder()
502+
.sourceField(
503+
InternalField.builder()
504+
.name("partition_column1")
505+
.schema(
506+
InternalSchema.builder()
507+
.name("string")
508+
.dataType(InternalType.STRING)
509+
.build())
510+
.build())
511+
.transformType(PartitionTransformType.BUCKET)
512+
.transformOptions(Collections.singletonMap(InternalPartitionField.NUM_BUCKETS, 5))
513+
.build();
514+
Map<String, StructField> actual =
515+
deltaPartitionExtractor.convertToDeltaPartitionFormat(
516+
Collections.singletonList(internalPartitionField));
517+
Metadata expectedPartitionFieldMetadata =
518+
new Metadata(
519+
ScalaUtils.convertJavaMapToScala(
520+
Collections.singletonMap(
521+
DELTA_GENERATION_EXPRESSION,
522+
"MOD((HASH(partition_column1) & 2147483647), 5)")));
523+
Map<String, StructField> expected =
524+
Collections.singletonMap(
525+
"xtable_partition_col_BUCKET_partition_column1",
526+
new StructField(
527+
"xtable_partition_col_BUCKET_partition_column1",
528+
DataTypes.IntegerType,
529+
true,
530+
expectedPartitionFieldMetadata));
531+
assertEquals(expected, actual);
532+
}
533+
497534
private scala.collection.mutable.Map<String, String> convertJavaMapToScalaMap(
498535
Map<String, String> javaMap) {
499536
return JavaConverters.mapAsScalaMapConverter(javaMap).asScala();

xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public class TestIcebergDataHelper {
126126
String recordKeyField;
127127
List<String> partitionFieldNames;
128128

129-
public static enum SchemaType {
129+
public enum SchemaType {
130130
BASIC,
131131
COMMON,
132132
COMMON_WITH_ADDITIONAL_COLUMNS,
@@ -202,6 +202,13 @@ public PartitionSpec getPartitionSpec() {
202202
if (partitionFieldNames.isEmpty()) {
203203
return PartitionSpec.unpartitioned();
204204
}
205+
if (partitionFieldNames.equals(Arrays.asList("level", "string_field"))) {
206+
return PartitionSpec.builderFor(tableSchema)
207+
.alwaysNull("bytes_field")
208+
.identity("level")
209+
.bucket("string_field", 10)
210+
.build();
211+
}
205212
if (partitionFieldNames.size() > 1) {
206213
throw new IllegalArgumentException(
207214
"Please modify the code to support multiple partition columns");

0 commit comments

Comments (0)