Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink: replace Kafka client #1001

Open
wants to merge 32 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 17 additions & 25 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,30 +37,11 @@ GITHASH := $(shell git rev-parse HEAD)
GITBRANCH := $(shell git rev-parse --abbrev-ref HEAD)
GOVERSION := $(shell go version)

# Since TiDB added a new dependency on github.com/cloudfoundry/gosigar,
# we need to add CGO_ENABLED=1 to make it work when building TiCDC on Darwin.
# This logic checks whether the OS is Darwin and, if so, adds CGO_ENABLED=1.
# ref: https://github.com/cloudfoundry/gosigar/issues/58#issuecomment-1150925711
# ref: https://github.com/pingcap/tidb/pull/39526#issuecomment-1407952955
OS := "$(shell go env GOOS)"
SED_IN_PLACE ?= $(shell which sed)
IS_ALPINE := $(shell if [ -f /etc/os-release ]; then grep -qi Alpine /etc/os-release && echo 1; else echo 0; fi)
ifeq (${OS}, "linux")
CGO := 0
SED_IN_PLACE += -i
else ifeq (${OS}, "darwin")
CGO := 1
SED_IN_PLACE += -i ''
endif

GOTEST := CGO_ENABLED=1 $(GO) test -p 3 --race --tags=intest

BUILD_FLAG =
GOEXPERIMENT=
ifeq ("${ENABLE_FIPS}", "1")
BUILD_FLAG = -tags boringcrypto
GOEXPERIMENT = GOEXPERIMENT=boringcrypto
CGO = 1
endif

RELEASE_VERSION =
Expand All @@ -85,12 +66,23 @@ LDFLAGS += -X "$(TIFLOW_CDC_PKG)/pkg/version.GitHash=$(GITHASH)"
LDFLAGS += -X "$(TIFLOW_CDC_PKG)/pkg/version.GitBranch=$(GITBRANCH)"
LDFLAGS += -X "$(TIFLOW_CDC_PKG)/pkg/version.BuildTS=$(BUILDTS)"

CONSUMER_BUILD_FLAG=
ifeq ("${IS_ALPINE}", "1")
CONSUMER_BUILD_FLAG = -tags musl
OS := "$(shell go env GOOS)"
SED_IN_PLACE ?= $(shell which sed)
IS_ALPINE := $(shell if [ -f /etc/os-release ]; then grep -qi Alpine /etc/os-release && echo 1; else echo 0; fi)
ifeq (${OS}, "linux")
SED_IN_PLACE += -i
ifeq ($(static), true)
BUILD_FLAG += -tags musl
LDFLAGS += -linkmode external -extldflags "-static"
else ifeq ("${IS_ALPINE}", "1")
BUILD_FLAG += -tags musl
endif
else ifeq (${OS}, "darwin")
SED_IN_PLACE += -i ''
endif
GOBUILD := $(GOEXPERIMENT) CGO_ENABLED=$(CGO) $(GO) build $(BUILD_FLAG) -trimpath $(GOVENDORFLAG)
CONSUMER_GOBUILD := $(GOEXPERIMENT) CGO_ENABLED=1 $(GO) build $(CONSUMER_BUILD_FLAG) -trimpath $(GOVENDORFLAG)

GOBUILD := $(GOEXPERIMENT) CGO_ENABLED=1 $(GO) build $(BUILD_FLAG) -trimpath $(GOVENDORFLAG)
GOTEST := CGO_ENABLED=1 $(GO) test -p 3 --race --tags=intest

PACKAGE_LIST := go list ./... | grep -vE 'vendor|proto|ticdc/tests|integration|testing_utils|pb|pbmock|ticdc/bin'
PACKAGES := $$($(PACKAGE_LIST))
Expand Down Expand Up @@ -132,7 +124,7 @@ cdc:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./cmd/cdc

kafka_consumer:
$(CONSUMER_GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_kafka_consumer ./cmd/kafka-consumer
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_kafka_consumer ./cmd/kafka-consumer

storage_consumer:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_storage_consumer ./cmd/storage-consumer/main.go
Expand Down
19 changes: 16 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,30 @@ TiCDC can be built on the following operating systems:
* Linux
* MacOS

Install GoLang 1.23.2

Install GoLang 1.23.2

For a static build on Linux, you need to install the following dependencies:
* [musl libc](https://musl.cc/#binaries)
* zlib
* openssl
* zstd
* curl
* sasl

```bash
# Linux
wget https://go.dev/dl/go1.23.2.linux-amd64.tar.gz
sudo tar -C /usr/local -xzf go1.23.2.linux-amd64.tar.gz
# static build on Linux
# wget https://musl.cc/${arch}-${os}-musl-native.tgz
# sudo tar -C /usr/local -xzf ${arch}-${os}-musl-native.tgz
# export CC=/usr/local/${arch}-${os}-musl-native/bin/${arch}-${os}-musl-gcc

# MacOS
curl -O https://go.dev/dl/go1.23.2.darwin-amd64.tar.gz
sudo tar -C /usr/local -xzf go1.23.2.darwin-amd64.tar.gz


export PATH=$PATH:/usr/local/go/bin
export GOPATH=$HOME/go
export PATH=$PATH:$GOPATH/bin
Expand All @@ -78,8 +90,9 @@ cd ticdc

2. Build TiCDC

If the build is not supported on your platform or if you need to replicate to a Kafka cluster with GSSAPI support, you should use a static build.
```bash
make cdc
make cdc # or `make cdc static=true`

# Generate the patchable tar file
cd bin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,21 @@ import (
func TestCreateTopic(t *testing.T) {
t.Parallel()

adminClient := kafka.NewClusterAdminClientMockImpl()
options := kafka.NewOptions()
changefeed := common.NewChangefeedID4Test("test", "test")
factory, err := kafka.NewMockFactory(options, changefeed)
require.NoError(t, err)

adminClient, err := factory.AdminClient()
require.NoError(t, err)
defer adminClient.Close()
cfg := &kafka.AutoCreateTopicConfig{
AutoCreate: true,
PartitionNum: 2,
ReplicationFactor: 1,
}

changefeedID := common.NewChangefeedID4Test("test", "test")
ctx := context.Background()
changefeedID := common.NewChangefeedID4Test("test", "test")
manager := newKafkaTopicManager(ctx, kafka.DefaultMockTopicName, changefeedID, adminClient, cfg)
defer manager.Close()
partitionNum, err := manager.CreateTopicAndWaitUntilVisible(ctx, kafka.DefaultMockTopicName)
Expand Down Expand Up @@ -80,23 +85,27 @@ func TestCreateTopic(t *testing.T) {
func TestCreateTopicWithDelay(t *testing.T) {
t.Parallel()

adminClient := kafka.NewClusterAdminClientMockImpl()
options := kafka.NewOptions()
changefeed := common.NewChangefeedID4Test("test", "test")
factory, err := kafka.NewMockFactory(options, changefeed)
require.NoError(t, err)

adminClient, err := factory.AdminClient()
require.NoError(t, err)
defer adminClient.Close()
cfg := &kafka.AutoCreateTopicConfig{
AutoCreate: true,
PartitionNum: 2,
ReplicationFactor: 1,
}

ctx := context.Background()
topic := "new_topic"
changefeedID := common.NewChangefeedID4Test("test", "test")
ctx := context.Background()
manager := newKafkaTopicManager(ctx, topic, changefeedID, adminClient, cfg)
defer manager.Close()
partitionNum, err := manager.createTopic(ctx, topic)
require.NoError(t, err)
err = adminClient.SetRemainingFetchesUntilTopicVisible(topic, 3)
require.NoError(t, err)
err = manager.waitUntilTopicVisible(ctx, topic)
require.NoError(t, err)
require.Equal(t, int32(2), partitionNum)
Expand Down
11 changes: 6 additions & 5 deletions downstreamadapter/sink/kafka_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func newKafkaSink(
}()

statistics := metrics.NewStatistics(changefeedID, "KafkaSink")
asyncProducer, err := kafkaComponent.Factory.AsyncProducer(ctx)
asyncProducer, err := kafkaComponent.Factory.AsyncProducer()
if err != nil {
return nil, errors.WrapError(errors.ErrKafkaNewProducer, err)
}
Expand Down Expand Up @@ -121,8 +121,8 @@ func newKafkaSink(
adminClient: kafkaComponent.AdminClient,
topicManager: kafkaComponent.TopicManager,
statistics: statistics,
metricsCollector: kafkaComponent.Factory.MetricsCollector(),
ctx: ctx,
metricsCollector: kafkaComponent.Factory.MetricsCollector(kafkaComponent.AdminClient),
}
return sink, nil
}
Expand Down Expand Up @@ -203,10 +203,11 @@ func newKafkaSinkForTest() (*KafkaSink, producer.DMLProducer, producer.DDLProduc
ctx := context.Background()
changefeedID := common.NewChangefeedID4Test("test", "test")
openProtocol := "open-protocol"
sinkConfig := &config.SinkConfig{Protocol: &openProtocol}
sinkConfig := config.GetDefaultReplicaConfig().Clone().Sink
sinkConfig.Protocol = &openProtocol
uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" +
"&max-message-bytes=1048576&partition-num=1" +
"&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=open-protocol"
"&kafka-client-id=unit-test&auto-create-topic=true&compression=gzip&protocol=open-protocol"
uri := fmt.Sprintf(uriTemplate, "127.0.0.1:9092", kafka.DefaultMockTopicName)

sinkURI, err := url.Parse(uri)
Expand Down Expand Up @@ -256,7 +257,7 @@ func newKafkaSinkForTest() (*KafkaSink, producer.DMLProducer, producer.DDLProduc
adminClient: kafkaComponent.AdminClient,
topicManager: kafkaComponent.TopicManager,
statistics: statistics,
metricsCollector: kafkaComponent.Factory.MetricsCollector(kafkaComponent.AdminClient),
metricsCollector: kafkaComponent.Factory.MetricsCollector(),
}
go sink.Run(ctx)
return sink, dmlMockProducer, ddlMockProducer, nil
Expand Down
12 changes: 5 additions & 7 deletions downstreamadapter/worker/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/ticdc/pkg/sink/codec"
"github.com/pingcap/ticdc/pkg/sink/codec/common"
"github.com/pingcap/ticdc/pkg/sink/kafka"
v2 "github.com/pingcap/ticdc/pkg/sink/kafka/v2"
"github.com/pingcap/ticdc/pkg/sink/util"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tiflow/pkg/sink"
Expand Down Expand Up @@ -60,7 +59,7 @@ func getKafkaSinkComponentWithFactory(ctx context.Context,
return kafkaComponent, protocol, errors.WrapError(errors.ErrKafkaInvalidConfig, err)
}

kafkaComponent.Factory, err = factoryCreator(ctx, options, changefeedID)
kafkaComponent.Factory, err = factoryCreator(options, changefeedID)
if err != nil {
return kafkaComponent, protocol, errors.WrapError(errors.ErrKafkaNewProducer, err)
}
Expand Down Expand Up @@ -94,7 +93,9 @@ func getKafkaSinkComponentWithFactory(ctx context.Context,
options.DeriveTopicConfig(),
kafkaComponent.AdminClient,
)

if err != nil {
return kafkaComponent, protocol, errors.Trace(err)
}
scheme := sink.GetScheme(sinkURI)
kafkaComponent.EventRouter, err = eventrouter.NewEventRouter(sinkConfig, protocol, topic, scheme)
if err != nil {
Expand Down Expand Up @@ -129,10 +130,7 @@ func GetKafkaSinkComponent(
sinkURI *url.URL,
sinkConfig *config.SinkConfig,
) (KafkaComponent, config.Protocol, error) {
factoryCreator := kafka.NewSaramaFactory
if utils.GetOrZero(sinkConfig.EnableKafkaSinkV2) {
factoryCreator = v2.NewFactory
}
factoryCreator := kafka.NewFactory
return getKafkaSinkComponentWithFactory(ctx, changefeedID, sinkURI, sinkConfig, factoryCreator)
}

Expand Down
5 changes: 3 additions & 2 deletions downstreamadapter/worker/kafka_ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ func kafkaDDLWorkerForTest(t *testing.T) *KafkaDDLWorker {
ctx := context.Background()
changefeedID := common.NewChangefeedID4Test("test", "test")
openProtocol := "open-protocol"
sinkConfig := &config.SinkConfig{Protocol: &openProtocol}
sinkConfig := config.GetDefaultReplicaConfig().Clone().Sink
sinkConfig.Protocol = &openProtocol
uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" +
"&max-message-bytes=1048576&partition-num=1" +
"&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=open-protocol"
"&kafka-client-id=unit-test&auto-create-topic=true&compression=gzip&protocol=open-protocol"
uri := fmt.Sprintf(uriTemplate, "127.0.0.1:9092", kafka.DefaultMockTopicName)

sinkURI, err := url.Parse(uri)
Expand Down
5 changes: 3 additions & 2 deletions downstreamadapter/worker/kafka_dml_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ func kafkaDMLWorkerForTest(t *testing.T) *KafkaDMLWorker {
ctx := context.Background()
changefeedID := common.NewChangefeedID4Test("test", "test")
openProtocol := "open-protocol"
sinkConfig := &config.SinkConfig{Protocol: &openProtocol}
sinkConfig := config.GetDefaultReplicaConfig().Clone().Sink
sinkConfig.Protocol = &openProtocol
uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" +
"&max-message-bytes=1048576&partition-num=1" +
"&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=open-protocol"
"&kafka-client-id=unit-test&auto-create-topic=true&compression=gzip&protocol=open-protocol"
uri := fmt.Sprintf(uriTemplate, "127.0.0.1:9092", kafka.DefaultMockTopicName)

sinkURI, err := url.Parse(uri)
Expand Down
27 changes: 10 additions & 17 deletions downstreamadapter/worker/producer/kafka_ddl_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ package producer

import (
"context"
"fmt"
"testing"
"time"

"github.com/IBM/sarama"
confluentKafka "github.com/confluentinc/confluent-kafka-go/v2/kafka"
commonType "github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/errors"
cerror "github.com/pingcap/ticdc/pkg/errors"
Expand All @@ -33,18 +34,14 @@ func TestDDLSyncBroadcastMessage(t *testing.T) {
options.MaxMessages = 1

changefeed := commonType.NewChangefeedID4Test("test", "test")
factory, err := kafka.NewMockFactory(ctx, options, changefeed)
factory, err := kafka.NewMockFactory(options, changefeed)
require.NoError(t, err)
factory.(*kafka.MockFactory).ErrorReporter = t

syncProducer, err := factory.SyncProducer()
require.NoError(t, err)

p := NewKafkaDDLProducer(ctx, changefeed, syncProducer)

for i := 0; i < kafka.DefaultMockPartitionNum; i++ {
syncProducer.(*kafka.MockSaramaSyncProducer).Producer.ExpectSendMessageAndSucceed()
}
err = p.SyncBroadcastMessage(ctx, kafka.DefaultMockTopicName,
kafka.DefaultMockPartitionNum, &common.Message{})
require.NoError(t, err)
Expand All @@ -61,16 +58,14 @@ func TestDDLSyncSendMessage(t *testing.T) {
options := getOptions()

changefeed := commonType.NewChangefeedID4Test("test", "test")
factory, err := kafka.NewMockFactory(ctx, options, changefeed)
factory, err := kafka.NewMockFactory(options, changefeed)
require.NoError(t, err)
factory.(*kafka.MockFactory).ErrorReporter = t

syncProducer, err := factory.SyncProducer()
require.NoError(t, err)

p := NewKafkaDDLProducer(ctx, changefeed, syncProducer)

syncProducer.(*kafka.MockSaramaSyncProducer).Producer.ExpectSendMessageAndSucceed()
err = p.SyncSendMessage(ctx, kafka.DefaultMockTopicName, 0, &common.Message{})
require.NoError(t, err)

Expand All @@ -85,25 +80,24 @@ func TestDDLProducerSendMsgFailed(t *testing.T) {
defer cancel()
options := getOptions()
options.MaxMessages = 1
options.MaxMessageBytes = 1
options.MaxMessageBytes = 1000

ctx = context.WithValue(ctx, "testing.T", t)

// This will make the first send failed.
changefeed := commonType.NewChangefeedID4Test("test", "test")
factory, err := kafka.NewMockFactory(ctx, options, changefeed)
factory, err := kafka.NewMockFactory(options, changefeed)
require.NoError(t, err)
factory.(*kafka.MockFactory).ErrorReporter = t

syncProducer, err := factory.SyncProducer()
require.NoError(t, err)

p := NewKafkaDDLProducer(ctx, changefeed, syncProducer)
defer p.Close()

syncProducer.(*kafka.MockSaramaSyncProducer).Producer.ExpectSendMessageAndFail(sarama.ErrMessageTooLarge)
err = p.SyncSendMessage(ctx, kafka.DefaultMockTopicName, 0, &common.Message{})
require.ErrorIs(t, err, sarama.ErrMessageTooLarge)
err = p.SyncSendMessage(ctx, kafka.DefaultMockTopicName, 0, &common.Message{Value: make([]byte, 1000)})
fmt.Println(err)
require.ErrorIs(t, err, confluentKafka.NewError(confluentKafka.ErrMsgSizeTooLarge, "", false))
}

func TestDDLProducerDoubleClose(t *testing.T) {
Expand All @@ -112,9 +106,8 @@ func TestDDLProducerDoubleClose(t *testing.T) {
options := getOptions()

changefeed := commonType.NewChangefeedID4Test("test", "test")
factory, err := kafka.NewMockFactory(ctx, options, changefeed)
factory, err := kafka.NewMockFactory(options, changefeed)
require.NoError(t, err)
factory.(*kafka.MockFactory).ErrorReporter = t

syncProducer, err := factory.SyncProducer()
require.NoError(t, err)
Expand Down
Loading
Loading