Skip to content

Commit ca87224

Browse files
authored
Merge pull request #66 from fearlessfei/master
feat: consumer close log
2 parents 30ce25f + 3b89753 commit ca87224

File tree

1 file changed

+4
-0
lines changed

1 file changed

+4
-0
lines changed

Diff for: kq/queue.go

+4
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,8 @@ func (q *kafkaQueue) Start() {
191191
q.producerRoutines.Wait()
192192
close(q.channel)
193193
q.consumerRoutines.Wait()
194+
195+
logx.Infof("Consumer %s is closed", q.c.Name)
194196
}
195197
}
196198

@@ -239,10 +241,12 @@ func (q *kafkaQueue) startConsumers() {
239241

240242
func (q *kafkaQueue) startProducers() {
241243
for i := 0; i < q.c.Consumers; i++ {
244+
i := i
242245
q.producerRoutines.Run(func() {
243246
if err := q.consume(func(msg kafka.Message) {
244247
q.channel <- msg
245248
}); err != nil {
249+
logx.Infof("Consumer %s-%d is closed, error: %q", q.c.Name, i, err.Error())
246250
return
247251
}
248252
})

0 commit comments

Comments
 (0)