diff --git a/src/Consumer/Process.php b/src/Consumer/Process.php index b8cd5b77..37f49dee 100644 --- a/src/Consumer/Process.php +++ b/src/Consumer/Process.php @@ -644,21 +644,22 @@ public function succFetch(array $result, int $fd): void continue; } - $offset = $assign->getConsumerOffset($topic['topicName'], $part['partition']); - - if ($offset === null) { + $consumerOffset = $assign->getConsumerOffset($topic['topicName'], $part['partition']); + if ($consumerOffset === null) { return; // current is rejoin.... } + $commitOffset = $consumerOffset - 1; foreach ($part['messages'] as $message) { $this->messages[$topic['topicName']][$part['partition']][] = $message; - $offset = $message['offset']; + $commitOffset = $message['offset']; } - $consumerOffset = ($part['highwaterMarkOffset'] > $offset) ? ($offset + 1) : $offset; + $consumerOffset = $commitOffset + 1; + $assign->setConsumerOffset($topic['topicName'], $part['partition'], $consumerOffset); - $assign->setCommitOffset($topic['topicName'], $part['partition'], $offset); + $assign->setCommitOffset($topic['topicName'], $part['partition'], $commitOffset); } }