Skip to content

feat: expose information about rebalance events in the reader #1331

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions consumergroup.go
Original file line number Diff line number Diff line change
@@ -925,12 +925,12 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) {
// the leader. Otherwise, GroupMemberAssignments will be nil.
//
// Possible kafka error codes returned:
// * GroupLoadInProgress:
// * GroupCoordinatorNotAvailable:
// * NotCoordinatorForGroup:
// * InconsistentGroupProtocol:
// * InvalidSessionTimeout:
// * GroupAuthorizationFailed:
// - GroupLoadInProgress:
// - GroupCoordinatorNotAvailable:
// - NotCoordinatorForGroup:
// - InconsistentGroupProtocol:
// - InvalidSessionTimeout:
// - GroupAuthorizationFailed:
func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) {
request, err := cg.makeJoinGroupRequestV1(memberID)
if err != nil {
@@ -1073,11 +1073,11 @@ func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember
// Readers subscriptions topic => partitions
//
// Possible kafka error codes returned:
// * GroupCoordinatorNotAvailable:
// * NotCoordinatorForGroup:
// * IllegalGeneration:
// * RebalanceInProgress:
// * GroupAuthorizationFailed:
// - GroupCoordinatorNotAvailable:
// - NotCoordinatorForGroup:
// - IllegalGeneration:
// - RebalanceInProgress:
// - GroupAuthorizationFailed:
func (cg *ConsumerGroup) syncGroup(conn coordinator, memberID string, generationID int32, memberAssignments GroupMemberAssignments) (map[string][]int32, error) {
request := cg.makeSyncGroupRequestV0(memberID, generationID, memberAssignments)
response, err := conn.syncGroup(request)
6 changes: 3 additions & 3 deletions example_groupbalancer_test.go
Original file line number Diff line number Diff line change
@@ -31,9 +31,9 @@ func ExampleNewReader_rackAffinity() {
}

// findRack is the basic rack resolver strategy for use in AWS. It supports
// * ECS with the task metadata endpoint enabled (returns the container
// instance's availability zone)
// * Linux EC2 (returns the instance's availability zone)
// - ECS with the task metadata endpoint enabled (returns the container
// instance's availability zone)
// - Linux EC2 (returns the instance's availability zone)
func findRack() string {
switch whereAmI() {
case "ecs":
22 changes: 12 additions & 10 deletions groupbalancer.go
Original file line number Diff line number Diff line change
@@ -41,14 +41,15 @@ type GroupBalancer interface {
// RangeGroupBalancer groups consumers by partition
//
// Example: 5 partitions, 2 consumers
// C0: [0, 1, 2]
// C1: [3, 4]
//
// C0: [0, 1, 2]
// C1: [3, 4]
//
// Example: 6 partitions, 3 consumers
// C0: [0, 1]
// C1: [2, 3]
// C2: [4, 5]
//
// C0: [0, 1]
// C1: [2, 3]
// C2: [4, 5]
type RangeGroupBalancer struct{}

func (r RangeGroupBalancer) ProtocolName() string {
@@ -92,14 +93,15 @@ func (r RangeGroupBalancer) AssignGroups(members []GroupMember, topicPartitions
// RoundrobinGroupBalancer divides partitions evenly among consumers
//
// Example: 5 partitions, 2 consumers
// C0: [0, 2, 4]
// C1: [1, 3]
//
// C0: [0, 2, 4]
// C1: [1, 3]
//
// Example: 6 partitions, 3 consumers
// C0: [0, 3]
// C1: [1, 4]
// C2: [2, 5]
//
// C0: [0, 3]
// C1: [1, 4]
// C2: [2, 5]
type RoundRobinGroupBalancer struct{}

func (r RoundRobinGroupBalancer) ProtocolName() string {
12 changes: 12 additions & 0 deletions kafka_test.go
Original file line number Diff line number Diff line change
@@ -189,3 +189,15 @@ func (l *testKafkaLogger) Printf(msg string, args ...interface{}) {
l.T.Logf(msg, args...)
}
}

// testRebalanceEventCallback is a RebalanceEventInterceptor used by tests to
// observe rebalance events: every Callback invocation is forwarded on NoticeChan.
type testRebalanceEventCallback struct {
	// NoticeChan receives the assignments passed to each Callback invocation.
	NoticeChan chan map[string][]PartitionAssignment
}

// newTestRebalanceEventCallback wraps ch in a RebalanceEventInterceptor that
// forwards every rebalance notification to ch.
func newTestRebalanceEventCallback(ch chan map[string][]PartitionAssignment) RebalanceEventInterceptor {
	return &testRebalanceEventCallback{NoticeChan: ch}
}

// Callback implements RebalanceEventInterceptor by publishing the new
// assignments on NoticeChan. It blocks until a receiver is ready.
func (cb *testRebalanceEventCallback) Callback(assignments map[string][]PartitionAssignment) {
	cb.NoticeChan <- assignments
}
11 changes: 6 additions & 5 deletions logger.go
Original file line number Diff line number Diff line change
@@ -7,11 +7,12 @@ type Logger interface {

// LoggerFunc is a bridge between Logger and any third party logger
// Usage:
// l := NewLogger() // some logger
// r := kafka.NewReader(kafka.ReaderConfig{
// Logger: kafka.LoggerFunc(l.Infof),
// ErrorLogger: kafka.LoggerFunc(l.Errorf),
// })
//
// l := NewLogger() // some logger
// r := kafka.NewReader(kafka.ReaderConfig{
// Logger: kafka.LoggerFunc(l.Infof),
// ErrorLogger: kafka.LoggerFunc(l.Errorf),
// })
type LoggerFunc func(string, ...interface{})

func (f LoggerFunc) Printf(msg string, args ...interface{}) { f(msg, args...) }
13 changes: 13 additions & 0 deletions reader.go
Original file line number Diff line number Diff line change
@@ -331,6 +331,10 @@ func (r *Reader) run(cg *ConsumerGroup) {

r.subscribe(gen.Assignments)

r.withRebalanceEventInterceptor(func(l RebalanceEventInterceptor) {
l.Callback(gen.Assignments)
})

gen.Start(func(ctx context.Context) {
r.commitLoop(ctx, gen)
})
@@ -522,6 +526,9 @@ type ReaderConfig struct {
// This flag is being added to retain backwards-compatibility, so it will be
// removed in a future version of kafka-go.
OffsetOutOfRangeError bool

// If not nil, specifies a callback used to report rebalance events.
RebalanceEventInterceptor RebalanceEventInterceptor
}

// Validate method validates ReaderConfig properties.
@@ -1142,6 +1149,12 @@ func (r *Reader) withErrorLogger(do func(Logger)) {
}
}

// withRebalanceEventInterceptor invokes fn with the configured
// RebalanceEventInterceptor, or does nothing when none was set.
func (r *Reader) withRebalanceEventInterceptor(fn func(RebalanceEventInterceptor)) {
	if interceptor := r.config.RebalanceEventInterceptor; interceptor != nil {
		fn(interceptor)
	}
}

func (r *Reader) activateReadLag() {
if r.config.ReadLagInterval > 0 && atomic.CompareAndSwapUint32(&r.once, 0, 1) {
// read lag will only be calculated when not using consumer groups
46 changes: 46 additions & 0 deletions reader_test.go
Original file line number Diff line number Diff line change
@@ -395,6 +395,52 @@ func deleteTopic(t *testing.T, topic ...string) {
}
}

// TestReaderCollectsRebalanceEvents verifies that a consumer-group Reader
// configured with a RebalanceEventInterceptor reports the first rebalance's
// partition assignments: exactly one topic, the expected topic name, and all
// of the topic's partitions assigned to the single group member.
func TestReaderCollectsRebalanceEvents(t *testing.T) {
	const (
		groupID    = "a"
		partitions = 5
	)
	topic := makeTopic()
	createTopic(t, topic, partitions)
	defer deleteTopic(t, topic)

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// The interceptor sends each rebalance's assignments on this channel.
	c := make(chan map[string][]PartitionAssignment)

	r := NewReader(ReaderConfig{
		Brokers:                   []string{"localhost:9092"},
		Topic:                     topic,
		GroupID:                   groupID,
		MinBytes:                  1,
		MaxBytes:                  1000,
		MaxWait:                   100 * time.Millisecond,
		RebalanceEventInterceptor: newTestRebalanceEventCallback(c),
	})
	defer r.Close()

	// Wait for the first rebalance event in the test goroutine itself instead
	// of a spawned goroutine with a shared "eventReceived" flag: the original
	// pattern raced on that flag (unsynchronized write/read, flagged by -race)
	// and could call t.Error after the test had already returned. Receiving
	// here also unblocks the interceptor's send on the unbuffered channel.
	select {
	case firstAssignment := <-c:
		if len(firstAssignment) != 1 {
			t.Error("multiple topics assigned")
		}
		info, ok := firstAssignment[topic]
		if !ok {
			t.Error("wrong topic assigned")
		}
		if len(info) != partitions {
			t.Error("wrong number of partitions assigned")
		}
	case <-ctx.Done():
		t.Fatal("no rebalance event received before timeout")
	}

	// Exercise the reader end to end to confirm it still consumes after the
	// interceptor fired.
	prepareReader(t, ctx, r, makeTestSequence(1)...)
	_, _ = r.ReadMessage(ctx)
}

func TestReaderOnNonZeroPartition(t *testing.T) {
tests := []struct {
scenario string
12 changes: 12 additions & 0 deletions rebalance_callback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package kafka

// RebalanceEventInterceptor defines the rebalance event callback API.
// Callback is invoked with the reader's partition assignments, keyed by
// topic, when the reader receives a new consumer group generation.
type RebalanceEventInterceptor interface {
	Callback(map[string][]PartitionAssignment)
}

// RebalanceFunc is an adapter that allows an ordinary function to be used
// as a RebalanceEventInterceptor (the same pattern as LoggerFunc).
type RebalanceFunc func(map[string][]PartitionAssignment)

// Callback implements RebalanceEventInterceptor by invoking f with the new
// partition assignments.
func (f RebalanceFunc) Callback(partitionAssignments map[string][]PartitionAssignment) {
	f(partitionAssignments)
}