
Commit 132d0fe

Add verification
1 parent fa40a26 commit 132d0fe

4 files changed: +124 additions, −21 deletions

hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java

Lines changed: 3 additions & 1 deletion
@@ -90,7 +90,9 @@ public class TestHoodieFileGroupReaderOnHive extends TestHoodieFileGroupReaderBase

   @Override
   @Disabled("[HUDI-8072]")
-  public void testReadLogFilesOnlyInMergeOnReadTable(RecordMergeMode recordMergeMode, String logDataBlockFormat) throws Exception {
+  public void testReadLogFilesOnlyInMergeOnReadTable(RecordMergeMode recordMergeMode,
+                                                     String logDataBlockFormat,
+                                                     Boolean useNestedKey) throws Exception {
   }

   private static final String PARTITION_COLUMN = "datestr";

hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java

Lines changed: 48 additions & 20 deletions
@@ -115,55 +115,81 @@ public void validateRecordsInFileGroup(String tablePath,

   private static Stream<Arguments> testArguments() {
     return Stream.of(
-        arguments(RecordMergeMode.COMMIT_TIME_ORDERING, "avro"),
-        arguments(RecordMergeMode.COMMIT_TIME_ORDERING, "parquet"),
-        arguments(RecordMergeMode.EVENT_TIME_ORDERING, "avro"),
-        arguments(RecordMergeMode.EVENT_TIME_ORDERING, "parquet"),
-        arguments(RecordMergeMode.CUSTOM, "avro"),
-        arguments(RecordMergeMode.CUSTOM, "parquet")
+        arguments(RecordMergeMode.COMMIT_TIME_ORDERING, "avro", false),
+        arguments(RecordMergeMode.COMMIT_TIME_ORDERING, "parquet", false),
+        arguments(RecordMergeMode.EVENT_TIME_ORDERING, "avro", false),
+        arguments(RecordMergeMode.EVENT_TIME_ORDERING, "parquet", false),
+        arguments(RecordMergeMode.CUSTOM, "avro", false),
+        arguments(RecordMergeMode.CUSTOM, "parquet", false),
+        arguments(RecordMergeMode.COMMIT_TIME_ORDERING, "avro", true),
+        arguments(RecordMergeMode.COMMIT_TIME_ORDERING, "parquet", true),
+        arguments(RecordMergeMode.EVENT_TIME_ORDERING, "avro", true),
+        arguments(RecordMergeMode.EVENT_TIME_ORDERING, "parquet", true),
+        arguments(RecordMergeMode.CUSTOM, "avro", true),
+        arguments(RecordMergeMode.CUSTOM, "parquet", true)
     );
   }

   @ParameterizedTest
   @MethodSource("testArguments")
-  public void testReadFileGroupInMergeOnReadTable(RecordMergeMode recordMergeMode, String logDataBlockFormat) throws Exception {
-    Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs(recordMergeMode));
+  public void testReadFileGroupInMergeOnReadTable(RecordMergeMode recordMergeMode,
+                                                  String logDataBlockFormat,
+                                                  Boolean useNestedKey) throws Exception {
+    Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs(recordMergeMode, useNestedKey));
     writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), logDataBlockFormat);

     try (HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEEF)) {
+      List<HoodieRecord> baseRecords = useNestedKey
+          ? dataGen.generateInsertsWithKeyFieldSpecified("001", "fare", 100)
+          : dataGen.generateInserts("001", 100);
+
       // One commit; reading one file group containing a base file only
-      commitToTable(dataGen.generateInserts("001", 100), INSERT.value(), writeConfigs);
+      commitToTable(baseRecords, INSERT.value(), writeConfigs);
       validateOutputFromFileGroupReader(
           getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), true, 0, recordMergeMode);

       // Two commits; reading one file group containing a base file and a log file
-      commitToTable(dataGen.generateUpdates("002", 100), UPSERT.value(), writeConfigs);
+      List<HoodieRecord> firstUpdate = useNestedKey
+          ? dataGen.generateUpdates("002", baseRecords, "fare")
+          : dataGen.generateUpdates("002", 100);
+      commitToTable(firstUpdate, UPSERT.value(), writeConfigs);
       validateOutputFromFileGroupReader(
           getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), true, 1, recordMergeMode);

       // Three commits; reading one file group containing a base file and two log files
-      commitToTable(dataGen.generateUpdates("003", 100), UPSERT.value(), writeConfigs);
+      List<HoodieRecord> secondUpdates = useNestedKey
+          ? dataGen.generateUpdates("003", baseRecords, "fare")
+          : dataGen.generateUpdates("003", 100);
+      commitToTable(secondUpdates, UPSERT.value(), writeConfigs);
       validateOutputFromFileGroupReader(
           getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), true, 2, recordMergeMode);
     }
   }

   @ParameterizedTest
   @MethodSource("testArguments")
-  public void testReadLogFilesOnlyInMergeOnReadTable(RecordMergeMode recordMergeMode, String logDataBlockFormat) throws Exception {
-    Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs(recordMergeMode));
+  public void testReadLogFilesOnlyInMergeOnReadTable(RecordMergeMode recordMergeMode,
+                                                     String logDataBlockFormat,
+                                                     Boolean useNestedKey) throws Exception {
+    Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs(recordMergeMode, useNestedKey));
     writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), logDataBlockFormat);
     // Use InMemoryIndex to generate a log-only MOR table
     writeConfigs.put("hoodie.index.type", "INMEMORY");

     try (HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEEF)) {
-      // One commit; reading one file group containing a base file only
-      commitToTable(dataGen.generateInserts("001", 100), INSERT.value(), writeConfigs);
+      // One commit; reading one file group containing one log file
+      List<HoodieRecord> baseRecords = useNestedKey
+          ? dataGen.generateInsertsWithKeyFieldSpecified("001", "fare", 100)
+          : dataGen.generateInserts("001", 100);
+      commitToTable(baseRecords, INSERT.value(), writeConfigs);
       validateOutputFromFileGroupReader(
           getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), false, 1, recordMergeMode);

-      // Two commits; reading one file group containing a base file and a log file
-      commitToTable(dataGen.generateUpdates("002", 100), UPSERT.value(), writeConfigs);
+      // Two commits; reading two log files
+      List<HoodieRecord> updateRecords = useNestedKey
+          ? dataGen.generateUpdates("002", baseRecords, "fare")
+          : dataGen.generateUpdates("002", 100);
+      commitToTable(updateRecords, UPSERT.value(), writeConfigs);
       validateOutputFromFileGroupReader(
           getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), false, 2, recordMergeMode);
     }
@@ -172,7 +198,7 @@ public void testReadLogFilesOnlyInMergeOnReadTable
   @ParameterizedTest
   @EnumSource(value = ExternalSpillableMap.DiskMapType.class)
   public void testSpillableMapUsage(ExternalSpillableMap.DiskMapType diskMapType) throws Exception {
-    Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs(RecordMergeMode.COMMIT_TIME_ORDERING));
+    Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs(RecordMergeMode.COMMIT_TIME_ORDERING, false));
     Option<Schema.Type> orderingFieldType = Option.of(Schema.Type.STRING);
     try (HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEEF)) {
       commitToTable(dataGen.generateInserts("001", 100), INSERT.value(), writeConfigs);
@@ -230,9 +256,11 @@ public void testSpillableMapUsage(ExternalSpillableMap.DiskMapType diskMapType)
     }
   }

-  private Map<String, String> getCommonConfigs(RecordMergeMode recordMergeMode) {
+  private Map<String, String> getCommonConfigs(RecordMergeMode recordMergeMode, Boolean useNestedKey) {
     Map<String, String> configMapping = new HashMap<>();
-    configMapping.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
+    configMapping.put(
+        KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
+        useNestedKey ? "fare" : "_row_key");
     configMapping.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition_path");
     configMapping.put("hoodie.datasource.write.precombine.field", "timestamp");
     configMapping.put("hoodie.payload.ordering.field", "timestamp");

hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java

Lines changed: 63 additions & 0 deletions
@@ -39,6 +39,7 @@
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;

+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.avro.Conversions;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
@@ -848,6 +849,68 @@ public List<HoodieRecord> generateSameKeyInserts
     return copy;
   }

+  /**
+   * Step 1: Generate a list of HoodieRecord.
+   * Step 2: Based on the keyField, extract the HoodieKey for each record.
+   * Step 3: Create a list of HoodieRecord combining the keys and their payloads.
+   *
+   * This is different from the other generators, whose keys are not
+   * necessarily taken from one of the columns.
+   */
+  public List<HoodieRecord> generateInsertsWithKeyFieldSpecified(String instantTime,
+                                                                 String keyField,
+                                                                 int recordNum) throws IOException {
+    List<HoodieRecord> initRecords = generateInserts(instantTime, recordNum);
+    List<HoodieKey> keys = initRecords.stream().map(r -> {
+      try {
+        return new HoodieKey(
+            ((RawTripTestPayload) r.getData()).getJsonDataAsMap().get(keyField).toString(),
+            r.getPartitionPath());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }).collect(Collectors.toList());
+    return IntStream.range(0, keys.size())
+        .mapToObj(i -> new HoodieAvroRecord(keys.get(i), (RawTripTestPayload) initRecords.get(i).getData()))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Generate updates based on existing records, using the given keyField
+   * column to generate the key.
+   *
+   * Step 1: Create a list of HoodieRecord.
+   * Step 2: For each record, create a new HoodieRecord whose HoodieKey comes
+   *         from the given record and whose payload is generated by replacing
+   *         the key value in the payload.
+   */
+  public List<HoodieRecord> generateUpdates(String instantTime,
+                                            List<HoodieRecord> records,
+                                            String keyField) throws IOException {
+    List<HoodieRecord> initRecords = generateInserts(instantTime, records.size());
+    ObjectMapper mapper = new ObjectMapper();
+
+    return IntStream.range(0, records.size())
+        .mapToObj(i -> {
+          RawTripTestPayload payload = (RawTripTestPayload) initRecords.get(i).getData();
+          try {
+            Map<String, Object> oldJsonMap =
+                ((RawTripTestPayload) records.get(i).getData()).getJsonDataAsMap();
+            Map<String, Object> jsonMap = payload.getJsonDataAsMap();
+            // Set keyField and partition_path.
+            jsonMap.put(keyField, oldJsonMap.get(keyField));
+            jsonMap.put("partition_path", oldJsonMap.get("partition_path"));
+            String json = mapper.writeValueAsString(jsonMap);
+            return new HoodieAvroRecord(
+                records.get(i).getKey(),
+                new RawTripTestPayload(json, keyField));
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        })
+        .collect(Collectors.toList());
+  }
+
   public List<HoodieRecord> generateInsertsWithHoodieAvroPayload(String instantTime, int limit) {
     List<HoodieRecord> inserts = new ArrayList<>();
     int currSize = getNumExistingKeys(TRIP_EXAMPLE_SCHEMA);
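Taken together, the two generators let a test drive a full insert-then-update cycle where the record key is sourced from a data column. A hedged usage sketch, not from the commit, run inside a test method that declares throws Exception:

try (HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEEF)) {
  // Inserts whose HoodieKey record key is each record's own "fare" value.
  List<HoodieRecord> inserts =
      dataGen.generateInsertsWithKeyFieldSpecified("001", "fare", 10);
  // Updates that reuse each insert's HoodieKey and copy its "fare" and
  // "partition_path" values into freshly generated payloads.
  List<HoodieRecord> updates = dataGen.generateUpdates("002", inserts, "fare");
  // Keys line up pairwise, so each update targets its insert's record.
  for (int i = 0; i < inserts.size(); i++) {
    assertEquals(inserts.get(i).getKey(), updates.get(i).getKey());
  }
}

assertEquals here is JUnit's org.junit.jupiter.api.Assertions.assertEquals; the pairwise check holds by construction, since generateUpdates builds each update around records.get(i).getKey() directly.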

hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java

Lines changed: 10 additions & 0 deletions
@@ -99,6 +99,16 @@ public RawTripTestPayload(String jsonData) throws IOException {
     this.orderingVal = Integer.valueOf(jsonRecordMap.getOrDefault("number", 0L).toString());
   }

+  public RawTripTestPayload(String jsonData, String keyField) throws IOException {
+    this.jsonDataCompressed = compressData(jsonData);
+    this.dataSize = jsonData.length();
+    Map<String, Object> jsonRecordMap = OBJECT_MAPPER.readValue(jsonData, Map.class);
+    this.rowKey = jsonRecordMap.get(keyField).toString();
+    this.partitionPath = (String) jsonRecordMap.get("partition_path");
+    this.isDeleted = false;
+    this.orderingVal = Integer.valueOf(jsonRecordMap.getOrDefault("number", 0L).toString());
+  }
+
   public RawTripTestPayload(GenericRecord record, Comparable orderingVal) {
     this.orderingVal = orderingVal;
     try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
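The new constructor mirrors the single-argument one but reads rowKey from the caller-chosen column instead of _row_key. A minimal sketch with a hypothetical JSON record, its field names shaped after the trip schema fields used in the diffs above ("fare", "partition_path", "number"):

// Hypothetical sample record; values are illustrative only.
String json = "{\"fare\": 17.85, \"partition_path\": \"2016/03/15\", \"number\": 3}";
RawTripTestPayload payload = new RawTripTestPayload(json, "fare");
// rowKey is now "17.85" (the stringified "fare" value),
// partitionPath is "2016/03/15", and orderingVal comes from "number".

This is what lets generateUpdates above rebuild payloads whose key column round-trips through JSON without relying on a _row_key field being present.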
