[HUDI-9311] Revert HUDI-7146 which causes perf overhead for merging MDT log files #13136

Merged: 4 commits, Apr 14, 2025
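At the core of the revert is the record-index lookup surface going back to a single location per key. A minimal sketch of the reverted shape, pieced together from the signatures in this diff (the interface name below is illustrative, not the actual Hudi type):

import java.util.List;
import java.util.Map;

import org.apache.hudi.common.model.HoodieRecordGlobalLocation;

// Illustrative sketch only: the single-valued lookup shape this PR restores.
public interface RecordIndexLookupSketch {
  // Only keys present in the record index appear in the result.
  Map<String, HoodieRecordGlobalLocation> readRecordIndex(List<String> recordKeys);

  // Secondary-index lookup, likewise single-valued after the revert.
  Map<String, HoodieRecordGlobalLocation> readSecondaryIndex(List<String> secondaryKeys, String partitionName);
}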
@@ -80,6 +80,10 @@ public <R> HoodieData<HoodieRecord<R>> tagLocation(HoodieData<HoodieRecord<R>> r
HoodieWriteConfig otherConfig = HoodieWriteConfig.newBuilder().withProperties(config.getProps())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(FALLBACK_INDEX_TYPE).build()).build();
HoodieIndex fallbackIndex = SparkHoodieIndexFactory.createIndex(otherConfig);

+// Fallback index needs to be a global index like record index
+ValidationUtils.checkArgument(fallbackIndex.isGlobal(), "Fallback index needs to be a global index like record index");

return fallbackIndex.tagLocation(records, context, hoodieTable);
}

@@ -163,10 +167,9 @@ public Iterator<Tuple2<String, HoodieRecordGlobalLocation>> call(Iterator<String
recordKeyIterator.forEachRemaining(keysToLookup::add);

// recordIndexInfo object only contains records that are present in record_index.
-Map<String, List<HoodieRecordGlobalLocation>> recordIndexInfo = hoodieTable.getMetadataTable().readRecordIndex(keysToLookup);
+Map<String, HoodieRecordGlobalLocation> recordIndexInfo = hoodieTable.getMetadataTable().readRecordIndex(keysToLookup);
return recordIndexInfo.entrySet().stream()
-.flatMap(e -> e.getValue().stream().map(loc -> new Tuple2<>(e.getKey(), loc)))
-.iterator();
+.map(e -> new Tuple2<>(e.getKey(), e.getValue())).iterator();
}
}

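The change to call() above is the same shift in miniature: with a single location per key, the flatMap over a per-key list collapses to a plain map over entries. A rough standalone sketch of the two shapes (plain Map.Entry stands in for Spark's Tuple2 and String for the location type; names are illustrative):

import java.util.AbstractMap.SimpleEntry;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Illustrative only: how the key-to-location flattening changes with this revert.
class LookupShapesSketch {
  // Before the revert: list-valued index, one key may expand to several (key, location) pairs.
  static Iterator<SimpleEntry<String, String>> listValued(Map<String, List<String>> index) {
    return index.entrySet().stream()
        .flatMap(e -> e.getValue().stream().map(loc -> new SimpleEntry<>(e.getKey(), loc)))
        .iterator();
  }

  // After the revert: single-valued index, a straight map over entries.
  static Iterator<SimpleEntry<String, String>> singleValued(Map<String, String> index) {
    return index.entrySet().stream()
        .map(e -> new SimpleEntry<>(e.getKey(), e.getValue()))
        .iterator();
  }
}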
@@ -194,46 +194,28 @@ private static List<HoodieRecord> getInserts() throws IOException {
new RawTripTestPayload(recordStr4).toHoodieRecord());
}

-private static List<HoodieRecord> getInsertsWithSameKeyInTwoPartitions() throws IOException {
-String recordStr1 = "{\"_row_key\":\"001\",\"time\":\"2016-01-31T00:00:01.000Z\",\"number\":1}";
-String recordStr2 = "{\"_row_key\":\"002\",\"time\":\"2016-01-31T00:00:02.000Z\",\"number\":2}";
-String recordStr3 = "{\"_row_key\":\"003\",\"time\":\"2016-01-31T00:00:03.000Z\",\"number\":3}";
-String recordStr4 = "{\"_row_key\":\"003\",\"time\":\"2015-01-31T00:00:04.000Z\",\"number\":4}";
+private static List<HoodieRecord> getInsertsBatch2() throws IOException {
+String recordStr1 = "{\"_row_key\":\"005\",\"time\":\"2016-01-31T00:00:01.000Z\",\"number\":5}";
+String recordStr2 = "{\"_row_key\":\"006\",\"time\":\"2016-01-31T00:00:02.000Z\",\"number\":6}";
+String recordStr3 = "{\"_row_key\":\"007\",\"time\":\"2016-01-31T00:00:03.000Z\",\"number\":7}";
+String recordStr4 = "{\"_row_key\":\"008\",\"time\":\"2017-01-31T00:00:04.000Z\",\"number\":8}";
return Arrays.asList(
new RawTripTestPayload(recordStr1).toHoodieRecord(),
new RawTripTestPayload(recordStr2).toHoodieRecord(),
new RawTripTestPayload(recordStr3).toHoodieRecord(),
new RawTripTestPayload(recordStr4).toHoodieRecord());
}

-@Test
-public void testRecordIndexForNonGlobalWrites() throws Exception {
-setUp(IndexType.RECORD_INDEX, true, true);
-final int totalRecords = 4;
-List<HoodieRecord> records = getInsertsWithSameKeyInTwoPartitions();
-JavaRDD<HoodieRecord> writtenRecords = jsc.parallelize(records, 1);
-
-// Insert totalRecords records
-String newCommitTime = writeClient.createNewInstantTime();
-writeClient.startCommitWithTime(newCommitTime);
-JavaRDD<WriteStatus> writeStatusRdd = writeClient.upsert(writtenRecords, newCommitTime);
-List<WriteStatus> writeStatuses = writeStatusRdd.collect();
-assertNoWriteErrors(writeStatuses);
-String[] fileIdsFromWriteStatuses = writeStatuses.stream().map(WriteStatus::getFileId)
-.sorted().toArray(String[]::new);
-
-// Now commit this & update location of records inserted and validate no errors
-writeClient.commit(newCommitTime, writeStatusRdd);
-// Now tagLocation for these records, index should tag them correctly
-metaClient = HoodieTableMetaClient.reload(metaClient);
-HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
-JavaRDD<HoodieRecord> javaRDD = tagLocation(index, writtenRecords, hoodieTable);
-Map<String, String> recordKeyToPartitionPathMap = new HashMap();
-List<HoodieRecord> hoodieRecords = writtenRecords.collect();
-hoodieRecords.forEach(entry -> recordKeyToPartitionPathMap.put(entry.getRecordKey(), entry.getPartitionPath()));
-String[] taggedFileIds = javaRDD.map(record -> record.getCurrentLocation().getFileId()).distinct().collect()
-.stream().sorted().toArray(String[]::new);
-assertArrayEquals(taggedFileIds, fileIdsFromWriteStatuses);
+private static List<HoodieRecord> getUpdates() throws IOException {
+String recordStr1 = "{\"_row_key\":\"001\",\"time\":\"2016-01-31T00:00:01.000Z\",\"number\":5}";
+String recordStr2 = "{\"_row_key\":\"002\",\"time\":\"2016-01-31T00:00:02.000Z\",\"number\":6}";
+String recordStr3 = "{\"_row_key\":\"003\",\"time\":\"2016-01-31T00:00:03.000Z\",\"number\":7}";
+String recordStr4 = "{\"_row_key\":\"004\",\"time\":\"2017-01-31T00:00:04.000Z\",\"number\":8}";
+return new ArrayList<>(Arrays.asList(
+new RawTripTestPayload(recordStr1).toHoodieRecord(),
+new RawTripTestPayload(recordStr2).toHoodieRecord(),
+new RawTripTestPayload(recordStr3).toHoodieRecord(),
+new RawTripTestPayload(recordStr4).toHoodieRecord()));
}

@ParameterizedTest
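These new helpers feed the parameterized record-index tests below the fold. A hedged fragment (not a complete test; it reuses the writeClient/jsc fixtures and helper calls visible elsewhere in this diff) of the insert-then-update flow they support:

// Fragment, assuming the surrounding test class setup: commit one batch of inserts, then
// write getUpdates(), which reuses keys 001-004, so the record index should tag those
// records to their existing file groups (assuming getInserts() wrote the same keys).
String commit1 = writeClient.createNewInstantTime();
writeClient.startCommitWithTime(commit1);
writeClient.commit(commit1, writeClient.upsert(jsc.parallelize(getInserts(), 1), commit1));

String commit2 = writeClient.createNewInstantTime();
writeClient.startCommitWithTime(commit2);
writeClient.commit(commit2, writeClient.upsert(jsc.parallelize(getUpdates(), 1), commit2));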
@@ -18,7 +18,6 @@

package org.apache.hudi.common.table.log.block;

-import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.engine.HoodieReaderContext;
@@ -38,7 +37,6 @@
import org.apache.hudi.storage.inline.InLineFSUtils;

import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -185,15 +183,6 @@ protected <T> ClosableIterator<HoodieRecord<T>> lookupRecords(List<String> sorte
}
}

-/**
-* Print the record in json format
-*/
-private void printRecord(String msg, byte[] bs, Schema schema) throws IOException {
-GenericRecord record = HoodieAvroUtils.bytesToAvro(bs, schema);
-byte[] json = HoodieAvroUtils.avroToJson(record, true);
-LOG.error(String.format("%s: %s", msg, new String(json)));
-}

private HoodieConfig getHFileReaderConfig(boolean useNativeHFileReader) {
HoodieConfig config = new HoodieConfig();
config.setValue(
@@ -97,12 +97,12 @@ public Map<Pair<String, String>, HoodieMetadataColumnStats> getColumnStats(List<
}

@Override
-public Map<String, List<HoodieRecordGlobalLocation>> readRecordIndex(List<String> recordKeys) {
+public Map<String, HoodieRecordGlobalLocation> readRecordIndex(List<String> recordKeys) {
throw new HoodieMetadataException("Unsupported operation: readRecordIndex!");
}

@Override
-public Map<String, List<HoodieRecordGlobalLocation>> readSecondaryIndex(List<String> secondaryKeys, String partitionName) {
+public Map<String, HoodieRecordGlobalLocation> readSecondaryIndex(List<String> secondaryKeys, String partitionName) {
return Collections.emptyMap();
}

@@ -111,11 +111,6 @@ public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(L
throw new HoodieMetadataException("Unsupported operation: getRecordsByKeyPrefixes!");
}

-@Override
-public Map<String, List<HoodieRecord<HoodieMetadataPayload>>> getAllRecordsByKeys(List<String> keys, String partitionName) {
-return Collections.emptyMap();
-}

@Override
public Option<String> getSyncedInstantTime() {
throw new HoodieMetadataException("Unsupported operation: getSyncedInstantTime!");
@@ -28,8 +28,8 @@
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
@@ -287,7 +287,7 @@ public Map<Pair<String, String>, HoodieMetadataColumnStats> getColumnStats(final
* @param recordKeys The list of record keys to read
*/
@Override
-public Map<String, List<HoodieRecordGlobalLocation>> readRecordIndex(List<String> recordKeys) {
+public Map<String, HoodieRecordGlobalLocation> readRecordIndex(List<String> recordKeys) {
// If record index is not initialized yet, we cannot return an empty result here unlike the code for reading from other
// indexes. This is because results from this function are used for upserts and returning an empty result here would lead
// to existing records being inserted again causing duplicates.
@@ -296,15 +296,13 @@ public Map<String, List<HoodieRecordGlobalLocation>> readRecordIndex(List<String
"Record index is not initialized in MDT");

HoodieTimer timer = HoodieTimer.start();
-Map<String, List<HoodieRecord<HoodieMetadataPayload>>> result = getAllRecordsByKeys(recordKeys, MetadataPartitionType.RECORD_INDEX.getPartitionPath());
-Map<String, List<HoodieRecordGlobalLocation>> recordKeyToLocation = new HashMap<>(result.size());
-result.forEach((key, records) -> records.forEach(record -> {
+Map<String, HoodieRecord<HoodieMetadataPayload>> result = getRecordsByKeys(recordKeys, MetadataPartitionType.RECORD_INDEX.getPartitionPath());
+Map<String, HoodieRecordGlobalLocation> recordKeyToLocation = new HashMap<>(result.size());
+result.forEach((key, record) -> {
if (!record.getData().isDeleted()) {
-List<HoodieRecordGlobalLocation> locations = recordKeyToLocation.getOrDefault(key, new ArrayList<>());
-locations.add(record.getData().getRecordGlobalLocation());
-recordKeyToLocation.put(key, locations);
+recordKeyToLocation.put(key, record.getData().getRecordGlobalLocation());
}
-}));
+});

metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_RECORD_INDEX_TIME_STR, timer.endTimer()));
metrics.ifPresent(m -> m.setMetric(HoodieMetadataMetrics.LOOKUP_RECORD_INDEX_KEYS_COUNT_STR, recordKeys.size()));
@@ -321,7 +319,7 @@ public Map<String, List<HoodieRecordGlobalLocation>> readRecordIndex(List<String
* @param secondaryKeys The list of secondary keys to read
*/
@Override
-public Map<String, List<HoodieRecordGlobalLocation>> readSecondaryIndex(List<String> secondaryKeys, String partitionName) {
+public Map<String, HoodieRecordGlobalLocation> readSecondaryIndex(List<String> secondaryKeys, String partitionName) {
ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX),
"Record index is not initialized in MDT");
ValidationUtils.checkState(
@@ -430,7 +428,7 @@ Map<String, List<StoragePathInfo>> fetchAllFilesInPartitionPaths(List<StoragePat
}

/**
-* Handle spurious deletes. Depending on config, throw an exception or log warn msg.
+* Handle spurious deletes. Depending on config, throw an exception or log a warn msg.
*/
private void checkForSpuriousDeletes(HoodieMetadataPayload metadataPayload, String partitionName) {
if (!metadataPayload.getDeletions().isEmpty()) {
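As the comment at the top of readRecordIndex notes, an empty result would make callers re-insert existing records, so a key's absence from the returned map means "not in the index" rather than "unknown". A small illustrative sketch of that caller-side contract (generic types and names are mine, not Hudi's actual tagging code):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: split keys into known locations (tag for update) and misses (route as inserts).
class RecordIndexTaggingSketch {
  static <L> Map<String, L> knownLocations(List<String> keys, Map<String, L> indexResult) {
    Map<String, L> tagged = new HashMap<>();
    for (String key : keys) {
      L location = indexResult.get(key);
      if (location != null) {
        tagged.put(key, location); // existing record: upsert into its current file group
      }
      // a miss means the key is not in the record index, so the caller treats it as a new insert
    }
    return tagged;
  }
}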
@@ -309,17 +309,12 @@ public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(L
}

-@Override
-public Map<String, List<HoodieRecord<HoodieMetadataPayload>>> getAllRecordsByKeys(List<String> keys, String partitionName) {
-throw new HoodieMetadataException("Unsupported operation: getAllRecordsByKeys!");
-}

@Override
-public Map<String, List<HoodieRecordGlobalLocation>> readRecordIndex(List<String> recordKeys) {
+public Map<String, HoodieRecordGlobalLocation> readRecordIndex(List<String> recordKeys) {
throw new HoodieMetadataException("Unsupported operation: readRecordIndex!");
}

@Override
-public Map<String, List<HoodieRecordGlobalLocation>> readSecondaryIndex(List<String> secondaryKeys, String partitionName) {
+public Map<String, HoodieRecordGlobalLocation> readSecondaryIndex(List<String> secondaryKeys, String partitionName) {
throw new HoodieMetadataException("Unsupported operation: readSecondaryIndex!");
}
