Skip to content
This repository was archived by the owner on Jun 10, 2022. It is now read-only.

Commit bbe9329

Browse files
committed
backport #188 to v0.2
1 parent e14311b commit bbe9329

File tree

1 file changed

+7
-5
lines changed

1 file changed

+7
-5
lines changed

src/Consumer/Process.php

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -557,21 +557,23 @@ public function succFetch($result, $fd)
557557
continue;
558558
}
559559

560-
$offset = $assign->getConsumerOffset($topic['topicName'], $part['partition']);
561-
if ($offset === false) {
560+
$consumerOffset = $assign->getConsumerOffset($topic['topicName'], $part['partition']);
561+
if ($consumerOffset === false) {
562562
return; // current is rejoin....
563563
}
564564
foreach ($part['messages'] as $message) {
565565
$this->messages[$topic['topicName']][$part['partition']][] = $message;
566566
//if ($this->consumer != null) {
567567
// call_user_func($this->consumer, $topic['topicName'], $part['partition'], $message);
568568
//}
569-
$offset = $message['offset'];
569+
$commitOffset = $message['offset'];
570570
}
571571

572-
$consumerOffset = ($part['highwaterMarkOffset'] > $offset) ? ($offset + 1) : $offset;
572+
$commitOffset = isset($commitOffset) ? $commitOffset : $consumerOffset - 1;
573+
$consumerOffset = $commitOffset + 1;
574+
573575
$assign->setConsumerOffset($topic['topicName'], $part['partition'], $consumerOffset);
574-
$assign->setCommitOffset($topic['topicName'], $part['partition'], $offset);
576+
$assign->setCommitOffset($topic['topicName'], $part['partition'], $commitOffset);
575577
}
576578
}
577579
$this->state->succRun(\Kafka\Consumer\State::REQUEST_FETCH, $fd);

0 commit comments

Comments
 (0)