
Commit 0b967a1

Addressing feedback
1 parent 050e59a commit 0b967a1

File tree

5 files changed: +14 lines, -164 lines


Diff for: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java

-148
@@ -296,41 +296,6 @@ private static ArrayList<ArrayList<String>> partitionKeysByFileSlices(List<Strin
     return partitionedKeys;
   }
 
-  /*@Override
-  public Map<String, List<HoodieRecord<HoodieMetadataPayload>>> getAllRecordsByKeys(List<String> keys, String partitionName) {
-    if (keys.isEmpty()) {
-      return Collections.emptyMap();
-    }
-
-    Map<String, List<HoodieRecord<HoodieMetadataPayload>>> result;
-    // Load the file slices for the partition. Each file slice is a shard which saves a portion of the keys.
-    List<FileSlice> partitionFileSlices = partitionFileSliceMap.computeIfAbsent(partitionName,
-        k -> HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, getMetadataFileSystemView(), partitionName));
-    final int numFileSlices = partitionFileSlices.size();
-    checkState(numFileSlices > 0, "Number of file slices for partition " + partitionName + " should be > 0");
-
-    // Lookup keys from each file slice
-    if (numFileSlices == 1) {
-      // Optimization for a single slice for smaller metadata table partitions
-      result = lookupAllKeysFromFileSlice(partitionName, keys, partitionFileSlices.get(0));
-    } else {
-      // Parallel lookup for large sized partitions with many file slices
-      // Partition the keys by the file slice which contains it
-      ArrayList<ArrayList<String>> partitionedKeys = partitionKeysByFileSlices(keys, numFileSlices);
-      result = new HashMap<>(keys.size());
-      getEngineContext().setJobStatus(this.getClass().getSimpleName(), "Reading keys from metadata table partition " + partitionName);
-      getEngineContext().map(partitionedKeys, keysList -> {
-        if (keysList.isEmpty()) {
-          return Collections.<String, HoodieRecord<HoodieMetadataPayload>>emptyMap();
-        }
-        int shardIndex = HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(keysList.get(0), numFileSlices);
-        return lookupAllKeysFromFileSlice(partitionName, keysList, partitionFileSlices.get(shardIndex));
-      }, partitionedKeys.size()).forEach(map -> result.putAll((Map<String, List<HoodieRecord<HoodieMetadataPayload>>>) map));
-    }
-
-    return result;
-  }*/
-
   /**
    * Lookup list of keys from a single file slice.
    *
@@ -439,119 +404,6 @@ private Map<String, HoodieRecord<HoodieMetadataPayload>> fetchBaseFileRecordsByK
     return result;
   }
 
-  /*private Map<String, List<HoodieRecord<HoodieMetadataPayload>>> lookupAllKeysFromFileSlice(String partitionName, List<String> keys, FileSlice fileSlice) {
-    Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers = getOrCreateReaders(partitionName, fileSlice);
-    try {
-      List<Long> timings = new ArrayList<>();
-      HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
-      HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
-      if (baseFileReader == null && logRecordScanner == null) {
-        return Collections.emptyMap();
-      }
-
-      // Sort it here once so that we don't need to sort individually for base file and for each individual log files.
-      List<String> sortedKeys = new ArrayList<>(keys);
-      Collections.sort(sortedKeys);
-      Map<String, List<HoodieRecord<HoodieMetadataPayload>>> logRecords = readAllLogRecords(logRecordScanner, sortedKeys, timings);
-      return readFromBaseAndMergeWithAllLogRecords(baseFileReader, sortedKeys, true, logRecords, timings, partitionName);
-    } catch (IOException ioe) {
-      throw new HoodieIOException("Error merging records from metadata table for " + keys.size() + " key : ", ioe);
-    } finally {
-      if (!reuse) {
-        closeReader(readers);
-      }
-    }
-  }*/
-
-  /*private Map<String, List<HoodieRecord<HoodieMetadataPayload>>> readAllLogRecords(HoodieMetadataLogRecordReader logRecordReader,
-                                                                                    List<String> sortedKeys,
-                                                                                    List<Long> timings) {
-    HoodieTimer timer = HoodieTimer.start();
-
-    if (logRecordReader == null) {
-      timings.add(timer.endTimer());
-      return Collections.emptyMap();
-    }
-
-    try {
-      return logRecordReader.getAllRecordsByKeys(sortedKeys);
-    } finally {
-      timings.add(timer.endTimer());
-    }
-  }*/
-
-  /*private Map<String, List<HoodieRecord<HoodieMetadataPayload>>> readFromBaseAndMergeWithAllLogRecords(HoodieSeekingFileReader<?> reader,
-                                                                                                        List<String> sortedKeys,
-                                                                                                        boolean fullKeys,
-                                                                                                        Map<String, List<HoodieRecord<HoodieMetadataPayload>>> logRecords,
-                                                                                                        List<Long> timings,
-                                                                                                        String partitionName) throws IOException {
-    HoodieTimer timer = HoodieTimer.start();
-
-    if (reader == null) {
-      // No base file at all
-      timings.add(timer.endTimer());
-      return logRecords;
-    }
-
-    HoodieTimer readTimer = HoodieTimer.start();
-
-    Map<String, List<HoodieRecord<HoodieMetadataPayload>>> records =
-        fetchBaseFileAllRecordsByKeys(reader, sortedKeys, fullKeys, partitionName);
-
-    metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer()));
-
-    // Iterate over all provided log-records, merging them into existing records
-
-    logRecords.entrySet().forEach(kv -> {
-      records.merge(
-          kv.getKey(),
-          kv.getValue(),
-          (oldRecordList, newRecordList) -> {
-            List<HoodieRecord<HoodieMetadataPayload>> mergedRecordList = new ArrayList<>();
-            HoodieMetadataPayload mergedPayload = null;
-            HoodieKey key = null;
-            if (!oldRecordList.isEmpty() && !newRecordList.isEmpty()) {
-              mergedPayload = newRecordList.get(0).getData().preCombine(oldRecordList.get(0).getData());
-              key = newRecordList.get(0).getKey();
-            } else if (!oldRecordList.isEmpty()) {
-              mergedPayload = oldRecordList.get(0).getData();
-              key = oldRecordList.get(0).getKey();
-            } else if (!newRecordList.isEmpty()) {
-              mergedPayload = newRecordList.get(0).getData();
-              key = newRecordList.get(0).getKey();
-            }
-
-            if (mergedPayload != null && !mergedPayload.isDeleted()) {
-              mergedRecordList.add(new HoodieAvroRecord<>(key, mergedPayload));
-            }
-            return mergedRecordList;
-          }
-      );
-    });
-
-    timings.add(timer.endTimer());
-    return records;
-  }*/
-
-  /*private Map<String, List<HoodieRecord<HoodieMetadataPayload>>> fetchBaseFileAllRecordsByKeys(HoodieSeekingFileReader reader,
-                                                                                                List<String> sortedKeys,
-                                                                                                boolean fullKeys,
-                                                                                                String partitionName) throws IOException {
-    ClosableIterator<HoodieRecord<?>> records = fullKeys
-        ? reader.getRecordsByKeysIterator(sortedKeys)
-        : reader.getRecordsByKeyPrefixIterator(sortedKeys);
-
-    return toStream(records)
-        .map(record -> {
-          GenericRecord data = (GenericRecord) record.getData();
-          return Pair.of(
-              (String) (data).get(HoodieMetadataPayload.KEY_FIELD_NAME),
-              composeRecord(data, partitionName));
-        })
-        .collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toList())));
-  }*/
-
   private HoodieRecord<HoodieMetadataPayload> composeRecord(GenericRecord avroRecord, String partitionName) {
     if (metadataTableConfig.populateMetaFields()) {
       return SpillableMapUtils.convertToHoodieRecordPayload(avroRecord,

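The commented-out getAllRecordsByKeys path removed above sharded the lookup: keys were bucketed by the file slice (shard) that owns them, so each bucket could be read against exactly one slice, which is why mapRecordKeyToFileGroupIndex only needs the first key of a bucket. Below is a minimal, self-contained sketch of that bucketing idea; the class name and the hash-mod-N mapping are illustrative assumptions, not Hudi's actual key-to-file-group function.

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative sketch only: groups record keys by the shard (file slice) index
    // they hash to, so each bucket can be looked up against a single file slice.
    // The hash scheme here is an assumption for demonstration, not Hudi's real mapping.
    public class ShardedKeyLookupSketch {

      // Map a record key to a shard index in [0, numFileSlices).
      static int keyToShardIndex(String recordKey, int numFileSlices) {
        // Math.floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(recordKey.hashCode(), numFileSlices);
      }

      // Bucket keys by shard index; bucket i is only ever read from file slice i.
      static List<List<String>> partitionKeysByShard(List<String> keys, int numFileSlices) {
        List<List<String>> buckets = new ArrayList<>(numFileSlices);
        for (int i = 0; i < numFileSlices; i++) {
          buckets.add(new ArrayList<>());
        }
        for (String key : keys) {
          buckets.get(keyToShardIndex(key, numFileSlices)).add(key);
        }
        return buckets;
      }

      public static void main(String[] args) {
        List<String> keys = List.of("key-001", "key-042", "key-777", "key-900");
        List<List<String>> buckets = partitionKeysByShard(keys, 3);
        for (int i = 0; i < buckets.size(); i++) {
          System.out.println("shard " + i + " -> " + buckets.get(i));
        }
      }
    }

Because every key in a bucket maps to the same shard, the parallel branch in the removed code could compute the slice index from any single member of the bucket and then read that whole bucket from one file slice.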
Diff for: hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java

+10-6
@@ -200,11 +200,11 @@ public byte[] serializeRecordsToLogBlock(HoodieStorage storage,
 
       final byte[] recordBytes = serializeRecord(record, writerSchema, keyFieldName);
       if (sortedRecordsMap.containsKey(recordKey)) {
-        LOG.error("Found duplicate record with recordKey: " + recordKey);
+        LOG.error("Found duplicate record with recordKey: {} ", recordKey);
         printRecord("Previous record", sortedRecordsMap.get(recordKey), writerSchema);
         printRecord("Current record", recordBytes, writerSchema);
-        throw new HoodieException(String.format("Writing multiple records with same key %s not supported for %s",
-            recordKey, this.getClass().getName()));
+        throw new HoodieException(String.format("Writing multiple records with same key %s not supported for Hfile format with Metadata table",
+            recordKey));
       }
       sortedRecordsMap.put(recordKey, recordBytes);
     }
@@ -233,12 +233,16 @@ public byte[] serializeRecordsToLogBlock(HoodieStorage storage,
   }
 
   /**
-   * Print the record in json format
+   * Print the meta fields of the record of interest
   */
   private void printRecord(String msg, byte[] bs, Schema schema) throws IOException {
     GenericRecord record = HoodieAvroUtils.bytesToAvro(bs, schema);
-    byte[] json = HoodieAvroUtils.avroToJson(record, true);
-    LOG.error(String.format("%s: %s", msg, new String(json)));
+    if (schema.getField(HoodieRecord.RECORD_KEY_METADATA_FIELD) != null) {
+      LOG.error(String.format("%s: Hudi meta field values -> Record key: %s, Partition Path: %s, FileName: %s, CommitTime: %s, CommitSeqNo: %s", msg,
+          record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(), record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(),
+          record.get(HoodieRecord.FILENAME_METADATA_FIELD).toString(), record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(),
+          record.get(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).toString()));
+    }
   }
 
   @Override

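The duplicate-key guard above exists because an HFile data block keeps unique keys in sorted byte order, so serializeRecordsToLogBlock collects records into a sorted map and fails fast when the same key appears twice. The sketch below shows just that collect-and-reject pattern with a TreeMap; the record representation and exception type are placeholders, not the real HFileUtils signatures.

    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    // Minimal sketch of the "sorted map + fail fast on duplicate keys" pattern used
    // when serializing records into an HFile-backed log block. Types are placeholders.
    public class SortedRecordCollectorSketch {

      // Collect serialized records keyed by record key, rejecting duplicates, so the
      // downstream HFile writer sees unique keys in sorted (lexicographic) order.
      static Map<String, byte[]> collect(List<Map.Entry<String, String>> records) {
        Map<String, byte[]> sortedRecordsMap = new TreeMap<>();
        for (Map.Entry<String, String> r : records) {
          String recordKey = r.getKey();
          if (sortedRecordsMap.containsKey(recordKey)) {
            // Mirrors the commit's behavior: a duplicate is an error, not a silent overwrite.
            throw new IllegalStateException(
                "Writing multiple records with same key " + recordKey + " not supported");
          }
          sortedRecordsMap.put(recordKey, r.getValue().getBytes(StandardCharsets.UTF_8));
        }
        // TreeMap iteration order is the sorted key order the HFile block expects.
        return sortedRecordsMap;
      }

      public static void main(String[] args) {
        System.out.println(collect(List.of(Map.entry("b", "2"), Map.entry("a", "1"))).keySet()); // [a, b]
        try {
          collect(List.of(Map.entry("a", "1"), Map.entry("a", "2")));
        } catch (IllegalStateException e) {
          System.out.println("Rejected: " + e.getMessage());
        }
      }
    }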
Diff for: hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java

+1-3
@@ -72,15 +72,14 @@ public class HoodieAvroHFileWriter
   private final String instantTime;
   private final TaskContextSupplier taskContextSupplier;
   private final boolean populateMetaFields;
-  private final Schema schema;
   private final Option<Schema.Field> keyFieldSchema;
   private HFile.Writer writer;
   private String minRecordKey;
   private String maxRecordKey;
   private String prevRecordKey;
 
   // This is private in CacheConfig so have been copied here.
-  private static String DROP_BEHIND_CACHE_COMPACTION_KEY = "hbase.hfile.drop.behind.compaction";
+  private static final String DROP_BEHIND_CACHE_COMPACTION_KEY = "hbase.hfile.drop.behind.compaction";
 
   public HoodieAvroHFileWriter(String instantTime, StoragePath file, HoodieHFileConfig hfileConfig, Schema schema,
                                TaskContextSupplier taskContextSupplier, boolean populateMetaFields) throws IOException {
@@ -91,7 +90,6 @@ public HoodieAvroHFileWriter(String instantTime, StoragePath file, HoodieHFileCo
     this.isWrapperFileSystem = fs instanceof HoodieWrapperFileSystem;
     this.wrapperFs = this.isWrapperFileSystem ? Option.of((HoodieWrapperFileSystem) fs) : Option.empty();
     this.hfileConfig = hfileConfig;
-    this.schema = schema;
     this.keyFieldSchema = Option.ofNullable(schema.getField(hfileConfig.getKeyFieldName()));
 
     // TODO - compute this compression ratio dynamically by looking at the bytes written to the

Diff for: hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHBaseAvroHFileReader.java

+3-2
@@ -371,7 +371,8 @@ public IndexedRecord next() {
   }
 
   private static Option<IndexedRecord> fetchRecordByKeyInternal(HFileScanner scanner, String key, Schema writerSchema, Schema readerSchema) throws IOException {
-    KeyValue kv = new KeyValue(getUTF8Bytes(key), null, null, null);
+    byte[] keyBytes = getUTF8Bytes(key);
+    KeyValue kv = new KeyValue(keyBytes, null, null, null);
     // NOTE: HFile persists both keys/values as bytes, therefore lexicographical sorted is
     //       essentially employed
     //
@@ -394,7 +395,7 @@ private static Option<IndexedRecord> fetchRecordByKeyInternal(HFileScann
     // key is found and the cursor is left where the key is found
     Cell c = scanner.getCell();
     byte[] valueBytes = copyValueFromCell(c);
-    GenericRecord record = deserialize(getUTF8Bytes(key), valueBytes, writerSchema, readerSchema);
+    GenericRecord record = deserialize(keyBytes, valueBytes, writerSchema, readerSchema);
 
     return Option.of(record);
   }

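The reader change above encodes the key to UTF-8 once (keyBytes) and reuses it for both the seek and the deserialize call. The NOTE in the surrounding code is the underlying reason byte keys matter: HFile stores keys as raw bytes and orders them lexicographically, so a String key must be encoded before it can be positioned against the block index. A rough illustration of that unsigned byte comparison follows; it is a generic sketch and makes no claim about the real HBase KeyValue comparator.

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    // Generic sketch of lexicographic (unsigned byte) key comparison, the ordering a
    // byte-sorted store like HFile relies on when seeking to a key.
    public class ByteKeyOrderSketch {

      // Encode the key once and reuse the bytes, as the commit does for the seek
      // and the subsequent deserialize call.
      static byte[] toUtf8(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
      }

      // Compare two keys the way a byte-sorted store would: unsigned, byte by byte.
      static int compareKeys(byte[] a, byte[] b) {
        return Arrays.compareUnsigned(a, b);
      }

      public static void main(String[] args) {
        byte[] k1 = toUtf8("file-0001");
        byte[] k2 = toUtf8("file-0002");
        // Negative result: "file-0001" sorts before "file-0002" in byte order,
        // which is what lets a scanner seek to a key efficiently.
        System.out.println(compareKeys(k1, k2));
      }
    }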
Diff for: hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java

-5
@@ -1980,9 +1980,6 @@ public void testColStatsPrefixLookup() throws IOException {
 
     // there are 3 partitions in total and 2 commits. total entries should be 6.
     assertEquals(result.size(), 6);
-    result.forEach(entry -> {
-      //LOG.warn("Prefix search entries just for record key col : " + entry.getRecordKey().toString() + " :: " + entry.getData().getColumnStatMetadata().get().toString());
-    });
 
     // prefix search for col(_hoodie_record_key) and first partition. only 2 files should be matched
     PartitionIndexID partitionIndexID = new PartitionIndexID(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
@@ -1991,7 +1988,6 @@ public void testColStatsPrefixLookup() throws IOException {
     // 1 partition and 2 commits. total entries should be 2.
     assertEquals(result.size(), 2);
     result.forEach(entry -> {
-      // LOG.warn("Prefix search entries for record key col and first partition : " + entry.getRecordKey().toString() + " :: " + entry.getData().getColumnStatMetadata().get().toString());
       HoodieMetadataColumnStats metadataColumnStats = entry.getData().getColumnStatMetadata().get();
       String fileName = metadataColumnStats.getFileName();
       if (fileName.contains(firstCommit)) {
@@ -2011,7 +2007,6 @@ public void testColStatsPrefixLookup() throws IOException {
     // 1 partition and 2 commits. total entries should be 2.
     assertEquals(result.size(), 2);
     result.forEach(entry -> {
-      // LOG.warn("Prefix search entries for record key col and first partition : " + entry.getRecordKey().toString() + " :: " + entry.getData().getColumnStatMetadata().get().toString());
       HoodieMetadataColumnStats metadataColumnStats = entry.getData().getColumnStatMetadata().get();
       // for commit time column, min max should be the same since we disable small files, every commit will create a new file
       assertEquals(metadataColumnStats.getMinValue(), metadataColumnStats.getMaxValue());

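The test above exercises prefix lookups over column-stats keys built from a column id plus a partition id: 3 partitions times 2 commits yields 6 entries, and narrowing the prefix to one partition yields 2. The sketch below shows the prefix-filter idea on plain strings; the "columnId/partitionId/fileName" layout is a simplification for illustration, not the exact encoding produced by ColumnIndexID and PartitionIndexID.

    import java.util.List;
    import java.util.stream.Collectors;

    // Simplified illustration of a prefix lookup over column-stats style keys.
    // The "columnId/partitionId/fileName" layout is an assumed toy encoding,
    // not the real Hudi metadata-table key format.
    public class ColStatsPrefixLookupSketch {

      public static void main(String[] args) {
        // 3 partitions x 2 commits, one file per commit and partition: 6 entries total.
        List<String> keys = List.of(
            "c1/p1/file-commit1", "c1/p1/file-commit2",
            "c1/p2/file-commit1", "c1/p2/file-commit2",
            "c1/p3/file-commit1", "c1/p3/file-commit2");

        // Prefix = column + first partition: expect exactly 2 entries,
        // mirroring assertEquals(result.size(), 2) in the test.
        String prefix = "c1/p1/";
        List<String> matches = keys.stream()
            .filter(k -> k.startsWith(prefix))
            .collect(Collectors.toList());
        System.out.println(matches.size() + " -> " + matches);
      }
    }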