From a7f8e2c00e92f8f6750a049ff3466f3742480a2b Mon Sep 17 00:00:00 2001 From: Carlos Peliciari Date: Sat, 15 Feb 2025 22:18:35 -0300 Subject: [PATCH] Create ConnIface interface and generate mock with mockgen - Created the ConnIface interface aggregating all public methods of *Conn. - Generated a mock implementation for ConnIface using mockgen for easier testing. --- conn.go | 84 ++++++++ generate.go | 3 + go.mod | 16 +- go.sum | 33 +--- mocks/mock_conn.go | 463 +++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 571 insertions(+), 28 deletions(-) create mode 100644 generate.go create mode 100644 mocks/mock_conn.go diff --git a/conn.go b/conn.go index 2b51afbd5..60d5f0198 100644 --- a/conn.go +++ b/conn.go @@ -1643,3 +1643,87 @@ func (c *Conn) saslAuthenticate(data []byte) ([]byte, error) { resp, _, err := readNewBytes(&c.rbuf, int(respLen), int(respLen)) return resp, err } + +// ConnIface is an interface that aggregates all the public methods of the *Conn type. +type ConnIface interface { + // Broker returns a Broker representing the Kafka broker this connection was established with. + Broker() Broker + + // Controller requests the current controller from Kafka and returns its Broker. + Controller() (Broker, error) + + // Brokers retrieves the list of brokers from Kafka metadata. + Brokers() ([]Broker, error) + + // DeleteTopics deletes the specified topics. + DeleteTopics(topics ...string) error + + // Close closes the connection to Kafka. + Close() error + + // LocalAddr returns the local network address. + LocalAddr() net.Addr + + // RemoteAddr returns the remote network address. + RemoteAddr() net.Addr + + // SetDeadline sets the deadlines for both read and write operations. + SetDeadline(t time.Time) error + + // SetReadDeadline sets the deadline for future read operations. + SetReadDeadline(t time.Time) error + + // SetWriteDeadline sets the deadline for future write operations. + SetWriteDeadline(t time.Time) error + + // Offset returns the current offset and the reference point (whence). + Offset() (offset int64, whence int) + + // Seek adjusts the connection offset and returns the new absolute offset. + Seek(offset int64, whence int) (int64, error) + + // Read reads data from the connection, implementing the net.Conn interface. + Read(b []byte) (int, error) + + // ReadMessage reads a complete message starting from the current offset. + ReadMessage(maxBytes int) (Message, error) + + // ReadBatch reads a batch of messages based on the minBytes and maxBytes parameters. + ReadBatch(minBytes, maxBytes int) *Batch + + // ReadBatchWith reads a batch of messages using the provided configuration. + ReadBatchWith(cfg ReadBatchConfig) *Batch + + // ReadOffset returns the offset of the first message with a timestamp >= t. + ReadOffset(t time.Time) (int64, error) + + // ReadFirstOffset returns the first available offset. + ReadFirstOffset() (int64, error) + + // ReadLastOffset returns the last available offset. + ReadLastOffset() (int64, error) + + // ReadOffsets returns the absolute first and last offsets of the topic. + ReadOffsets() (first, last int64, err error) + + // ReadPartitions returns the list of partitions for the specified topics. + ReadPartitions(topics ...string) ([]Partition, error) + + // Write writes data to the connection. + Write(b []byte) (int, error) + + // WriteMessages writes a batch of messages to the connection. + WriteMessages(msgs ...Message) (int, error) + + // WriteCompressedMessages writes messages, potentially compressed. + WriteCompressedMessages(codec CompressionCodec, msgs ...Message) (int, error) + + // WriteCompressedMessagesAt writes compressed messages and returns additional information. + WriteCompressedMessagesAt(codec CompressionCodec, msgs ...Message) (nbytes int, partition int32, offset int64, appendTime time.Time, err error) + + // SetRequiredAcks sets the number of acknowledgements required when producing messages. + SetRequiredAcks(n int) error + + // ApiVersions queries the API versions supported by the broker. + ApiVersions() ([]ApiVersion, error) +} diff --git a/generate.go b/generate.go new file mode 100644 index 000000000..f774f63f4 --- /dev/null +++ b/generate.go @@ -0,0 +1,3 @@ +package kafka + +//go:generate mockgen -source=../conn.go -destination=mocks -package=mocks ConnIface diff --git a/go.mod b/go.mod index d16e1ae78..3c1624fdd 100644 --- a/go.mod +++ b/go.mod @@ -1,13 +1,23 @@ module github.com/segmentio/kafka-go -go 1.15 +go 1.22 require ( github.com/klauspost/compress v1.15.9 github.com/pierrec/lz4/v4 v4.1.15 - github.com/stretchr/testify v1.8.0 + github.com/stretchr/testify v1.9.0 github.com/xdg-go/scram v1.1.2 - golang.org/x/net v0.17.0 + go.uber.org/mock v0.5.0 + golang.org/x/net v0.26.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect + golang.org/x/text v0.16.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) retract [v0.4.36, v0.4.37] diff --git a/go.sum b/go.sum index 440b00f6d..81471d424 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,3 @@ -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= @@ -7,11 +6,8 @@ github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0 github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= @@ -19,49 +15,36 @@ github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3k github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU= +go.uber.org/mock v0.5.0/go.mod h1:ge71pBPLYDk7QIi1LupWxdAykm7KIEFchiOqd6z7qMM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= -golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= -golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= -golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= +golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= -golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= -golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= -golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= -golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/mocks/mock_conn.go b/mocks/mock_conn.go new file mode 100644 index 000000000..03156bc85 --- /dev/null +++ b/mocks/mock_conn.go @@ -0,0 +1,463 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: conn.go +// +// Generated by this command: +// +// mockgen -source=conn.go -destination=mocks/mock_conn.go -package=mocks ConnIface +// + +// Package mocks is a generated GoMock package. +package mocks + +import ( + net "net" + reflect "reflect" + time "time" + + kafka_go "github.com/segmentio/kafka-go" + gomock "go.uber.org/mock/gomock" +) + +// MockConnIface is a mock of ConnIface interface. +type MockConnIface struct { + ctrl *gomock.Controller + recorder *MockConnIfaceMockRecorder + isgomock struct{} +} + +// MockConnIfaceMockRecorder is the mock recorder for MockConnIface. +type MockConnIfaceMockRecorder struct { + mock *MockConnIface +} + +// NewMockConnIface creates a new mock instance. +func NewMockConnIface(ctrl *gomock.Controller) *MockConnIface { + mock := &MockConnIface{ctrl: ctrl} + mock.recorder = &MockConnIfaceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockConnIface) EXPECT() *MockConnIfaceMockRecorder { + return m.recorder +} + +// ApiVersions mocks base method. +func (m *MockConnIface) ApiVersions() ([]kafka_go.ApiVersion, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ApiVersions") + ret0, _ := ret[0].([]kafka_go.ApiVersion) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ApiVersions indicates an expected call of ApiVersions. +func (mr *MockConnIfaceMockRecorder) ApiVersions() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ApiVersions", reflect.TypeOf((*MockConnIface)(nil).ApiVersions)) +} + +// Broker mocks base method. +func (m *MockConnIface) Broker() kafka_go.Broker { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Broker") + ret0, _ := ret[0].(kafka_go.Broker) + return ret0 +} + +// Broker indicates an expected call of Broker. +func (mr *MockConnIfaceMockRecorder) Broker() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Broker", reflect.TypeOf((*MockConnIface)(nil).Broker)) +} + +// Brokers mocks base method. +func (m *MockConnIface) Brokers() ([]kafka_go.Broker, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Brokers") + ret0, _ := ret[0].([]kafka_go.Broker) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Brokers indicates an expected call of Brokers. +func (mr *MockConnIfaceMockRecorder) Brokers() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Brokers", reflect.TypeOf((*MockConnIface)(nil).Brokers)) +} + +// Close mocks base method. +func (m *MockConnIface) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockConnIfaceMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockConnIface)(nil).Close)) +} + +// Controller mocks base method. +func (m *MockConnIface) Controller() (kafka_go.Broker, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Controller") + ret0, _ := ret[0].(kafka_go.Broker) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Controller indicates an expected call of Controller. +func (mr *MockConnIfaceMockRecorder) Controller() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Controller", reflect.TypeOf((*MockConnIface)(nil).Controller)) +} + +// DeleteTopics mocks base method. +func (m *MockConnIface) DeleteTopics(topics ...string) error { + m.ctrl.T.Helper() + varargs := []any{} + for _, a := range topics { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "DeleteTopics", varargs...) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteTopics indicates an expected call of DeleteTopics. +func (mr *MockConnIfaceMockRecorder) DeleteTopics(topics ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTopics", reflect.TypeOf((*MockConnIface)(nil).DeleteTopics), topics...) +} + +// LocalAddr mocks base method. +func (m *MockConnIface) LocalAddr() net.Addr { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LocalAddr") + ret0, _ := ret[0].(net.Addr) + return ret0 +} + +// LocalAddr indicates an expected call of LocalAddr. +func (mr *MockConnIfaceMockRecorder) LocalAddr() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LocalAddr", reflect.TypeOf((*MockConnIface)(nil).LocalAddr)) +} + +// Offset mocks base method. +func (m *MockConnIface) Offset() (int64, int) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Offset") + ret0, _ := ret[0].(int64) + ret1, _ := ret[1].(int) + return ret0, ret1 +} + +// Offset indicates an expected call of Offset. +func (mr *MockConnIfaceMockRecorder) Offset() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Offset", reflect.TypeOf((*MockConnIface)(nil).Offset)) +} + +// Read mocks base method. +func (m *MockConnIface) Read(b []byte) (int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Read", b) + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Read indicates an expected call of Read. +func (mr *MockConnIfaceMockRecorder) Read(b any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Read", reflect.TypeOf((*MockConnIface)(nil).Read), b) +} + +// ReadBatch mocks base method. +func (m *MockConnIface) ReadBatch(minBytes, maxBytes int) *kafka_go.Batch { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReadBatch", minBytes, maxBytes) + ret0, _ := ret[0].(*kafka_go.Batch) + return ret0 +} + +// ReadBatch indicates an expected call of ReadBatch. +func (mr *MockConnIfaceMockRecorder) ReadBatch(minBytes, maxBytes any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadBatch", reflect.TypeOf((*MockConnIface)(nil).ReadBatch), minBytes, maxBytes) +} + +// ReadBatchWith mocks base method. +func (m *MockConnIface) ReadBatchWith(cfg kafka_go.ReadBatchConfig) *kafka_go.Batch { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReadBatchWith", cfg) + ret0, _ := ret[0].(*kafka_go.Batch) + return ret0 +} + +// ReadBatchWith indicates an expected call of ReadBatchWith. +func (mr *MockConnIfaceMockRecorder) ReadBatchWith(cfg any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadBatchWith", reflect.TypeOf((*MockConnIface)(nil).ReadBatchWith), cfg) +} + +// ReadFirstOffset mocks base method. +func (m *MockConnIface) ReadFirstOffset() (int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReadFirstOffset") + ret0, _ := ret[0].(int64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ReadFirstOffset indicates an expected call of ReadFirstOffset. +func (mr *MockConnIfaceMockRecorder) ReadFirstOffset() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadFirstOffset", reflect.TypeOf((*MockConnIface)(nil).ReadFirstOffset)) +} + +// ReadLastOffset mocks base method. +func (m *MockConnIface) ReadLastOffset() (int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReadLastOffset") + ret0, _ := ret[0].(int64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ReadLastOffset indicates an expected call of ReadLastOffset. +func (mr *MockConnIfaceMockRecorder) ReadLastOffset() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadLastOffset", reflect.TypeOf((*MockConnIface)(nil).ReadLastOffset)) +} + +// ReadMessage mocks base method. +func (m *MockConnIface) ReadMessage(maxBytes int) (kafka_go.Message, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReadMessage", maxBytes) + ret0, _ := ret[0].(kafka_go.Message) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ReadMessage indicates an expected call of ReadMessage. +func (mr *MockConnIfaceMockRecorder) ReadMessage(maxBytes any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadMessage", reflect.TypeOf((*MockConnIface)(nil).ReadMessage), maxBytes) +} + +// ReadOffset mocks base method. +func (m *MockConnIface) ReadOffset(t time.Time) (int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReadOffset", t) + ret0, _ := ret[0].(int64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ReadOffset indicates an expected call of ReadOffset. +func (mr *MockConnIfaceMockRecorder) ReadOffset(t any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadOffset", reflect.TypeOf((*MockConnIface)(nil).ReadOffset), t) +} + +// ReadOffsets mocks base method. +func (m *MockConnIface) ReadOffsets() (int64, int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReadOffsets") + ret0, _ := ret[0].(int64) + ret1, _ := ret[1].(int64) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// ReadOffsets indicates an expected call of ReadOffsets. +func (mr *MockConnIfaceMockRecorder) ReadOffsets() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadOffsets", reflect.TypeOf((*MockConnIface)(nil).ReadOffsets)) +} + +// ReadPartitions mocks base method. +func (m *MockConnIface) ReadPartitions(topics ...string) ([]kafka_go.Partition, error) { + m.ctrl.T.Helper() + varargs := []any{} + for _, a := range topics { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ReadPartitions", varargs...) + ret0, _ := ret[0].([]kafka_go.Partition) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ReadPartitions indicates an expected call of ReadPartitions. +func (mr *MockConnIfaceMockRecorder) ReadPartitions(topics ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadPartitions", reflect.TypeOf((*MockConnIface)(nil).ReadPartitions), topics...) +} + +// RemoteAddr mocks base method. +func (m *MockConnIface) RemoteAddr() net.Addr { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RemoteAddr") + ret0, _ := ret[0].(net.Addr) + return ret0 +} + +// RemoteAddr indicates an expected call of RemoteAddr. +func (mr *MockConnIfaceMockRecorder) RemoteAddr() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoteAddr", reflect.TypeOf((*MockConnIface)(nil).RemoteAddr)) +} + +// Seek mocks base method. +func (m *MockConnIface) Seek(offset int64, whence int) (int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Seek", offset, whence) + ret0, _ := ret[0].(int64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Seek indicates an expected call of Seek. +func (mr *MockConnIfaceMockRecorder) Seek(offset, whence any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Seek", reflect.TypeOf((*MockConnIface)(nil).Seek), offset, whence) +} + +// SetDeadline mocks base method. +func (m *MockConnIface) SetDeadline(t time.Time) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetDeadline", t) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetDeadline indicates an expected call of SetDeadline. +func (mr *MockConnIfaceMockRecorder) SetDeadline(t any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetDeadline", reflect.TypeOf((*MockConnIface)(nil).SetDeadline), t) +} + +// SetReadDeadline mocks base method. +func (m *MockConnIface) SetReadDeadline(t time.Time) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetReadDeadline", t) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetReadDeadline indicates an expected call of SetReadDeadline. +func (mr *MockConnIfaceMockRecorder) SetReadDeadline(t any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetReadDeadline", reflect.TypeOf((*MockConnIface)(nil).SetReadDeadline), t) +} + +// SetRequiredAcks mocks base method. +func (m *MockConnIface) SetRequiredAcks(n int) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetRequiredAcks", n) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetRequiredAcks indicates an expected call of SetRequiredAcks. +func (mr *MockConnIfaceMockRecorder) SetRequiredAcks(n any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetRequiredAcks", reflect.TypeOf((*MockConnIface)(nil).SetRequiredAcks), n) +} + +// SetWriteDeadline mocks base method. +func (m *MockConnIface) SetWriteDeadline(t time.Time) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetWriteDeadline", t) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetWriteDeadline indicates an expected call of SetWriteDeadline. +func (mr *MockConnIfaceMockRecorder) SetWriteDeadline(t any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetWriteDeadline", reflect.TypeOf((*MockConnIface)(nil).SetWriteDeadline), t) +} + +// Write mocks base method. +func (m *MockConnIface) Write(b []byte) (int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Write", b) + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Write indicates an expected call of Write. +func (mr *MockConnIfaceMockRecorder) Write(b any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Write", reflect.TypeOf((*MockConnIface)(nil).Write), b) +} + +// WriteCompressedMessages mocks base method. +func (m *MockConnIface) WriteCompressedMessages(codec kafka_go.CompressionCodec, msgs ...kafka_go.Message) (int, error) { + m.ctrl.T.Helper() + varargs := []any{codec} + for _, a := range msgs { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "WriteCompressedMessages", varargs...) + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// WriteCompressedMessages indicates an expected call of WriteCompressedMessages. +func (mr *MockConnIfaceMockRecorder) WriteCompressedMessages(codec any, msgs ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{codec}, msgs...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteCompressedMessages", reflect.TypeOf((*MockConnIface)(nil).WriteCompressedMessages), varargs...) +} + +// WriteCompressedMessagesAt mocks base method. +func (m *MockConnIface) WriteCompressedMessagesAt(codec kafka_go.CompressionCodec, msgs ...kafka_go.Message) (int, int32, int64, time.Time, error) { + m.ctrl.T.Helper() + varargs := []any{codec} + for _, a := range msgs { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "WriteCompressedMessagesAt", varargs...) + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(int32) + ret2, _ := ret[2].(int64) + ret3, _ := ret[3].(time.Time) + ret4, _ := ret[4].(error) + return ret0, ret1, ret2, ret3, ret4 +} + +// WriteCompressedMessagesAt indicates an expected call of WriteCompressedMessagesAt. +func (mr *MockConnIfaceMockRecorder) WriteCompressedMessagesAt(codec any, msgs ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{codec}, msgs...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteCompressedMessagesAt", reflect.TypeOf((*MockConnIface)(nil).WriteCompressedMessagesAt), varargs...) +} + +// WriteMessages mocks base method. +func (m *MockConnIface) WriteMessages(msgs ...kafka_go.Message) (int, error) { + m.ctrl.T.Helper() + varargs := []any{} + for _, a := range msgs { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "WriteMessages", varargs...) + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// WriteMessages indicates an expected call of WriteMessages. +func (mr *MockConnIfaceMockRecorder) WriteMessages(msgs ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteMessages", reflect.TypeOf((*MockConnIface)(nil).WriteMessages), msgs...) +}