Skip to content

Commit 854369a

Browse files
authored
[ISSUE #9339] Fix pop update consumption offset when message are filtered (#9340)
1 parent 9adff6a commit 854369a

File tree

4 files changed

+17
-14
lines changed

4 files changed

+17
-14
lines changed

Diff for: broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java

+12-11
Original file line numberDiff line numberDiff line change
@@ -162,17 +162,15 @@ public GetMessageResult recodeRetryMessage(GetMessageResult getMessageResult,
162162
return result;
163163
}
164164

165-
public PopConsumerContext addGetMessageResult(PopConsumerContext context, GetMessageResult result,
165+
public PopConsumerContext handleGetMessageResult(PopConsumerContext context, GetMessageResult result,
166166
String topicId, int queueId, PopConsumerRecord.RetryType retryType, long offset) {
167167

168-
if (result.getStatus() == GetMessageStatus.FOUND && !result.getMessageQueueOffset().isEmpty()) {
168+
if (GetMessageStatus.FOUND.equals(result.getStatus()) && !result.getMessageQueueOffset().isEmpty()) {
169169
if (context.isFifo()) {
170170
this.setFifoBlocked(context, context.getGroupId(), topicId, queueId, result.getMessageQueueOffset());
171171
}
172-
173-
// build request header here
172+
// build response header here
174173
context.addGetMessageResult(result, topicId, queueId, retryType, offset);
175-
176174
if (brokerConfig.isPopConsumerKVServiceLog()) {
177175
log.info("PopConsumerService pop, time={}, invisible={}, " +
178176
"groupId={}, topic={}, queueId={}, offset={}, attemptId={}",
@@ -181,20 +179,23 @@ public PopConsumerContext addGetMessageResult(PopConsumerContext context, GetMes
181179
}
182180
}
183181

184-
if (!context.isFifo() && result.getNextBeginOffset() > OFFSET_NOT_EXIST) {
182+
long commitOffset = offset;
183+
if (context.isFifo()) {
184+
if (!GetMessageStatus.FOUND.equals(result.getStatus())) {
185+
commitOffset = result.getNextBeginOffset();
186+
}
187+
} else {
185188
this.brokerController.getConsumerOffsetManager().commitPullOffset(
186189
context.getClientHost(), context.getGroupId(), topicId, queueId, result.getNextBeginOffset());
187-
long commitOffset = result.getStatus() == GetMessageStatus.FOUND ? offset : result.getNextBeginOffset();
188190
if (brokerConfig.isEnablePopBufferMerge() && popConsumerCache != null) {
189191
long minOffset = popConsumerCache.getMinOffsetInCache(context.getGroupId(), topicId, queueId);
190192
if (minOffset != OFFSET_NOT_EXIST) {
191193
commitOffset = minOffset;
192194
}
193195
}
194-
this.brokerController.getConsumerOffsetManager().commitOffset(
195-
context.getClientHost(), context.getGroupId(), topicId, queueId, commitOffset);
196196
}
197-
197+
this.brokerController.getConsumerOffsetManager().commitOffset(
198+
context.getClientHost(), context.getGroupId(), topicId, queueId, commitOffset);
198199
return context;
199200
}
200201

@@ -310,7 +311,7 @@ protected CompletableFuture<PopConsumerContext> getMessageAsync(CompletableFutur
310311
} else {
311312
final long consumeOffset = this.getPopOffset(groupId, topicId, queueId, result.getInitMode());
312313
return getMessageAsync(clientHost, groupId, topicId, queueId, consumeOffset, remain, filter)
313-
.thenApply(getMessageResult -> addGetMessageResult(
314+
.thenApply(getMessageResult -> handleGetMessageResult(
314315
result, getMessageResult, topicId, queueId, retryType, consumeOffset));
315316
}
316317
});

Diff for: broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -680,7 +680,9 @@ private CompletableFuture<Long> popMsgFromQueue(String topic, String attemptId,
680680
CompletableFuture<Long> future = new CompletableFuture<>();
681681
if (!queueLockManager.tryLock(lockKey)) {
682682
try {
683-
restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
683+
if (!requestHeader.isOrder()) {
684+
restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
685+
}
684686
future.complete(restNum);
685687
} catch (ConsumeQueueException e) {
686688
future.completeExceptionally(e);

Diff for: broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ public void addGetMessageResultTest() {
195195
GetMessageResult result = new GetMessageResult();
196196
result.setStatus(GetMessageStatus.FOUND);
197197
result.getMessageQueueOffset().add(100L);
198-
consumerService.addGetMessageResult(
198+
consumerService.handleGetMessageResult(
199199
context, result, topicId, queueId, PopConsumerRecord.RetryType.NORMAL_TOPIC, 100);
200200
Assert.assertEquals(1, context.getGetMessageResultList().size());
201201
}

Diff for: store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -875,7 +875,7 @@ public GetMessageResult getMessage(final String group, final String topic, final
875875

876876
boolean isInMem = estimateInMemByCommitOffset(offsetPy, maxOffsetPy);
877877

878-
if ((cqUnit.getQueueOffset() - offset) * consumeQueue.getUnitSize() > maxFilterMessageSize) {
878+
if ((cqUnit.getQueueOffset() - offset) * consumeQueue.getUnitSize() >= maxFilterMessageSize) {
879879
break;
880880
}
881881

0 commit comments

Comments
 (0)