diff --git a/Makefile b/Makefile index de97b01a9..2b11f4537 100644 --- a/Makefile +++ b/Makefile @@ -113,7 +113,7 @@ P=3 # The following packages are used in unit tests. # Add new packages here if you want to include them in unit tests. -UT_PACKAGES_DISPATCHER := ./pkg/sink/cloudstorage/... ./pkg/sink/mysql/... ./pkg/sink/util/... ./downstreamadapter/sink/... ./downstreamadapter/dispatcher/... ./downstreamadapter/worker/... ./downstreamadapter/worker/writer/... ./pkg/sink/codec/avro/... ./pkg/sink/codec/open/... ./pkg/sink/codec/csv/... ./pkg/sink/codec/canal/... ./pkg/sink/codec/debezium/... ./pkg/sink/codec/simple/... +UT_PACKAGES_DISPATCHER := ./pkg/sink/cloudstorage/... ./pkg/sink/mysql/... ./pkg/sink/util/... ./downstreamadapter/sink/... ./downstreamadapter/dispatcher/... ./pkg/sink/codec/avro/... ./pkg/sink/codec/open/... ./pkg/sink/codec/csv/... ./pkg/sink/codec/canal/... ./pkg/sink/codec/debezium/... ./pkg/sink/codec/simple/... UT_PACKAGES_MAINTAINER := ./maintainer/... UT_PACKAGES_COORDINATOR := ./coordinator/... UT_PACKAGES_LOGSERVICE := ./logservice/... diff --git a/downstreamadapter/sink/blackhole.go b/downstreamadapter/sink/blackhole/sink.go similarity index 63% rename from downstreamadapter/sink/blackhole.go rename to downstreamadapter/sink/blackhole/sink.go index e27067b88..6c7bfe553 100644 --- a/downstreamadapter/sink/blackhole.go +++ b/downstreamadapter/sink/blackhole/sink.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package sink +package blackhole import ( "context" @@ -23,27 +23,26 @@ import ( "go.uber.org/zap" ) -// BlackHoleSink is responsible for writing data to blackhole. +// sink is responsible for writing data to blackhole. // Including DDL and DML. -type BlackHoleSink struct{} +type sink struct{} -func newBlackHoleSink() (*BlackHoleSink, error) { - blackholeSink := BlackHoleSink{} - return &blackholeSink, nil +func New() (*sink, error) { + return &sink{}, nil } -func (s *BlackHoleSink) IsNormal() bool { +func (s *sink) IsNormal() bool { return true } -func (s *BlackHoleSink) SinkType() common.SinkType { +func (s *sink) SinkType() common.SinkType { return common.BlackHoleSinkType } -func (s *BlackHoleSink) SetTableSchemaStore(tableSchemaStore *util.TableSchemaStore) { +func (s *sink) SetTableSchemaStore(tableSchemaStore *util.TableSchemaStore) { } -func (s *BlackHoleSink) AddDMLEvent(event *commonEvent.DMLEvent) { +func (s *sink) AddDMLEvent(event *commonEvent.DMLEvent) { // NOTE: don't change the log, integration test `lossy_ddl` depends on it. // ref: https://github.com/pingcap/ticdc/blob/da834db76e0662ff15ef12645d1f37bfa6506d83/tests/integration_tests/lossy_ddl/run.sh#L23 log.Debug("BlackHoleSink: WriteEvents", zap.Any("dml", event)) @@ -52,37 +51,31 @@ func (s *BlackHoleSink) AddDMLEvent(event *commonEvent.DMLEvent) { } } -func (s *BlackHoleSink) WriteBlockEvent(event commonEvent.BlockEvent) error { +func (s *sink) WriteBlockEvent(event commonEvent.BlockEvent) error { switch event.GetType() { case commonEvent.TypeDDLEvent: e := event.(*commonEvent.DDLEvent) // NOTE: don't change the log, integration test `lossy_ddl` depends on it. 
// ref: https://github.com/pingcap/ticdc/blob/da834db76e0662ff15ef12645d1f37bfa6506d83/tests/integration_tests/lossy_ddl/run.sh#L17 log.Debug("BlackHoleSink: DDL Event", zap.Any("ddl", e)) - for _, callback := range e.PostTxnFlushed { - callback() - } case commonEvent.TypeSyncPointEvent: - e := event.(*commonEvent.SyncPointEvent) - for _, callback := range e.PostTxnFlushed { - callback() - } default: log.Error("unknown event type", zap.Any("event", event)) } + event.PostFlush() return nil } -func (s *BlackHoleSink) AddCheckpointTs(_ uint64) { +func (s *sink) AddCheckpointTs(_ uint64) { } -func (s *BlackHoleSink) GetStartTsList(_ []int64, startTsList []int64, _ bool) ([]int64, []bool, error) { +func (s *sink) GetStartTsList(_ []int64, startTsList []int64, _ bool) ([]int64, []bool, error) { return startTsList, make([]bool, len(startTsList)), nil } -func (s *BlackHoleSink) Close(_ bool) {} +func (s *sink) Close(_ bool) {} -func (s *BlackHoleSink) Run(_ context.Context) error { +func (s *sink) Run(_ context.Context) error { return nil } diff --git a/downstreamadapter/sink/blackhole_test.go b/downstreamadapter/sink/blackhole/sink_test.go similarity index 97% rename from downstreamadapter/sink/blackhole_test.go rename to downstreamadapter/sink/blackhole/sink_test.go index 82e76c820..0c1db635c 100644 --- a/downstreamadapter/sink/blackhole_test.go +++ b/downstreamadapter/sink/blackhole/sink_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package sink +package blackhole import ( "testing" @@ -23,7 +23,7 @@ import ( // Test callback and tableProgress works as expected after AddDMLEvent func TestBlacHoleSinkBasicFunctionality(t *testing.T) { - sink, err := newBlackHoleSink() + sink, err := New() require.NoError(t, err) count := 0 diff --git a/downstreamadapter/sink/cloudstorage.go b/downstreamadapter/sink/cloudstorage.go deleted file mode 100644 index 42bc6f7f9..000000000 --- a/downstreamadapter/sink/cloudstorage.go +++ /dev/null @@ -1,198 +0,0 @@ -// Copyright 2025 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package sink - -import ( - "context" - "math" - "net/url" - "strings" - "sync/atomic" - - "github.com/pingcap/errors" - "github.com/pingcap/log" - "github.com/pingcap/ticdc/downstreamadapter/sink/helper" - "github.com/pingcap/ticdc/downstreamadapter/worker" - "github.com/pingcap/ticdc/pkg/common" - commonType "github.com/pingcap/ticdc/pkg/common" - commonEvent "github.com/pingcap/ticdc/pkg/common/event" - "github.com/pingcap/ticdc/pkg/config" - "github.com/pingcap/ticdc/pkg/metrics" - "github.com/pingcap/ticdc/pkg/sink/cloudstorage" - "github.com/pingcap/ticdc/pkg/sink/util" - putil "github.com/pingcap/ticdc/pkg/util" - "github.com/pingcap/tidb/br/pkg/storage" - "go.uber.org/zap" - "golang.org/x/sync/errgroup" -) - -// It will send the events to cloud storage systems. -// Messages are encoded in the specific protocol and then sent to the defragmenter. 
-// The data flow is as follows: **data** -> encodingWorkers -> defragmenter -> dmlWorkers -> external storage -// The defragmenter will defragment the out-of-order encoded messages and sends encoded -// messages to individual dmlWorkers. -// The dmlWorkers will write the encoded messages to external storage in parallel between different tables. -type CloudStorageSink struct { - changefeedID commonType.ChangeFeedID - scheme string - outputRawChangeEvent bool - - // workers defines a group of workers for writing events to external storage. - dmlWorker *worker.CloudStorageDMLWorker - ddlWorker *worker.CloudStorageDDLWorker - - statistics *metrics.Statistics - - isNormal uint32 -} - -func verifyCloudStorageSink(ctx context.Context, changefeedID common.ChangeFeedID, sinkURI *url.URL, sinkConfig *config.SinkConfig) error { - var ( - protocol config.Protocol - storage storage.ExternalStorage - err error - ) - cfg := cloudstorage.NewConfig() - if err = cfg.Apply(ctx, sinkURI, sinkConfig); err != nil { - return err - } - if protocol, err = helper.GetProtocol(putil.GetOrZero(sinkConfig.Protocol)); err != nil { - return err - } - if _, err = util.GetEncoderConfig(changefeedID, sinkURI, protocol, sinkConfig, math.MaxInt); err != nil { - return err - } - if storage, err = putil.GetExternalStorageWithDefaultTimeout(ctx, sinkURI.String()); err != nil { - return err - } - storage.Close() - return nil -} - -func newCloudStorageSink( - ctx context.Context, changefeedID common.ChangeFeedID, sinkURI *url.URL, sinkConfig *config.SinkConfig, - cleanupJobs []func(), /* only for test */ -) (*CloudStorageSink, error) { - // create cloud storage config and then apply the params of sinkURI to it. - cfg := cloudstorage.NewConfig() - err := cfg.Apply(ctx, sinkURI, sinkConfig) - if err != nil { - return nil, err - } - // fetch protocol from replicaConfig defined by changefeed config file. - protocol, err := helper.GetProtocol( - putil.GetOrZero(sinkConfig.Protocol), - ) - if err != nil { - return nil, errors.Trace(err) - } - // get cloud storage file extension according to the specific protocol. - ext := helper.GetFileExtension(protocol) - // the last param maxMsgBytes is mainly to limit the size of a single message for - // batch protocols in mq scenario. In cloud storage sink, we just set it to max int. 
- encoderConfig, err := util.GetEncoderConfig(changefeedID, sinkURI, protocol, sinkConfig, math.MaxInt) - if err != nil { - return nil, errors.Trace(err) - } - storage, err := putil.GetExternalStorageWithDefaultTimeout(ctx, sinkURI.String()) - if err != nil { - return nil, err - } - s := &CloudStorageSink{ - changefeedID: changefeedID, - scheme: strings.ToLower(sinkURI.Scheme), - outputRawChangeEvent: sinkConfig.CloudStorageConfig.GetOutputRawChangeEvent(), - statistics: metrics.NewStatistics(changefeedID, "CloudStorageSink"), - } - - s.dmlWorker, err = worker.NewCloudStorageDMLWorker(changefeedID, storage, cfg, encoderConfig, ext, s.statistics) - if err != nil { - return nil, err - } - s.ddlWorker = worker.NewCloudStorageDDLWorker(changefeedID, sinkURI, cfg, cleanupJobs, storage, s.statistics) - return s, nil -} - -func (s *CloudStorageSink) SinkType() common.SinkType { - return common.CloudStorageSinkType -} - -func (s *CloudStorageSink) Run(ctx context.Context) error { - eg, ctx := errgroup.WithContext(ctx) - - eg.Go(func() error { - return s.dmlWorker.Run(ctx) - }) - - eg.Go(func() error { - return s.ddlWorker.Run(ctx) - }) - - return eg.Wait() -} - -func (s *CloudStorageSink) IsNormal() bool { - return atomic.LoadUint32(&s.isNormal) == 1 -} - -func (s *CloudStorageSink) AddDMLEvent(event *commonEvent.DMLEvent) { - s.dmlWorker.AddDMLEvent(event) -} - -func (s *CloudStorageSink) WriteBlockEvent(event commonEvent.BlockEvent) error { - switch e := event.(type) { - case *commonEvent.DDLEvent: - if e.TiDBOnly { - // run callback directly and return - e.PostFlush() - return nil - } - err := s.ddlWorker.WriteBlockEvent(e) - if err != nil { - atomic.StoreUint32(&s.isNormal, 0) - return errors.Trace(err) - } - case *commonEvent.SyncPointEvent: - log.Error("CloudStorageSink doesn't support Sync Point Event", - zap.String("namespace", s.changefeedID.Namespace()), - zap.String("changefeed", s.changefeedID.Name()), - zap.Any("event", event)) - default: - log.Error("CloudStorageSink doesn't support this type of block event", - zap.String("namespace", s.changefeedID.Namespace()), - zap.String("changefeed", s.changefeedID.Name()), - zap.Any("eventType", event.GetType())) - } - return nil -} - -func (s *CloudStorageSink) AddCheckpointTs(ts uint64) { - s.ddlWorker.AddCheckpointTs(ts) -} - -func (s *CloudStorageSink) SetTableSchemaStore(tableSchemaStore *util.TableSchemaStore) { - s.ddlWorker.SetTableSchemaStore(tableSchemaStore) -} - -func (s *CloudStorageSink) GetStartTsList(_ []int64, startTsList []int64, _ bool) ([]int64, []bool, error) { - return startTsList, make([]bool, len(startTsList)), nil -} - -func (s *CloudStorageSink) Close(_ bool) { - s.dmlWorker.Close() - s.ddlWorker.Close() - if s.statistics != nil { - s.statistics.Close() - } -} diff --git a/downstreamadapter/worker/writer/defragmenter.go b/downstreamadapter/sink/cloudstorage/defragmenter.go similarity index 75% rename from downstreamadapter/worker/writer/defragmenter.go rename to downstreamadapter/sink/cloudstorage/defragmenter.go index ede568a8d..618140dcd 100644 --- a/downstreamadapter/worker/writer/defragmenter.go +++ b/downstreamadapter/sink/cloudstorage/defragmenter.go @@ -10,7 +10,8 @@ // distributed under the License is distributed on an "AS IS" BASIS, // See the License for the specific language governing permissions and // limitations under the License. 
-package writer + +package cloudstorage import ( "context" @@ -23,8 +24,8 @@ import ( "github.com/pingcap/tiflow/pkg/hash" ) -// EventFragment is used to attach a sequence number to TxnCallbackableEvent. -type EventFragment struct { +// eventFragment is used to attach a sequence number to TxnCallbackableEvent. +type eventFragment struct { event *commonEvent.DMLEvent versionedTable cloudstorage.VersionedTableName @@ -32,43 +33,43 @@ type EventFragment struct { // e.g. TxnCallbackableEvent 1~5 are dispatched to a group of encoding workers, but the // encoding completion time varies. Let's say the final completion sequence are 1,3,2,5,4, // we can use the sequence numbers to do defragmentation so that the events can arrive - // at dmlWorker sequentially. + // at dmlWriters sequentially. seqNumber uint64 // encodedMsgs denote the encoded messages after the event is handled in encodingWorker. encodedMsgs []*common.Message } -func NewEventFragment(seq uint64, version cloudstorage.VersionedTableName, event *commonEvent.DMLEvent) EventFragment { - return EventFragment{ +func newEventFragment(seq uint64, version cloudstorage.VersionedTableName, event *commonEvent.DMLEvent) eventFragment { + return eventFragment{ seqNumber: seq, versionedTable: version, event: event, } } -// Defragmenter is used to handle event fragments which can be registered +// defragmenter is used to handle event fragments which can be registered // out of order. -type Defragmenter struct { +type defragmenter struct { lastDispatchedSeq uint64 - future map[uint64]EventFragment - inputCh <-chan EventFragment - outputChs []*chann.DrainableChann[EventFragment] + future map[uint64]eventFragment + inputCh <-chan eventFragment + outputChs []*chann.DrainableChann[eventFragment] hasher *hash.PositionInertia } -func NewDefragmenter( - inputCh <-chan EventFragment, - outputChs []*chann.DrainableChann[EventFragment], -) *Defragmenter { - return &Defragmenter{ - future: make(map[uint64]EventFragment), +func newDefragmenter( + inputCh <-chan eventFragment, + outputChs []*chann.DrainableChann[eventFragment], +) *defragmenter { + return &defragmenter{ + future: make(map[uint64]eventFragment), inputCh: inputCh, outputChs: outputChs, hasher: hash.NewPositionInertia(), } } -func (d *Defragmenter) Run(ctx context.Context) error { +func (d *defragmenter) Run(ctx context.Context) error { defer d.close() for { select { @@ -92,9 +93,9 @@ func (d *Defragmenter) Run(ctx context.Context) error { } } -func (d *Defragmenter) writeMsgsConsecutive( +func (d *defragmenter) writeMsgsConsecutive( ctx context.Context, - start EventFragment, + start eventFragment, ) { d.dispatchFragToDMLWorker(start) @@ -115,7 +116,7 @@ func (d *Defragmenter) writeMsgsConsecutive( } } -func (d *Defragmenter) dispatchFragToDMLWorker(frag EventFragment) { +func (d *defragmenter) dispatchFragToDMLWorker(frag eventFragment) { tableName := frag.versionedTable.TableNameWithPhysicTableID d.hasher.Reset() d.hasher.Write([]byte(tableName.Schema), []byte(tableName.Table)) @@ -124,7 +125,7 @@ func (d *Defragmenter) dispatchFragToDMLWorker(frag EventFragment) { d.lastDispatchedSeq = frag.seqNumber } -func (d *Defragmenter) close() { +func (d *defragmenter) close() { for _, ch := range d.outputChs { ch.CloseAndDrain() } diff --git a/downstreamadapter/worker/writer/defragmenter_test.go b/downstreamadapter/sink/cloudstorage/defragmenter_test.go similarity index 94% rename from downstreamadapter/worker/writer/defragmenter_test.go rename to downstreamadapter/sink/cloudstorage/defragmenter_test.go 
index 1c8755588..dadc2964c 100644 --- a/downstreamadapter/worker/writer/defragmenter_test.go +++ b/downstreamadapter/sink/cloudstorage/defragmenter_test.go @@ -10,7 +10,8 @@ // distributed under the License is distributed on an "AS IS" BASIS, // See the License for the specific language governing permissions and // limitations under the License. -package writer + +package cloudstorage import ( "context" @@ -42,9 +43,9 @@ func TestDeframenter(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) eg, egCtx := errgroup.WithContext(ctx) - inputCh := make(chan EventFragment) - outputCh := chann.NewAutoDrainChann[EventFragment]() - defrag := NewDefragmenter(inputCh, []*chann.DrainableChann[EventFragment]{outputCh}) + inputCh := make(chan eventFragment) + outputCh := chann.NewAutoDrainChann[eventFragment]() + defrag := newDefragmenter(inputCh, []*chann.DrainableChann[eventFragment]{outputCh}) eg.Go(func() error { return defrag.Run(egCtx) }) @@ -80,7 +81,7 @@ func TestDeframenter(t *testing.T) { tableInfo := common.WrapTableInfo("test", tidbTableInfo) for i := 0; i < txnCnt; i++ { go func(seq uint64) { - frag := EventFragment{ + frag := eventFragment{ versionedTable: cloudstorage.VersionedTableName{ TableNameWithPhysicTableID: common.TableName{ Schema: "test", diff --git a/downstreamadapter/sink/cloudstorage/dml_writers.go b/downstreamadapter/sink/cloudstorage/dml_writers.go new file mode 100644 index 000000000..9f027a7dc --- /dev/null +++ b/downstreamadapter/sink/cloudstorage/dml_writers.go @@ -0,0 +1,132 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package cloudstorage + +import ( + "context" + "sync/atomic" + + commonType "github.com/pingcap/ticdc/pkg/common" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/metrics" + "github.com/pingcap/ticdc/pkg/sink/cloudstorage" + "github.com/pingcap/ticdc/pkg/sink/codec/common" + "github.com/pingcap/ticdc/utils/chann" + "github.com/pingcap/tidb/br/pkg/storage" + "golang.org/x/sync/errgroup" +) + +// dmlWriters denotes a worker responsible for writing messages to cloud storage. +type dmlWriters struct { + changefeedID commonType.ChangeFeedID + statistics *metrics.Statistics + + // msgCh is a channel to hold eventFragment. + // The caller of WriteEvents will write eventFragment to msgCh and + // the encodingWorkers will read eventFragment from msgCh to encode events. + msgCh *chann.DrainableChann[eventFragment] + encodeGroup *encodingGroup + + // defragmenter is used to defragment the out-of-order encoded messages and + // sends encoded messages to individual dmlWorkers. 
+ defragmenter *defragmenter + + writers []*writer + + // last sequence number + lastSeqNum uint64 +} + +func newDMLWriters( + changefeedID commonType.ChangeFeedID, + storage storage.ExternalStorage, + config *cloudstorage.Config, + encoderConfig *common.Config, + extension string, + statistics *metrics.Statistics, +) *dmlWriters { + messageCh := chann.NewAutoDrainChann[eventFragment]() + encodedOutCh := make(chan eventFragment, defaultChannelSize) + encoderGroup := newEncodingGroup(changefeedID, encoderConfig, defaultEncodingConcurrency, messageCh.Out(), encodedOutCh) + + writers := make([]*writer, config.WorkerCount) + writerInputChs := make([]*chann.DrainableChann[eventFragment], config.WorkerCount) + for i := 0; i < config.WorkerCount; i++ { + inputCh := chann.NewAutoDrainChann[eventFragment]() + writerInputChs[i] = inputCh + writers[i] = newWriter(i, changefeedID, storage, config, extension, inputCh, statistics) + } + + return &dmlWriters{ + changefeedID: changefeedID, + statistics: statistics, + msgCh: messageCh, + + encodeGroup: encoderGroup, + defragmenter: newDefragmenter(encodedOutCh, writerInputChs), + writers: writers, + } +} + +func (d *dmlWriters) Run(ctx context.Context) error { + eg, ctx := errgroup.WithContext(ctx) + + eg.Go(func() error { + return d.encodeGroup.Run(ctx) + }) + + eg.Go(func() error { + return d.defragmenter.Run(ctx) + }) + + for i := 0; i < len(d.writers); i++ { + eg.Go(func() error { + return d.writers[i].Run(ctx) + }) + } + return eg.Wait() +} + +func (d *dmlWriters) AddDMLEvent(event *commonEvent.DMLEvent) { + if event.State != commonEvent.EventSenderStateNormal { + // The table where the event comes from is in stopping, so it's safe + // to drop the event directly. + event.PostFlush() + return + } + + tbl := cloudstorage.VersionedTableName{ + TableNameWithPhysicTableID: commonType.TableName{ + Schema: event.TableInfo.GetSchemaName(), + Table: event.TableInfo.GetTableName(), + TableID: event.PhysicalTableID, + IsPartition: event.TableInfo.IsPartitionTable(), + }, + TableInfoVersion: event.TableInfoVersion, + } + seq := atomic.AddUint64(&d.lastSeqNum, 1) + _ = d.statistics.RecordBatchExecution(func() (int, int64, error) { + // emit a TxnCallbackableEvent encoupled with a sequence number starting from one. + d.msgCh.In() <- newEventFragment(seq, tbl, event) + return int(event.Len()), event.GetRowsSize(), nil + }) +} + +func (d *dmlWriters) close() { + d.msgCh.CloseAndDrain() + d.encodeGroup.close() + for _, w := range d.writers { + w.close() + } +} diff --git a/downstreamadapter/worker/cloudstorage_dml_worker_test.go b/downstreamadapter/sink/cloudstorage/dml_writers_test.go similarity index 72% rename from downstreamadapter/worker/cloudstorage_dml_worker_test.go rename to downstreamadapter/sink/cloudstorage/dml_writers_test.go index 205d35cac..748b2f07f 100644 --- a/downstreamadapter/worker/cloudstorage_dml_worker_test.go +++ b/downstreamadapter/sink/cloudstorage/dml_writers_test.go @@ -10,12 +10,12 @@ // distributed under the License is distributed on an "AS IS" BASIS, // See the License for the specific language governing permissions and // limitations under the License. 
-package worker + +package cloudstorage import ( "context" "fmt" - "math" "net/url" "os" "path" @@ -24,37 +24,25 @@ import ( "time" "github.com/pingcap/failpoint" - "github.com/pingcap/ticdc/downstreamadapter/sink/helper" - "github.com/pingcap/ticdc/pkg/common" appcontext "github.com/pingcap/ticdc/pkg/common/context" commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/config" - "github.com/pingcap/ticdc/pkg/metrics" "github.com/pingcap/ticdc/pkg/pdutil" - "github.com/pingcap/ticdc/pkg/sink/cloudstorage" - "github.com/pingcap/ticdc/pkg/sink/util" putil "github.com/pingcap/ticdc/pkg/util" pclock "github.com/pingcap/tiflow/engine/pkg/clock" - "github.com/pingcap/tiflow/pkg/errors" "github.com/stretchr/testify/require" ) -func setClock(s *CloudStorageDMLWorker, clock pclock.Clock) { - for _, w := range s.writers { - w.SetClock(pdutil.NewMonotonicClock(clock)) - } -} - func getTableFiles(t *testing.T, tableDir string) []string { files, err := os.ReadDir(tableDir) - require.Nil(t, err) + require.NoError(t, err) fileNames := []string{} for _, f := range files { fileName := f.Name() if f.IsDir() { metaFiles, err := os.ReadDir(path.Join(tableDir, f.Name())) - require.Nil(t, err) + require.NoError(t, err) require.Len(t, metaFiles, 1) fileName = metaFiles[0].Name() } @@ -63,59 +51,31 @@ func getTableFiles(t *testing.T, tableDir string) []string { return fileNames } -func newCloudStorageDMLWorkerForTest(parentDir string, flushInterval int, sinkConfig *config.SinkConfig) (*CloudStorageDMLWorker, error) { - ctx := context.Background() - mockPDClock := pdutil.NewClock4Test() - appcontext.SetService(appcontext.DefaultPDClock, mockPDClock) - uri := fmt.Sprintf("file:///%s?protocol=csv&flush-interval=%ds", parentDir, flushInterval) +func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) { + parentDir := t.TempDir() + + uri := fmt.Sprintf("file:///%s?protocol=csv&flush-interval=%ds", parentDir, 2) sinkURI, err := url.Parse(uri) - if err != nil { - return nil, err - } + require.NoError(t, err) + replicaConfig := config.GetDefaultReplicaConfig() err = replicaConfig.ValidateAndAdjust(sinkURI) - if err != nil { - return nil, err - } - changefeedID := common.NewChangefeedID4Test("test", "test") + require.NoError(t, err) - cfg := cloudstorage.NewConfig() - err = cfg.Apply(ctx, sinkURI, sinkConfig) - if err != nil { - return nil, err - } - protocol, err := helper.GetProtocol( - putil.GetOrZero(sinkConfig.Protocol), - ) - if err != nil { - return nil, errors.Trace(err) - } - // get cloud storage file extension according to the specific protocol. - ext := helper.GetFileExtension(protocol) - // the last param maxMsgBytes is mainly to limit the size of a single message for - // batch protocols in mq scenario. In cloud storage sink, we just set it to max int. 
- encoderConfig, err := util.GetEncoderConfig(changefeedID, sinkURI, protocol, sinkConfig, math.MaxInt) - if err != nil { - return nil, errors.Trace(err) - } - storage, err := putil.GetExternalStorageWithDefaultTimeout(ctx, sinkURI.String()) - if err != nil { - return nil, err - } - sink, err := NewCloudStorageDMLWorker(changefeedID, storage, cfg, encoderConfig, ext, metrics.NewStatistics(changefeedID, "CloudStorageSink")) - if err != nil { - return nil, err - } - go sink.Run(ctx) - return sink, nil -} + replicaConfig.Sink.DateSeparator = putil.AddressOf(config.DateSeparatorNone.String()) + replicaConfig.Sink.FileIndexWidth = putil.AddressOf(6) -func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) { - parentDir := t.TempDir() - csvProtocol := "csv" - sinkConfig := &config.SinkConfig{Protocol: &csvProtocol, DateSeparator: putil.AddressOf(config.DateSeparatorNone.String()), FileIndexWidth: putil.AddressOf(6)} - s, err := newCloudStorageDMLWorkerForTest(parentDir, 2, sinkConfig) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockPDClock := pdutil.NewClock4Test() + appcontext.SetService(appcontext.DefaultPDClock, mockPDClock) + + cloudStorageSink, err := newSinkForTest(ctx, replicaConfig, sinkURI, nil) require.NoError(t, err) + + go cloudStorageSink.Run(ctx) + var cnt uint64 = 0 batch := 100 var tableInfoVersion uint64 = 33 @@ -136,23 +96,25 @@ func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) { atomic.AddUint64(&cnt, uint64(len(dmls))) }) event.TableInfoVersion = tableInfoVersion - s.AddDMLEvent(event) + + cloudStorageSink.AddDMLEvent(event) time.Sleep(3 * time.Second) metaDir := path.Join(parentDir, "test/table1/meta") files, err := os.ReadDir(metaDir) - require.Nil(t, err) + require.NoError(t, err) require.Len(t, files, 1) tableDir := path.Join(parentDir, fmt.Sprintf("%s/%s/%d", job.SchemaName, job.TableName, tableInfoVersion)) fileNames := getTableFiles(t, tableDir) require.Len(t, fileNames, 2) require.ElementsMatch(t, []string{"CDC000001.csv", "CDC.index"}, fileNames) + content, err := os.ReadFile(path.Join(tableDir, "CDC000001.csv")) - require.Nil(t, err) + require.NoError(t, err) require.Greater(t, len(content), 0) content, err = os.ReadFile(path.Join(tableDir, "meta/CDC.index")) - require.Nil(t, err) + require.NoError(t, err) require.Equal(t, "CDC000001.csv\n", string(content)) require.Equal(t, uint64(100), atomic.LoadUint64(&cnt)) @@ -161,7 +123,8 @@ func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) { event.AddPostFlushFunc(func() { atomic.AddUint64(&cnt, uint64(len(dmls))) }) - s.AddDMLEvent(event) + + cloudStorageSink.AddDMLEvent(event) time.Sleep(3 * time.Second) fileNames = getTableFiles(t, tableDir) @@ -178,15 +141,36 @@ func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) { require.Equal(t, "CDC000002.csv\n", string(content)) require.Equal(t, uint64(200), atomic.LoadUint64(&cnt)) - s.Close() + cloudStorageSink.Close(false) } func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) { parentDir := t.TempDir() - csvProtocol := "csv" - sinkConfig := &config.SinkConfig{Protocol: &csvProtocol, DateSeparator: putil.AddressOf(config.DateSeparatorDay.String()), FileIndexWidth: putil.AddressOf(6)} - s, err := newCloudStorageDMLWorkerForTest(parentDir, 4, sinkConfig) - require.Nil(t, err) + + uri := fmt.Sprintf("file:///%s?protocol=csv&flush-interval=%ds", parentDir, 4) + sinkURI, err := url.Parse(uri) + require.NoError(t, err) + + replicaConfig := config.GetDefaultReplicaConfig() + err = 
replicaConfig.ValidateAndAdjust(sinkURI) + require.NoError(t, err) + + replicaConfig.Sink.DateSeparator = putil.AddressOf(config.DateSeparatorDay.String()) + replicaConfig.Sink.FileIndexWidth = putil.AddressOf(6) + + mockClock := pclock.NewMock() + mockClock.Set(time.Date(2023, 3, 8, 23, 59, 58, 0, time.UTC)) + clock := pdutil.NewMonotonicClock(mockClock) + appcontext.SetService(appcontext.DefaultPDClock, clock) + + ctx, cancel := context.WithCancel(context.Background()) + cloudStorageSink, err := newSinkForTest(ctx, replicaConfig, sinkURI, nil) + require.NoError(t, err) + + go func() { + err = cloudStorageSink.Run(ctx) + require.ErrorIs(t, err, context.Canceled) + }() var cnt uint64 = 0 batch := 100 @@ -204,15 +188,12 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) { dmls = append(dmls, fmt.Sprintf("insert into table1 values (%d, 'hello world')", j)) } - mockClock := pclock.NewMock() - mockClock.Set(time.Date(2023, 3, 8, 23, 59, 58, 0, time.UTC)) - setClock(s, mockClock) event := helper.DML2Event(job.SchemaName, job.TableName, dmls...) event.AddPostFlushFunc(func() { atomic.AddUint64(&cnt, uint64(len(dmls))) }) event.TableInfoVersion = tableInfoVersion - s.AddDMLEvent(event) + cloudStorageSink.AddDMLEvent(event) time.Sleep(5 * time.Second) tableDir := path.Join(parentDir, fmt.Sprintf("%s/%s/%d/2023-03-08", job.SchemaName, job.TableName, tableInfoVersion)) @@ -228,36 +209,65 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) { require.Equal(t, "CDC000001.csv\n", string(content)) require.Equal(t, uint64(100), atomic.LoadUint64(&cnt)) + cancel() + time.Sleep(5 * time.Second) + // test date (day) is NOT changed. mockClock.Set(time.Date(2023, 3, 8, 23, 59, 59, 0, time.UTC)) - setClock(s, mockClock) + clock = pdutil.NewMonotonicClock(mockClock) + + appcontext.SetService(appcontext.DefaultPDClock, clock) + + ctx, cancel = context.WithCancel(context.Background()) + cloudStorageSink, err = newSinkForTest(ctx, replicaConfig, sinkURI, nil) + require.NoError(t, err) + + go func() { + err = cloudStorageSink.Run(ctx) + require.ErrorIs(t, err, context.Canceled) + }() + event = helper.DML2Event(job.SchemaName, job.TableName, dmls...) event.AddPostFlushFunc(func() { atomic.AddUint64(&cnt, uint64(len(dmls))) }) event.TableInfoVersion = tableInfoVersion - s.AddDMLEvent(event) + cloudStorageSink.AddDMLEvent(event) time.Sleep(5 * time.Second) fileNames = getTableFiles(t, tableDir) require.Len(t, fileNames, 3) require.ElementsMatch(t, []string{"CDC000001.csv", "CDC000002.csv", "CDC.index"}, fileNames) content, err = os.ReadFile(path.Join(tableDir, "CDC000002.csv")) - require.Nil(t, err) + require.NoError(t, err) require.Greater(t, len(content), 0) content, err = os.ReadFile(path.Join(tableDir, "meta/CDC.index")) - require.Nil(t, err) + require.NoError(t, err) require.Equal(t, "CDC000002.csv\n", string(content)) require.Equal(t, uint64(200), atomic.LoadUint64(&cnt)) + cancel() + + time.Sleep(5 * time.Second) // test date (day) is changed. 
mockClock.Set(time.Date(2023, 3, 9, 0, 0, 10, 0, time.UTC)) - setClock(s, mockClock) + clock = pdutil.NewMonotonicClock(mockClock) + + appcontext.SetService(appcontext.DefaultPDClock, clock) - failpoint.Enable("github.com/pingcap/ticdc/downstreamadapter/worker/writer/passTickerOnce", "1*return") + ctx, cancel = context.WithCancel(context.Background()) + cloudStorageSink, err = newSinkForTest(ctx, replicaConfig, sinkURI, nil) + require.NoError(t, err) + + failpoint.Enable("github.com/pingcap/ticdc/downstreamadapter/sink/cloudstorage/passTickerOnce", "1*return") defer func() { - _ = failpoint.Disable("github.com/pingcap/ticdc/downstreamadapter/worker/writer/passTickerOnce") + _ = failpoint.Disable("github.com/pingcap/ticdc/downstreamadapter/sink/cloudstorage/passTickerOnce") + }() + + go func() { + err = cloudStorageSink.Run(ctx) + require.ErrorIs(t, err, context.Canceled) }() event = helper.DML2Event(job.SchemaName, job.TableName, dmls...) @@ -265,8 +275,8 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) { atomic.AddUint64(&cnt, uint64(len(dmls))) }) event.TableInfoVersion = tableInfoVersion - s.AddDMLEvent(event) - time.Sleep(10 * time.Second) + cloudStorageSink.AddDMLEvent(event) + time.Sleep(5 * time.Second) tableDir = path.Join(parentDir, "test/table1/33/2023-03-09") fileNames = getTableFiles(t, tableDir) @@ -280,36 +290,45 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) { require.Nil(t, err) require.Equal(t, "CDC000001.csv\n", string(content)) require.Equal(t, uint64(300), atomic.LoadUint64(&cnt)) - s.Close() + cloudStorageSink.Close(false) + + cancel() + time.Sleep(5 * time.Second) // test table is scheduled from one node to another cnt = 0 - s, err = newCloudStorageDMLWorkerForTest(parentDir, 4, sinkConfig) - require.NoError(t, err) - mockClock = pclock.NewMock() mockClock.Set(time.Date(2023, 3, 9, 0, 1, 10, 0, time.UTC)) - setClock(s, mockClock) + appcontext.SetService(appcontext.DefaultPDClock, clock) + + ctx, cancel = context.WithCancel(context.Background()) + cloudStorageSink, err = newSinkForTest(ctx, replicaConfig, sinkURI, nil) + require.NoError(t, err) + + go func() { + err = cloudStorageSink.Run(ctx) + require.ErrorIs(t, err, context.Canceled) + }() event = helper.DML2Event(job.SchemaName, job.TableName, dmls...) event.AddPostFlushFunc(func() { atomic.AddUint64(&cnt, uint64(len(dmls))) }) event.TableInfoVersion = tableInfoVersion - s.AddDMLEvent(event) + cloudStorageSink.AddDMLEvent(event) time.Sleep(5 * time.Second) fileNames = getTableFiles(t, tableDir) require.Len(t, fileNames, 3) require.ElementsMatch(t, []string{"CDC000001.csv", "CDC000002.csv", "CDC.index"}, fileNames) content, err = os.ReadFile(path.Join(tableDir, "CDC000002.csv")) - require.Nil(t, err) + require.NoError(t, err) require.Greater(t, len(content), 0) content, err = os.ReadFile(path.Join(tableDir, "meta/CDC.index")) - require.Nil(t, err) + require.NoError(t, err) require.Equal(t, "CDC000002.csv\n", string(content)) require.Equal(t, uint64(100), atomic.LoadUint64(&cnt)) - s.Close() + cancel() } diff --git a/downstreamadapter/sink/cloudstorage/encoding_group.go b/downstreamadapter/sink/cloudstorage/encoding_group.go new file mode 100644 index 000000000..f443931c3 --- /dev/null +++ b/downstreamadapter/sink/cloudstorage/encoding_group.go @@ -0,0 +1,103 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package cloudstorage + +import ( + "context" + + commonType "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/sink/codec" + "github.com/pingcap/ticdc/pkg/sink/codec/common" + "github.com/pingcap/tiflow/pkg/errors" + "go.uber.org/atomic" + "golang.org/x/sync/errgroup" +) + +const ( + defaultEncodingConcurrency = 8 + defaultChannelSize = 1024 +) + +type encodingGroup struct { + changeFeedID commonType.ChangeFeedID + codecConfig *common.Config + + concurrency int + + inputCh <-chan eventFragment + outputCh chan<- eventFragment + + closed *atomic.Bool +} + +func newEncodingGroup( + changefeedID commonType.ChangeFeedID, + codecConfig *common.Config, + concurrency int, + inputCh <-chan eventFragment, + outputCh chan<- eventFragment, +) *encodingGroup { + return &encodingGroup{ + changeFeedID: changefeedID, + codecConfig: codecConfig, + concurrency: concurrency, + inputCh: inputCh, + outputCh: outputCh, + + closed: atomic.NewBool(false), + } +} + +func (eg *encodingGroup) Run(ctx context.Context) error { + g, ctx := errgroup.WithContext(ctx) + for i := 0; i < eg.concurrency; i++ { + g.Go(func() error { + return eg.runEncoder(ctx) + }) + } + return g.Wait() +} + +func (eg *encodingGroup) runEncoder(ctx context.Context) error { + encoder, err := codec.NewTxnEventEncoder(eg.codecConfig) + if err != nil { + return err + } + defer eg.closed.Store(true) + for { + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case frag, ok := <-eg.inputCh: + if !ok || eg.closed.Load() { + return nil + } + err = encoder.AppendTxnEvent(frag.event) + if err != nil { + return err + } + frag.encodedMsgs = encoder.Build() + + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case eg.outputCh <- frag: + } + } + } +} + +func (eg *encodingGroup) close() { + eg.closed.Store(true) +} diff --git a/downstreamadapter/sink/cloudstorage/sink.go b/downstreamadapter/sink/cloudstorage/sink.go new file mode 100644 index 000000000..f244b3e7e --- /dev/null +++ b/downstreamadapter/sink/cloudstorage/sink.go @@ -0,0 +1,421 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package cloudstorage
+
+import (
+    "context"
+    "encoding/json"
+    "math"
+    "net/url"
+    "time"
+
+    "github.com/pingcap/log"
+    "github.com/pingcap/ticdc/downstreamadapter/sink/helper"
+    "github.com/pingcap/ticdc/pkg/common"
+    commonEvent "github.com/pingcap/ticdc/pkg/common/event"
+    "github.com/pingcap/ticdc/pkg/config"
+    "github.com/pingcap/ticdc/pkg/errors"
+    "github.com/pingcap/ticdc/pkg/metrics"
+    "github.com/pingcap/ticdc/pkg/sink/cloudstorage"
+    "github.com/pingcap/ticdc/pkg/sink/util"
+    putil "github.com/pingcap/ticdc/pkg/util"
+    "github.com/pingcap/tidb/br/pkg/storage"
+    "github.com/pingcap/tidb/pkg/meta/model"
+    "github.com/robfig/cron"
+    "go.uber.org/atomic"
+    "go.uber.org/zap"
+    "golang.org/x/sync/errgroup"
+)
+
+// sink writes events to cloud storage systems.
+// Messages are encoded in the specific protocol and then sent to the defragmenter.
+// The data flow is as follows: **data** -> encodingGroup -> defragmenter -> writers -> external storage
+// The defragmenter reorders the out-of-order encoded messages and sends them
+// to individual writers.
+// The writers write the encoded messages to external storage, flushing different tables in parallel.
+type sink struct {
+    changefeedID common.ChangeFeedID
+    cfg          *cloudstorage.Config
+    sinkURI      *url.URL
+    // todo: this field does not take effect yet, should be fixed.
+    outputRawChangeEvent bool
+    storage              storage.ExternalStorage
+
+    dmlWriters *dmlWriters
+
+    checkpointChan           chan uint64
+    lastCheckpointTs         atomic.Uint64
+    lastSendCheckpointTsTime time.Time
+
+    tableSchemaStore *util.TableSchemaStore
+    cron             *cron.Cron
+    statistics       *metrics.Statistics
+
+    isNormal    *atomic.Bool
+    cleanupJobs []func() /* only for test */
+}
+
+func Verify(ctx context.Context, changefeedID common.ChangeFeedID, sinkURI *url.URL, sinkConfig *config.SinkConfig) error {
+    cfg := cloudstorage.NewConfig()
+    err := cfg.Apply(ctx, sinkURI, sinkConfig)
+    if err != nil {
+        return err
+    }
+    protocol, err := helper.GetProtocol(putil.GetOrZero(sinkConfig.Protocol))
+    if err != nil {
+        return err
+    }
+    _, err = util.GetEncoderConfig(changefeedID, sinkURI, protocol, sinkConfig, math.MaxInt)
+    if err != nil {
+        return err
+    }
+    storage, err := putil.GetExternalStorageWithDefaultTimeout(ctx, sinkURI.String())
+    if err != nil {
+        return err
+    }
+    storage.Close()
+    return nil
+}
+
+func New(
+    ctx context.Context, changefeedID common.ChangeFeedID, sinkURI *url.URL, sinkConfig *config.SinkConfig,
+    cleanupJobs []func(), /* only for test */
+) (*sink, error) {
+    // create cloud storage config and then apply the params of sinkURI to it.
+    cfg := cloudstorage.NewConfig()
+    err := cfg.Apply(ctx, sinkURI, sinkConfig)
+    if err != nil {
+        return nil, err
+    }
+    // fetch protocol from replicaConfig defined by changefeed config file.
+    protocol, err := helper.GetProtocol(
+        putil.GetOrZero(sinkConfig.Protocol),
+    )
+    if err != nil {
+        return nil, errors.Trace(err)
+    }
+    // get cloud storage file extension according to the specific protocol.
+    ext := helper.GetFileExtension(protocol)
+    // the last param maxMsgBytes is mainly to limit the size of a single message for
+    // batch protocols in mq scenario. In cloud storage sink, we just set it to max int.
+ encoderConfig, err := util.GetEncoderConfig(changefeedID, sinkURI, protocol, sinkConfig, math.MaxInt) + if err != nil { + return nil, errors.Trace(err) + } + storage, err := putil.GetExternalStorageWithDefaultTimeout(ctx, sinkURI.String()) + if err != nil { + return nil, err + } + statistics := metrics.NewStatistics(changefeedID, "cloudstorage") + return &sink{ + changefeedID: changefeedID, + sinkURI: sinkURI, + cfg: cfg, + cleanupJobs: cleanupJobs, + storage: storage, + dmlWriters: newDMLWriters(changefeedID, storage, cfg, encoderConfig, ext, statistics), + checkpointChan: make(chan uint64, 16), + lastSendCheckpointTsTime: time.Now(), + outputRawChangeEvent: sinkConfig.CloudStorageConfig.GetOutputRawChangeEvent(), + statistics: statistics, + isNormal: atomic.NewBool(true), + }, nil +} + +func (s *sink) SinkType() common.SinkType { + return common.CloudStorageSinkType +} + +func (s *sink) Run(ctx context.Context) error { + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + return s.dmlWriters.Run(ctx) + }) + + g.Go(func() error { + return s.sendCheckpointTs(ctx) + }) + + g.Go(func() error { + if err := s.initCron(ctx, s.sinkURI, s.cleanupJobs); err != nil { + return err + } + s.bgCleanup(ctx) + return nil + }) + return g.Wait() +} + +func (s *sink) IsNormal() bool { + return s.isNormal.Load() +} + +func (s *sink) AddDMLEvent(event *commonEvent.DMLEvent) { + s.dmlWriters.AddDMLEvent(event) +} + +func (s *sink) WriteBlockEvent(event commonEvent.BlockEvent) error { + var err error + switch e := event.(type) { + case *commonEvent.DDLEvent: + err = s.writeDDLEvent(e) + default: + log.Panic("sink doesn't support this type of block event", + zap.String("namespace", s.changefeedID.Namespace()), + zap.String("changefeed", s.changefeedID.Name()), + zap.Any("eventType", event.GetType())) + } + if err != nil { + s.isNormal.Store(false) + return err + } + event.PostFlush() + return nil +} + +func (s *sink) writeDDLEvent(event *commonEvent.DDLEvent) error { + if event.TiDBOnly { + return nil + } + for _, e := range event.GetEvents() { + var def cloudstorage.TableDefinition + def.FromDDLEvent(e, s.cfg.OutputColumnID) + if err := s.writeFile(e, def); err != nil { + return err + } + } + if event.GetDDLType() == model.ActionExchangeTablePartition { + // For exchange partition, we need to write the schema of the source table. 
+ var sourceTableDef cloudstorage.TableDefinition + sourceTableDef.FromTableInfo(event.ExtraSchemaName, event.ExtraTableName, event.MultipleTableInfos[1], event.GetCommitTs(), s.cfg.OutputColumnID) + if err := s.writeFile(event, sourceTableDef); err != nil { + return err + } + } + return nil +} + +func (s *sink) writeFile(v *commonEvent.DDLEvent, def cloudstorage.TableDefinition) error { + encodedDef, err := def.MarshalWithQuery() + if err != nil { + return errors.Trace(err) + } + + path, err := def.GenerateSchemaFilePath() + if err != nil { + return errors.Trace(err) + } + log.Debug("write ddl event to external storage", + zap.String("path", path), zap.Any("ddl", v)) + return s.statistics.RecordDDLExecution(func() error { + err = s.storage.WriteFile(context.Background(), path, encodedDef) + if err != nil { + return err + } + return nil + }) +} + +func (s *sink) AddCheckpointTs(ts uint64) { + s.checkpointChan <- ts +} + +func (s *sink) sendCheckpointTs(ctx context.Context) error { + checkpointTsMessageDuration := metrics.CheckpointTsMessageDuration.WithLabelValues(s.changefeedID.Namespace(), s.changefeedID.Name()) + checkpointTsMessageCount := metrics.CheckpointTsMessageCount.WithLabelValues(s.changefeedID.Namespace(), s.changefeedID.Name()) + defer func() { + metrics.CheckpointTsMessageDuration.DeleteLabelValues(s.changefeedID.Namespace(), s.changefeedID.Name()) + metrics.CheckpointTsMessageCount.DeleteLabelValues(s.changefeedID.Namespace(), s.changefeedID.Name()) + }() + + var ( + checkpoint uint64 + ok bool + ) + for { + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case checkpoint, ok = <-s.checkpointChan: + if !ok { + log.Warn("cloud storage sink checkpoint channel closed", + zap.String("namespace", s.changefeedID.Namespace()), + zap.String("changefeed", s.changefeedID.Name())) + return nil + } + } + + if time.Since(s.lastSendCheckpointTsTime) < 2*time.Second { + log.Warn("skip write checkpoint ts to external storage", + zap.Any("changefeedID", s.changefeedID), + zap.Uint64("checkpoint", checkpoint)) + continue + } + + start := time.Now() + message, err := json.Marshal(map[string]uint64{"checkpoint-ts": checkpoint}) + if err != nil { + log.Panic("CloudStorageSink marshal checkpoint failed, this should never happen", + zap.String("namespace", s.changefeedID.Namespace()), + zap.String("changefeed", s.changefeedID.Name()), + zap.Uint64("checkpoint", checkpoint), + zap.Duration("duration", time.Since(start)), + zap.Error(err)) + } + err = s.storage.WriteFile(ctx, "metadata", message) + if err != nil { + log.Error("CloudStorageSink storage write file failed", + zap.String("namespace", s.changefeedID.Namespace()), + zap.String("changefeed", s.changefeedID.Name()), + zap.Duration("duration", time.Since(start)), + zap.Error(err)) + return errors.Trace(err) + } + s.lastSendCheckpointTsTime = time.Now() + s.lastCheckpointTs.Store(checkpoint) + + checkpointTsMessageCount.Inc() + checkpointTsMessageDuration.Observe(time.Since(start).Seconds()) + } +} + +func (s *sink) SetTableSchemaStore(tableSchemaStore *util.TableSchemaStore) { + s.tableSchemaStore = tableSchemaStore +} + +func (s *sink) GetStartTsList(_ []int64, startTsList []int64, _ bool) ([]int64, []bool, error) { + return startTsList, make([]bool, len(startTsList)), nil +} + +func (s *sink) initCron( + ctx context.Context, sinkURI *url.URL, cleanupJobs []func(), +) (err error) { + if cleanupJobs == nil { + cleanupJobs = s.genCleanupJob(ctx, sinkURI) + } + + s.cron = cron.New() + for _, job := range cleanupJobs { + err = 
s.cron.AddFunc(s.cfg.FileCleanupCronSpec, job) + if err != nil { + return errors.Trace(err) + } + } + return nil +} + +func (s *sink) bgCleanup(ctx context.Context) { + if s.cfg.DateSeparator != config.DateSeparatorDay.String() || s.cfg.FileExpirationDays <= 0 { + log.Info("skip cleanup expired files for storage sink", + zap.String("namespace", s.changefeedID.Namespace()), + zap.Stringer("changefeedID", s.changefeedID.ID()), + zap.String("dateSeparator", s.cfg.DateSeparator), + zap.Int("expiredFileTTL", s.cfg.FileExpirationDays)) + return + } + + s.cron.Start() + defer s.cron.Stop() + log.Info("start schedule cleanup expired files for storage sink", + zap.String("namespace", s.changefeedID.Namespace()), + zap.Stringer("changefeedID", s.changefeedID.ID()), + zap.String("dateSeparator", s.cfg.DateSeparator), + zap.Int("expiredFileTTL", s.cfg.FileExpirationDays)) + + // wait for the context done + <-ctx.Done() + log.Info("stop schedule cleanup expired files for storage sink", + zap.String("namespace", s.changefeedID.Namespace()), + zap.Stringer("changefeedID", s.changefeedID.ID()), + zap.Error(ctx.Err())) +} + +func (s *sink) genCleanupJob(ctx context.Context, uri *url.URL) []func() { + var ret []func() + + isLocal := uri.Scheme == "file" || uri.Scheme == "local" || uri.Scheme == "" + var isRemoveEmptyDirsRunning atomic.Bool + if isLocal { + ret = append(ret, func() { + if !isRemoveEmptyDirsRunning.CompareAndSwap(false, true) { + log.Warn("remove empty dirs is already running, skip this round", + zap.String("namespace", s.changefeedID.Namespace()), + zap.Stringer("changefeedID", s.changefeedID.ID())) + return + } + + checkpointTs := s.lastCheckpointTs.Load() + start := time.Now() + cnt, err := cloudstorage.RemoveEmptyDirs(ctx, s.changefeedID, uri.Path) + if err != nil { + log.Error("failed to remove empty dirs", + zap.String("namespace", s.changefeedID.Namespace()), + zap.Stringer("changefeedID", s.changefeedID.ID()), + zap.Uint64("checkpointTs", checkpointTs), + zap.Duration("cost", time.Since(start)), + zap.Error(err), + ) + return + } + log.Info("remove empty dirs", + zap.String("namespace", s.changefeedID.Namespace()), + zap.Stringer("changefeedID", s.changefeedID.ID()), + zap.Uint64("checkpointTs", checkpointTs), + zap.Uint64("count", cnt), + zap.Duration("cost", time.Since(start))) + }) + } + + var isCleanupRunning atomic.Bool + ret = append(ret, func() { + if !isCleanupRunning.CompareAndSwap(false, true) { + log.Warn("cleanup expired files is already running, skip this round", + zap.String("namespace", s.changefeedID.Namespace()), + zap.Stringer("changefeedID", s.changefeedID.ID())) + return + } + + defer isCleanupRunning.Store(false) + start := time.Now() + checkpointTs := s.lastCheckpointTs.Load() + cnt, err := cloudstorage.RemoveExpiredFiles(ctx, s.changefeedID, s.storage, s.cfg, checkpointTs) + if err != nil { + log.Error("failed to remove expired files", + zap.String("namespace", s.changefeedID.Namespace()), + zap.Stringer("changefeedID", s.changefeedID.ID()), + zap.Uint64("checkpointTs", checkpointTs), + zap.Duration("cost", time.Since(start)), + zap.Error(err), + ) + return + } + log.Info("remove expired files", + zap.String("namespace", s.changefeedID.Namespace()), + zap.Stringer("changefeedID", s.changefeedID.ID()), + zap.Uint64("checkpointTs", checkpointTs), + zap.Uint64("count", cnt), + zap.Duration("cost", time.Since(start))) + }) + return ret +} + +func (s *sink) Close(_ bool) { + s.dmlWriters.close() + s.cron.Stop() + if s.statistics != nil { + s.statistics.Close() + } + 
s.storage.Close() +} diff --git a/downstreamadapter/sink/cloudstorage/sink_test.go b/downstreamadapter/sink/cloudstorage/sink_test.go new file mode 100644 index 000000000..24c89e7b0 --- /dev/null +++ b/downstreamadapter/sink/cloudstorage/sink_test.go @@ -0,0 +1,273 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package cloudstorage + +import ( + "context" + "fmt" + "net/url" + "os" + "path" + "sync/atomic" + "testing" + "time" + + "github.com/pingcap/ticdc/pkg/common" + appcontext "github.com/pingcap/ticdc/pkg/common/context" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/config" + "github.com/pingcap/ticdc/pkg/pdutil" + "github.com/pingcap/ticdc/pkg/util" + timodel "github.com/pingcap/tidb/pkg/meta/model" + pmodel "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/types" + "github.com/stretchr/testify/require" +) + +func newSinkForTest( + ctx context.Context, + replicaConfig *config.ReplicaConfig, + sinkURI *url.URL, + cleanUpJobs []func(), +) (*sink, error) { + changefeedID := common.NewChangefeedID4Test("test", "test") + result, err := New(ctx, changefeedID, sinkURI, replicaConfig.Sink, cleanUpJobs) + if err != nil { + return nil, err + } + return result, nil +} + +func TestBasicFunctionality(t *testing.T) { + uri := fmt.Sprintf("file:///%s?protocol=csv", t.TempDir()) + sinkURI, err := url.Parse(uri) + require.NoError(t, err) + + replicaConfig := config.GetDefaultReplicaConfig() + err = replicaConfig.ValidateAndAdjust(sinkURI) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockPDClock := pdutil.NewClock4Test() + appcontext.SetService(appcontext.DefaultPDClock, mockPDClock) + cloudStorageSink, err := newSinkForTest(ctx, replicaConfig, sinkURI, nil) + require.NoError(t, err) + + go cloudStorageSink.Run(ctx) + + var count atomic.Int64 + + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + + helper.Tk().MustExec("use test") + createTableSQL := "create table t (id int primary key, name varchar(32));" + job := helper.DDL2Job(createTableSQL) + require.NotNil(t, job) + helper.ApplyJob(job) + + tableInfo := helper.GetTableInfo(job) + + ddlEvent := &commonEvent.DDLEvent{ + Query: job.Query, + SchemaName: job.SchemaName, + TableName: job.TableName, + FinishedTs: 1, + BlockedTables: &commonEvent.InfluencedTables{ + InfluenceType: commonEvent.InfluenceTypeNormal, + TableIDs: []int64{0}, + }, + TableInfo: tableInfo, + NeedAddedTables: []commonEvent.Table{{TableID: 1, SchemaID: 1}}, + PostTxnFlushed: []func(){ + func() { count.Add(1) }, + }, + } + + ddlEvent2 := &commonEvent.DDLEvent{ + Query: job.Query, + SchemaName: job.SchemaName, + TableName: job.TableName, + FinishedTs: 4, + BlockedTables: &commonEvent.InfluencedTables{ + InfluenceType: commonEvent.InfluenceTypeNormal, + TableIDs: []int64{0}, + }, + TableInfo: tableInfo, + NeedAddedTables: []commonEvent.Table{{TableID: 1, SchemaID: 1}}, + PostTxnFlushed: []func(){ + func() { count.Add(1) 
}, + }, + } + + dmlEvent := helper.DML2Event("test", "t", "insert into t values (1, 'test')", "insert into t values (2, 'test2');") + dmlEvent.PostTxnFlushed = []func(){ + func() { + count.Add(1) + }, + } + dmlEvent.TableInfoVersion = 1 + + err = cloudStorageSink.WriteBlockEvent(ddlEvent) + require.NoError(t, err) + + cloudStorageSink.AddDMLEvent(dmlEvent) + + time.Sleep(5 * time.Second) + + ddlEvent2.PostFlush() + + require.Equal(t, count.Load(), int64(3)) +} + +func TestWriteDDLEvent(t *testing.T) { + parentDir := t.TempDir() + uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir) + sinkURI, err := url.Parse(uri) + require.NoError(t, err) + + replicaConfig := config.GetDefaultReplicaConfig() + err = replicaConfig.ValidateAndAdjust(sinkURI) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockPDClock := pdutil.NewClock4Test() + appcontext.SetService(appcontext.DefaultPDClock, mockPDClock) + + cloudStorageSink, err := newSinkForTest(ctx, replicaConfig, sinkURI, nil) + require.NoError(t, err) + + go cloudStorageSink.Run(ctx) + + tableInfo := common.WrapTableInfo("test", &timodel.TableInfo{ + ID: 20, + Name: pmodel.NewCIStr("table1"), + Columns: []*timodel.ColumnInfo{ + { + Name: pmodel.NewCIStr("col1"), + FieldType: *types.NewFieldType(mysql.TypeLong), + }, + { + Name: pmodel.NewCIStr("col2"), + FieldType: *types.NewFieldType(mysql.TypeVarchar), + }, + }, + }) + ddlEvent := &commonEvent.DDLEvent{ + Query: "alter table test.table1 add col2 varchar(64)", + Type: byte(timodel.ActionAddColumn), + SchemaName: "test", + TableName: "table1", + FinishedTs: 100, + TableInfo: tableInfo, + } + + tableDir := path.Join(parentDir, "test/table1/meta/") + err = cloudStorageSink.WriteBlockEvent(ddlEvent) + require.NoError(t, err) + + tableSchema, err := os.ReadFile(path.Join(tableDir, "schema_100_4192708364.json")) + require.NoError(t, err) + require.JSONEq(t, `{ + "Table": "table1", + "Schema": "test", + "Version": 1, + "TableVersion": 100, + "Query": "alter table test.table1 add col2 varchar(64)", + "Type": 5, + "TableColumns": [ + { + "ColumnName": "col1", + "ColumnType": "INT", + "ColumnPrecision": "11" + }, + { + "ColumnName": "col2", + "ColumnType": "VARCHAR", + "ColumnPrecision": "5" + } + ], + "TableColumnsTotal": 2 + }`, string(tableSchema)) +} + +func TestWriteCheckpointEvent(t *testing.T) { + parentDir := t.TempDir() + uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir) + sinkURI, err := url.Parse(uri) + require.NoError(t, err) + + replicaConfig := config.GetDefaultReplicaConfig() + err = replicaConfig.ValidateAndAdjust(sinkURI) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockPDClock := pdutil.NewClock4Test() + appcontext.SetService(appcontext.DefaultPDClock, mockPDClock) + + cloudStorageSink, err := newSinkForTest(ctx, replicaConfig, sinkURI, nil) + require.NoError(t, err) + + go cloudStorageSink.Run(ctx) + time.Sleep(3 * time.Second) + + cloudStorageSink.AddCheckpointTs(100) + + time.Sleep(2 * time.Second) + metadata, err := os.ReadFile(path.Join(parentDir, "metadata")) + require.NoError(t, err) + require.JSONEq(t, `{"checkpoint-ts":100}`, string(metadata)) +} + +func TestCleanupExpiredFiles(t *testing.T) { + parentDir := t.TempDir() + uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir) + sinkURI, err := url.Parse(uri) + require.NoError(t, err) + + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Sink.CloudStorageConfig = 
&config.CloudStorageConfig{ + FileExpirationDays: util.AddressOf(1), + FileCleanupCronSpec: util.AddressOf("* * * * * *"), + } + err = replicaConfig.ValidateAndAdjust(sinkURI) + require.NoError(t, err) + + var count atomic.Int64 + cleanupJobs := []func(){ + func() { + count.Add(1) + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockPDClock := pdutil.NewClock4Test() + appcontext.SetService(appcontext.DefaultPDClock, mockPDClock) + + cloudStorageSink, err := newSinkForTest(ctx, replicaConfig, sinkURI, cleanupJobs) + require.NoError(t, err) + go cloudStorageSink.Run(ctx) + + time.Sleep(5 * time.Second) + require.LessOrEqual(t, int64(1), count.Load()) +} diff --git a/downstreamadapter/worker/writer/writer.go b/downstreamadapter/sink/cloudstorage/writer.go similarity index 93% rename from downstreamadapter/worker/writer/writer.go rename to downstreamadapter/sink/cloudstorage/writer.go index aa233860f..505779747 100644 --- a/downstreamadapter/worker/writer/writer.go +++ b/downstreamadapter/sink/cloudstorage/writer.go @@ -10,7 +10,8 @@ // distributed under the License is distributed on an "AS IS" BASIS, // See the License for the specific language governing permissions and // limitations under the License. -package writer + +package cloudstorage import ( "bytes" @@ -36,8 +37,8 @@ import ( "golang.org/x/sync/errgroup" ) -// Writer denotes a worker responsible for writing messages to cloud storage. -type Writer struct { +// writer denotes a worker responsible for writing messages to cloud storage. +type writer struct { // worker id id int changeFeedID commonType.ChangeFeedID @@ -45,7 +46,7 @@ type Writer struct { config *cloudstorage.Config // toBeFlushedCh contains a set of batchedTask waiting to be flushed to cloud storage. toBeFlushedCh chan batchedTask - inputCh *chann.DrainableChann[EventFragment] + inputCh *chann.DrainableChann[eventFragment] isClosed uint64 statistics *metrics.Statistics filePathGenerator *cloudstorage.FilePathGenerator @@ -56,16 +57,16 @@ type Writer struct { metricsWorkerBusyRatio prometheus.Counter } -func NewWriter( +func newWriter( id int, changefeedID commonType.ChangeFeedID, storage storage.ExternalStorage, config *cloudstorage.Config, extension string, - inputCh *chann.DrainableChann[EventFragment], + inputCh *chann.DrainableChann[eventFragment], statistics *metrics.Statistics, -) *Writer { - d := &Writer{ +) *writer { + d := &writer{ id: id, changeFeedID: changefeedID, storage: storage, @@ -90,7 +91,7 @@ func NewWriter( } // Run creates a set of background goroutines. -func (d *Writer) Run(ctx context.Context) error { +func (d *writer) Run(ctx context.Context) error { eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { return d.flushMessages(ctx) @@ -104,13 +105,13 @@ func (d *Writer) Run(ctx context.Context) error { } // SetClock is used for unit test -func (d *Writer) SetClock(pdClock pdutil.Clock) { +func (d *writer) SetClock(pdClock pdutil.Clock) { d.filePathGenerator.SetClock(pdClock) } // flushMessages flushed messages of active tables to cloud storage. // active tables are those tables that have received events after the last flush.
-func (d *Writer) flushMessages(ctx context.Context) error { +func (d *writer) flushMessages(ctx context.Context) error { var flushTimeSlice time.Duration overseerDuration := d.config.FlushInterval * 2 overseerTicker := time.NewTicker(overseerDuration) @@ -196,14 +197,14 @@ func (d *Writer) flushMessages(ctx context.Context) error { } } -func (d *Writer) writeIndexFile(ctx context.Context, path, content string) error { +func (d *writer) writeIndexFile(ctx context.Context, path, content string) error { start := time.Now() err := d.storage.WriteFile(ctx, path, []byte(content)) d.metricFlushDuration.Observe(time.Since(start).Seconds()) return err } -func (d *Writer) writeDataFile(ctx context.Context, path string, task *singleTableTask) error { +func (d *writer) writeDataFile(ctx context.Context, path string, task *singleTableTask) error { var callbacks []func() buf := bytes.NewBuffer(make([]byte, 0, task.size)) rowsCnt := 0 @@ -265,8 +266,8 @@ func (d *Writer) writeDataFile(ctx context.Context, path string, task *singleTab // genAndDispatchTask dispatches flush tasks in two conditions: // 1. the flush interval exceeds the upper limit. // 2. the file size exceeds the upper limit. -func (d *Writer) genAndDispatchTask(ctx context.Context, - ch *chann.DrainableChann[EventFragment], +func (d *writer) genAndDispatchTask(ctx context.Context, + ch *chann.DrainableChann[eventFragment], ) error { batchedTask := newBatchedTask() ticker := time.NewTicker(d.config.FlushInterval) @@ -316,7 +317,7 @@ func (d *Writer) genAndDispatchTask(ctx context.Context, } } -func (d *Writer) Close() { +func (d *writer) close() { if !atomic.CompareAndSwapUint64(&d.isClosed, 0, 1) { return } @@ -341,7 +342,7 @@ func newBatchedTask() batchedTask { } } -func (t *batchedTask) handleSingleTableEvent(event EventFragment) { +func (t *batchedTask) handleSingleTableEvent(event eventFragment) { table := event.versionedTable if _, ok := t.batch[table]; !ok { t.batch[table] = &singleTableTask{ diff --git a/downstreamadapter/worker/writer/writer_test.go b/downstreamadapter/sink/cloudstorage/writer_test.go similarity index 87% rename from downstreamadapter/worker/writer/writer_test.go rename to downstreamadapter/sink/cloudstorage/writer_test.go index f47138674..aa382696e 100644 --- a/downstreamadapter/worker/writer/writer_test.go +++ b/downstreamadapter/sink/cloudstorage/writer_test.go @@ -10,13 +10,13 @@ // distributed under the License is distributed on an "AS IS" BASIS, // See the License for the specific language governing permissions and // limitations under the License. 
-package writer + +package cloudstorage import ( "context" "fmt" "net/url" - "os" "path" "sync" "testing" @@ -41,25 +41,7 @@ import ( "github.com/stretchr/testify/require" ) -func getTableFiles(t *testing.T, tableDir string) []string { - files, err := os.ReadDir(tableDir) - require.Nil(t, err) - - fileNames := []string{} - for _, f := range files { - fileName := f.Name() - if f.IsDir() { - metaFiles, err := os.ReadDir(path.Join(tableDir, f.Name())) - require.Nil(t, err) - require.Len(t, metaFiles, 1) - fileName = metaFiles[0].Name() - } - fileNames = append(fileNames, fileName) - } - return fileNames -} - -func testWriter(ctx context.Context, t *testing.T, dir string) *Writer { +func testWriter(ctx context.Context, t *testing.T, dir string) *writer { uri := fmt.Sprintf("file:///%s?flush-interval=2s", dir) storage, err := util.GetExternalStorageWithDefaultTimeout(ctx, uri) require.Nil(t, err) @@ -78,8 +60,8 @@ func testWriter(ctx context.Context, t *testing.T, dir string) *Writer { appcontext.SetService(appcontext.DefaultPDClock, pdlock) mockPDClock := pdutil.NewClock4Test() appcontext.SetService(appcontext.DefaultPDClock, mockPDClock) - d := NewWriter(1, changefeedID, storage, - cfg, ".json", chann.NewAutoDrainChann[EventFragment](), statistics) + d := newWriter(1, changefeedID, storage, + cfg, ".json", chann.NewAutoDrainChann[eventFragment](), statistics) return d } @@ -103,7 +85,7 @@ func TestWriterRun(t *testing.T) { tableInfo := commonType.WrapTableInfo("test", tidbTableInfo) for i := 0; i < 5; i++ { - frag := EventFragment{ + frag := eventFragment{ seqNumber: uint64(i), versionedTable: cloudstorage.VersionedTableName{ TableNameWithPhysicTableID: commonType.TableName{ @@ -142,7 +124,7 @@ func TestWriterRun(t *testing.T) { require.Len(t, fileNames, 2) require.ElementsMatch(t, []string{"CDC000001.json", "CDC.index"}, fileNames) cancel() - d.Close() + d.close() wg.Wait() fragCh.CloseAndDrain() } diff --git a/downstreamadapter/sink/cloudstorage_test.go b/downstreamadapter/sink/cloudstorage_test.go deleted file mode 100644 index 1e3a82534..000000000 --- a/downstreamadapter/sink/cloudstorage_test.go +++ /dev/null @@ -1,119 +0,0 @@ -// Copyright 2025 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package sink - -import ( - "context" - "fmt" - "net/url" - "sync/atomic" - "testing" - "time" - - "github.com/pingcap/ticdc/pkg/common" - appcontext "github.com/pingcap/ticdc/pkg/common/context" - commonEvent "github.com/pingcap/ticdc/pkg/common/event" - "github.com/pingcap/ticdc/pkg/config" - "github.com/pingcap/ticdc/pkg/pdutil" - "github.com/stretchr/testify/require" -) - -func newCloudStorageSinkForTest(parentDir string) (*CloudStorageSink, error) { - ctx := context.Background() - mockPDClock := pdutil.NewClock4Test() - appcontext.SetService(appcontext.DefaultPDClock, mockPDClock) - changefeedID := common.NewChangefeedID4Test("test", "test") - csvProtocol := "csv" - sinkConfig := &config.SinkConfig{Protocol: &csvProtocol} - uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir) - sinkURI, err := url.Parse(uri) - if err != nil { - return nil, err - } - sink, err := newCloudStorageSink(ctx, changefeedID, sinkURI, sinkConfig, nil) - if err != nil { - return nil, err - } - go sink.Run(ctx) - return sink, nil -} - -func TestCloudStorageSinkBasicFunctionality(t *testing.T) { - sink, err := newCloudStorageSinkForTest(t.TempDir()) - require.NoError(t, err) - - var count atomic.Int64 - - helper := commonEvent.NewEventTestHelper(t) - defer helper.Close() - - helper.Tk().MustExec("use test") - createTableSQL := "create table t (id int primary key, name varchar(32));" - job := helper.DDL2Job(createTableSQL) - require.NotNil(t, job) - helper.ApplyJob(job) - - tableInfo := helper.GetTableInfo(job) - - ddlEvent := &commonEvent.DDLEvent{ - Query: job.Query, - SchemaName: job.SchemaName, - TableName: job.TableName, - FinishedTs: 1, - BlockedTables: &commonEvent.InfluencedTables{ - InfluenceType: commonEvent.InfluenceTypeNormal, - TableIDs: []int64{0}, - }, - TableInfo: tableInfo, - NeedAddedTables: []commonEvent.Table{{TableID: 1, SchemaID: 1}}, - PostTxnFlushed: []func(){ - func() { count.Add(1) }, - }, - } - - ddlEvent2 := &commonEvent.DDLEvent{ - Query: job.Query, - SchemaName: job.SchemaName, - TableName: job.TableName, - FinishedTs: 4, - BlockedTables: &commonEvent.InfluencedTables{ - InfluenceType: commonEvent.InfluenceTypeNormal, - TableIDs: []int64{0}, - }, - TableInfo: tableInfo, - NeedAddedTables: []commonEvent.Table{{TableID: 1, SchemaID: 1}}, - PostTxnFlushed: []func(){ - func() { count.Add(1) }, - }, - } - - dmlEvent := helper.DML2Event("test", "t", "insert into t values (1, 'test')", "insert into t values (2, 'test2');") - dmlEvent.PostTxnFlushed = []func(){ - func() { - count.Add(1) - }, - } - dmlEvent.TableInfoVersion = 1 - - err = sink.WriteBlockEvent(ddlEvent) - require.NoError(t, err) - - sink.AddDMLEvent(dmlEvent) - - time.Sleep(5 * time.Second) - - ddlEvent2.PostFlush() - - require.Equal(t, count.Load(), int64(3)) -} diff --git a/downstreamadapter/sink/helper/helper.go b/downstreamadapter/sink/helper/helper.go index 3d33a6826..89b307c6f 100644 --- a/downstreamadapter/sink/helper/helper.go +++ b/downstreamadapter/sink/helper/helper.go @@ -79,6 +79,73 @@ func GetFileExtension(protocol config.Protocol) string { } } +const ( + // KafkaScheme indicates the scheme is kafka. + KafkaScheme = "kafka" + // KafkaSSLScheme indicates the scheme is kafka+ssl. + KafkaSSLScheme = "kafka+ssl" + // BlackHoleScheme indicates the scheme is blackhole. + BlackHoleScheme = "blackhole" + // MySQLScheme indicates the scheme is MySQL. + MySQLScheme = "mysql" + // MySQLSSLScheme indicates the scheme is MySQL+ssl. + MySQLSSLScheme = "mysql+ssl" + // TiDBScheme indicates the scheme is TiDB. 
+ TiDBScheme = "tidb" + // TiDBSSLScheme indicates the scheme is TiDB+ssl. + TiDBSSLScheme = "tidb+ssl" + // S3Scheme indicates the scheme is s3. + S3Scheme = "s3" + // FileScheme indicates the scheme is local fs or NFS. + FileScheme = "file" + // GCSScheme indicates the scheme is gcs. + GCSScheme = "gcs" + // GSScheme is an alias for "gcs" + GSScheme = "gs" + // AzblobScheme indicates the scheme is azure blob storage.\ + AzblobScheme = "azblob" + // AzureScheme is an alias for "azblob" + AzureScheme = "azure" + // CloudStorageNoopScheme indicates the scheme is noop. + CloudStorageNoopScheme = "noop" + // PulsarScheme indicates the scheme is pulsar + PulsarScheme = "pulsar" + // PulsarSSLScheme indicates the scheme is pulsar+ssl + PulsarSSLScheme = "pulsar+ssl" + // PulsarHTTPScheme indicates the schema is pulsar with http protocol + PulsarHTTPScheme = "pulsar+http" + // PulsarHTTPSScheme indicates the schema is pulsar with https protocol + PulsarHTTPSScheme = "pulsar+https" +) + +// IsMQScheme returns true if the scheme belong to mq scheme. +func IsMQScheme(scheme string) bool { + return scheme == KafkaScheme || scheme == KafkaSSLScheme || + scheme == PulsarScheme || scheme == PulsarSSLScheme || scheme == PulsarHTTPScheme || scheme == PulsarHTTPSScheme +} + +// IsMySQLCompatibleScheme returns true if the scheme is compatible with MySQL. +func IsMySQLCompatibleScheme(scheme string) bool { + return scheme == MySQLScheme || scheme == MySQLSSLScheme || + scheme == TiDBScheme || scheme == TiDBSSLScheme +} + +// IsStorageScheme returns true if the scheme belong to storage scheme. +func IsStorageScheme(scheme string) bool { + return scheme == FileScheme || scheme == S3Scheme || scheme == GCSScheme || + scheme == GSScheme || scheme == AzblobScheme || scheme == AzureScheme || scheme == CloudStorageNoopScheme +} + +// IsPulsarScheme returns true if the scheme belong to pulsar scheme. +func IsPulsarScheme(scheme string) bool { + return scheme == PulsarScheme || scheme == PulsarSSLScheme || scheme == PulsarHTTPScheme || scheme == PulsarHTTPSScheme +} + +// IsBlackHoleScheme returns true if the scheme belong to blackhole scheme. +func IsBlackHoleScheme(scheme string) bool { + return scheme == BlackHoleScheme +} + // GetScheme returns the scheme of the url. func GetScheme(url *url.URL) string { return strings.ToLower(url.Scheme) diff --git a/downstreamadapter/sink/kafka/sink.go b/downstreamadapter/sink/kafka/sink.go index 3deb18eb6..76993ea7d 100644 --- a/downstreamadapter/sink/kafka/sink.go +++ b/downstreamadapter/sink/kafka/sink.go @@ -164,6 +164,7 @@ func (s *sink) WriteBlockEvent(event commonEvent.BlockEvent) error { s.isNormal.Store(false) return err } + event.PostFlush() return nil } @@ -418,8 +419,6 @@ func (s *sink) sendMessages(ctx context.Context) error { func (s *sink) sendDDLEvent(event *commonEvent.DDLEvent) error { if event.TiDBOnly { - // run callback directly and return - event.PostFlush() return nil } for _, e := range event.GetEvents() { @@ -458,8 +457,6 @@ func (s *sink) sendDDLEvent(event *commonEvent.DDLEvent) error { log.Info("kafka sink send DDL event", zap.String("namespace", s.changefeedID.Namespace()), zap.String("changefeed", s.changefeedID.Name()), zap.Any("commitTs", event.GetCommitTs()), zap.Any("event", event.GetDDLQuery())) - // after flush all the ddl event, we call the callback function. 
- event.PostFlush() return nil } diff --git a/downstreamadapter/sink/mysql/sink.go b/downstreamadapter/sink/mysql/sink.go index d23cc794b..ebe046fbe 100644 --- a/downstreamadapter/sink/mysql/sink.go +++ b/downstreamadapter/sink/mysql/sink.go @@ -228,7 +228,7 @@ func (s *sink) WriteBlockEvent(event commonEvent.BlockEvent) error { case commonEvent.TypeSyncPointEvent: err = s.ddlWriter.FlushSyncPointEvent(event.(*commonEvent.SyncPointEvent)) default: - log.Error("unknown event type", + log.Panic("mysql sink meet unknown event type", zap.String("namespace", s.changefeedID.Namespace()), zap.String("changefeed", s.changefeedID.Name()), zap.Any("event", event)) @@ -237,6 +237,7 @@ func (s *sink) WriteBlockEvent(event commonEvent.BlockEvent) error { s.isNormal.Store(false) return errors.Trace(err) } + event.PostFlush() return nil } diff --git a/downstreamadapter/sink/pulsar/sink.go b/downstreamadapter/sink/pulsar/sink.go index b7ea2acaf..32d3ae6a8 100644 --- a/downstreamadapter/sink/pulsar/sink.go +++ b/downstreamadapter/sink/pulsar/sink.go @@ -144,13 +144,12 @@ func (s *sink) WriteBlockEvent(event commonEvent.BlockEvent) error { s.isNormal.Store(false) return err } + event.PostFlush() return nil } func (s *sink) sendDDLEvent(event *commonEvent.DDLEvent) error { if event.TiDBOnly { - // run callback directly and return - event.PostFlush() return nil } for _, e := range event.GetEvents() { @@ -183,8 +182,6 @@ func (s *sink) sendDDLEvent(event *commonEvent.DDLEvent) error { log.Info("pulsar sink send DDL event", zap.String("namespace", s.changefeedID.Namespace()), zap.String("changefeed", s.changefeedID.Name()), zap.Any("commitTs", event.GetCommitTs()), zap.Any("event", event.GetDDLQuery())) - // after flush all the ddl event, we call the callback function. - event.PostFlush() return nil } diff --git a/downstreamadapter/sink/sink.go b/downstreamadapter/sink/sink.go index 3d57eed4a..0de7d508a 100644 --- a/downstreamadapter/sink/sink.go +++ b/downstreamadapter/sink/sink.go @@ -17,6 +17,9 @@ import ( "context" "net/url" + "github.com/pingcap/ticdc/downstreamadapter/sink/blackhole" + "github.com/pingcap/ticdc/downstreamadapter/sink/cloudstorage" + "github.com/pingcap/ticdc/downstreamadapter/sink/helper" "github.com/pingcap/ticdc/downstreamadapter/sink/kafka" "github.com/pingcap/ticdc/downstreamadapter/sink/mysql" "github.com/pingcap/ticdc/downstreamadapter/sink/pulsar" @@ -25,7 +28,6 @@ import ( "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/sink/util" - "github.com/pingcap/tiflow/pkg/sink" ) type Sink interface { @@ -47,18 +49,18 @@ func NewSink(ctx context.Context, config *config.ChangefeedConfig, changefeedID if err != nil { return nil, errors.WrapError(errors.ErrSinkURIInvalid, err) } - scheme := sink.GetScheme(sinkURI) + scheme := helper.GetScheme(sinkURI) switch scheme { - case sink.MySQLScheme, sink.MySQLSSLScheme, sink.TiDBScheme, sink.TiDBSSLScheme: + case helper.MySQLScheme, helper.MySQLSSLScheme, helper.TiDBScheme, helper.TiDBSSLScheme: return mysql.New(ctx, changefeedID, config, sinkURI) - case sink.KafkaScheme, sink.KafkaSSLScheme: + case helper.KafkaScheme, helper.KafkaSSLScheme: return kafka.New(ctx, changefeedID, sinkURI, config.SinkConfig) - case sink.PulsarScheme, sink.PulsarSSLScheme, sink.PulsarHTTPScheme, sink.PulsarHTTPSScheme: + case helper.PulsarScheme, helper.PulsarSSLScheme, helper.PulsarHTTPScheme, helper.PulsarHTTPSScheme: return pulsar.New(ctx, changefeedID, sinkURI, config.SinkConfig) - case sink.S3Scheme, 
sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme: - return newCloudStorageSink(ctx, changefeedID, sinkURI, config.SinkConfig, nil) - case sink.BlackHoleScheme: - return newBlackHoleSink() + case helper.S3Scheme, helper.FileScheme, helper.GCSScheme, helper.GSScheme, helper.AzblobScheme, helper.AzureScheme, helper.CloudStorageNoopScheme: + return cloudstorage.New(ctx, changefeedID, sinkURI, config.SinkConfig, nil) + case helper.BlackHoleScheme: + return blackhole.New() } return nil, errors.ErrSinkURIInvalid.GenWithStackByArgs(sinkURI) } @@ -68,17 +70,17 @@ func VerifySink(ctx context.Context, config *config.ChangefeedConfig, changefeed if err != nil { return errors.WrapError(errors.ErrSinkURIInvalid, err) } - scheme := sink.GetScheme(sinkURI) + scheme := helper.GetScheme(sinkURI) switch scheme { - case sink.MySQLScheme, sink.MySQLSSLScheme, sink.TiDBScheme, sink.TiDBSSLScheme: + case helper.MySQLScheme, helper.MySQLSSLScheme, helper.TiDBScheme, helper.TiDBSSLScheme: return mysql.Verify(ctx, sinkURI, config) - case sink.KafkaScheme, sink.KafkaSSLScheme: + case helper.KafkaScheme, helper.KafkaSSLScheme: return kafka.Verify(ctx, changefeedID, sinkURI, config.SinkConfig) - case sink.PulsarScheme, sink.PulsarSSLScheme, sink.PulsarHTTPScheme, sink.PulsarHTTPSScheme: + case helper.PulsarScheme, helper.PulsarSSLScheme, helper.PulsarHTTPScheme, helper.PulsarHTTPSScheme: return pulsar.Verify(ctx, changefeedID, sinkURI, config.SinkConfig) - case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme: - return verifyCloudStorageSink(ctx, changefeedID, sinkURI, config.SinkConfig) - case sink.BlackHoleScheme: + case helper.S3Scheme, helper.FileScheme, helper.GCSScheme, helper.GSScheme, helper.AzblobScheme, helper.AzureScheme, helper.CloudStorageNoopScheme: + return cloudstorage.Verify(ctx, changefeedID, sinkURI, config.SinkConfig) + case helper.BlackHoleScheme: return nil } return errors.ErrSinkURIInvalid.GenWithStackByArgs(sinkURI) diff --git a/downstreamadapter/worker/cloudstorage_ddl_worker_test.go b/downstreamadapter/worker/cloudstorage_ddl_worker_test.go deleted file mode 100644 index fc9a57bc3..000000000 --- a/downstreamadapter/worker/cloudstorage_ddl_worker_test.go +++ /dev/null @@ -1,174 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package worker - -import ( - "context" - "fmt" - "net/url" - "os" - "path" - "sync/atomic" - "testing" - "time" - - "github.com/pingcap/ticdc/pkg/common" - commonEvent "github.com/pingcap/ticdc/pkg/common/event" - "github.com/pingcap/ticdc/pkg/config" - "github.com/pingcap/ticdc/pkg/metrics" - "github.com/pingcap/ticdc/pkg/sink/cloudstorage" - "github.com/pingcap/ticdc/pkg/util" - timodel "github.com/pingcap/tidb/pkg/meta/model" - pmodel "github.com/pingcap/tidb/pkg/parser/model" - "github.com/pingcap/tidb/pkg/parser/mysql" - "github.com/pingcap/tidb/pkg/parser/types" - "github.com/stretchr/testify/require" -) - -func newCloudStorageDDLWorkerForTest(parentDir string) (*CloudStorageDDLWorker, error) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir) - sinkURI, err := url.Parse(uri) - if err != nil { - return nil, err - } - replicaConfig := config.GetDefaultReplicaConfig() - err = replicaConfig.ValidateAndAdjust(sinkURI) - if err != nil { - return nil, err - } - changefeedID := common.NewChangefeedID4Test("test", "test") - - csvProtocol := "csv" - sinkConfig := &config.SinkConfig{Protocol: &csvProtocol} - cfg := cloudstorage.NewConfig() - err = cfg.Apply(ctx, sinkURI, sinkConfig) - if err != nil { - return nil, err - } - storage, err := util.GetExternalStorageWithDefaultTimeout(ctx, sinkURI.String()) - if err != nil { - return nil, err - } - sink := NewCloudStorageDDLWorker(changefeedID, sinkURI, cfg, nil, storage, metrics.NewStatistics(changefeedID, "CloudStorageSink")) - go sink.Run(ctx) - return sink, nil -} - -func TestCloudStorageWriteDDLEvent(t *testing.T) { - parentDir := t.TempDir() - sink, err := newCloudStorageDDLWorkerForTest(parentDir) - require.NoError(t, err) - - tableInfo := common.WrapTableInfo("test", &timodel.TableInfo{ - ID: 20, - Name: pmodel.NewCIStr("table1"), - Columns: []*timodel.ColumnInfo{ - { - Name: pmodel.NewCIStr("col1"), - FieldType: *types.NewFieldType(mysql.TypeLong), - }, - { - Name: pmodel.NewCIStr("col2"), - FieldType: *types.NewFieldType(mysql.TypeVarchar), - }, - }, - }) - ddlEvent := &commonEvent.DDLEvent{ - Query: "alter table test.table1 add col2 varchar(64)", - Type: byte(timodel.ActionAddColumn), - SchemaName: "test", - TableName: "table1", - FinishedTs: 100, - TableInfo: tableInfo, - } - - tableDir := path.Join(parentDir, "test/table1/meta/") - err = sink.WriteBlockEvent(ddlEvent) - require.Nil(t, err) - - tableSchema, err := os.ReadFile(path.Join(tableDir, "schema_100_4192708364.json")) - require.Nil(t, err) - require.JSONEq(t, `{ - "Table": "table1", - "Schema": "test", - "Version": 1, - "TableVersion": 100, - "Query": "alter table test.table1 add col2 varchar(64)", - "Type": 5, - "TableColumns": [ - { - "ColumnName": "col1", - "ColumnType": "INT", - "ColumnPrecision": "11" - }, - { - "ColumnName": "col2", - "ColumnType": "VARCHAR", - "ColumnPrecision": "5" - } - ], - "TableColumnsTotal": 2 - }`, string(tableSchema)) -} - -func TestCloudStorageWriteCheckpointTs(t *testing.T) { - parentDir := t.TempDir() - sink, err := newCloudStorageDDLWorkerForTest(parentDir) - require.NoError(t, err) - - time.Sleep(3 * time.Second) - sink.AddCheckpointTs(100) - metadata, err := os.ReadFile(path.Join(parentDir, "metadata")) - require.Nil(t, err) - require.JSONEq(t, `{"checkpoint-ts":100}`, string(metadata)) -} - -func TestCleanupExpiredFiles(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - parentDir := t.TempDir() - uri := 
fmt.Sprintf("file:///%s?protocol=csv", parentDir) - sinkURI, err := url.Parse(uri) - require.Nil(t, err) - replicaConfig := config.GetDefaultReplicaConfig() - replicaConfig.Sink.CloudStorageConfig = &config.CloudStorageConfig{ - FileExpirationDays: util.AddressOf(1), - FileCleanupCronSpec: util.AddressOf("* * * * * *"), - } - err = replicaConfig.ValidateAndAdjust(sinkURI) - require.Nil(t, err) - - cnt := atomic.Int64{} - cleanupJobs := []func(){ - func() { - cnt.Add(1) - }, - } - changefeedID := common.NewChangefeedID4Test("test", "test") - cfg := cloudstorage.NewConfig() - err = cfg.Apply(ctx, sinkURI, replicaConfig.Sink) - require.Nil(t, err) - storage, err := util.GetExternalStorageWithDefaultTimeout(ctx, sinkURI.String()) - require.Nil(t, err) - - sink := NewCloudStorageDDLWorker(changefeedID, sinkURI, cfg, cleanupJobs, storage, metrics.NewStatistics(changefeedID, "CloudStorageSink")) - go sink.Run(ctx) - require.Nil(t, err) - - _ = sink - time.Sleep(5 * time.Second) - require.LessOrEqual(t, int64(1), cnt.Load()) -} diff --git a/downstreamadapter/worker/cloudstorage_dml_worker.go b/downstreamadapter/worker/cloudstorage_dml_worker.go deleted file mode 100644 index 16cad12c6..000000000 --- a/downstreamadapter/worker/cloudstorage_dml_worker.go +++ /dev/null @@ -1,179 +0,0 @@ -// Copyright 2025 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. -package worker - -import ( - "context" - "sync" - "sync/atomic" - - "github.com/pingcap/log" - "github.com/pingcap/ticdc/downstreamadapter/worker/writer" - commonType "github.com/pingcap/ticdc/pkg/common" - commonEvent "github.com/pingcap/ticdc/pkg/common/event" - "github.com/pingcap/ticdc/pkg/errors" - "github.com/pingcap/ticdc/pkg/metrics" - "github.com/pingcap/ticdc/pkg/sink/cloudstorage" - "github.com/pingcap/ticdc/pkg/sink/codec" - "github.com/pingcap/ticdc/pkg/sink/codec/common" - "github.com/pingcap/ticdc/utils/chann" - "github.com/pingcap/tidb/br/pkg/storage" - "go.uber.org/zap" - "golang.org/x/sync/errgroup" -) - -const ( - defaultEncodingConcurrency = 8 - defaultChannelSize = 1024 -) - -// CloudStorageDMLWorker denotes a worker responsible for writing messages to cloud storage. -type CloudStorageDMLWorker struct { - changefeedID commonType.ChangeFeedID - storage storage.ExternalStorage - config *cloudstorage.Config - statistics *metrics.Statistics - - // last sequence number - lastSeqNum uint64 - // workers defines a group of workers for encoding events. - workers []*writer.Worker - writers []*writer.Writer - // defragmenter is used to defragment the out-of-order encoded messages and - // sends encoded messages to individual dmlWorkers. - defragmenter *writer.Defragmenter - alive struct { - sync.RWMutex - // msgCh is a channel to hold eventFragment. - // The caller of WriteEvents will write eventFragment to msgCh and - // the encodingWorkers will read eventFragment from msgCh to encode events. 
- msgCh *chann.DrainableChann[writer.EventFragment] - isDead bool - } -} - -func NewCloudStorageDMLWorker( - changefeedID commonType.ChangeFeedID, - storage storage.ExternalStorage, - config *cloudstorage.Config, - encoderConfig *common.Config, - extension string, - statistics *metrics.Statistics, -) (*CloudStorageDMLWorker, error) { - w := &CloudStorageDMLWorker{ - changefeedID: changefeedID, - storage: storage, - config: config, - statistics: statistics, - workers: make([]*writer.Worker, defaultEncodingConcurrency), - writers: make([]*writer.Writer, config.WorkerCount), - } - w.alive.msgCh = chann.NewAutoDrainChann[writer.EventFragment]() - encodedOutCh := make(chan writer.EventFragment, defaultChannelSize) - workerChannels := make([]*chann.DrainableChann[writer.EventFragment], config.WorkerCount) - // create a group of encoding workers. - for i := 0; i < defaultEncodingConcurrency; i++ { - encoderBuilder, err := codec.NewTxnEventEncoder(encoderConfig) - if err != nil { - return nil, err - } - w.workers[i] = writer.NewWorker(i, w.changefeedID, encoderBuilder, w.alive.msgCh.Out(), encodedOutCh) - } - // create a group of dml workers. - for i := 0; i < w.config.WorkerCount; i++ { - inputCh := chann.NewAutoDrainChann[writer.EventFragment]() - w.writers[i] = writer.NewWriter(i, w.changefeedID, storage, config, extension, - inputCh, w.statistics) - workerChannels[i] = inputCh - } - // create defragmenter. - // The defragmenter is used to defragment the out-of-order encoded messages from encoding workers and - // sends encoded messages to related dmlWorkers in order. Messages of the same table will be sent to - // the same dml - w.defragmenter = writer.NewDefragmenter(encodedOutCh, workerChannels) - - return w, nil -} - -// run creates a set of background goroutines. -func (w *CloudStorageDMLWorker) Run(ctx context.Context) error { - eg, ctx := errgroup.WithContext(ctx) - - for i := 0; i < len(w.workers); i++ { - encodingWorker := w.workers[i] - eg.Go(func() error { - return encodingWorker.Run(ctx) - }) - } - - eg.Go(func() error { - return w.defragmenter.Run(ctx) - }) - - for i := 0; i < len(w.writers); i++ { - worker := w.writers[i] - eg.Go(func() error { - return worker.Run(ctx) - }) - } - - return eg.Wait() -} - -func (w *CloudStorageDMLWorker) AddDMLEvent(event *commonEvent.DMLEvent) { - w.alive.RLock() - defer w.alive.RUnlock() - if w.alive.isDead { - log.Error("dead dmlSink", zap.Error(errors.Trace(errors.New("dead dmlSink")))) - return - } - - if event.State != commonEvent.EventSenderStateNormal { - // The table where the event comes from is in stopping, so it's safe - // to drop the event directly. - event.PostFlush() - return - } - - tbl := cloudstorage.VersionedTableName{ - TableNameWithPhysicTableID: commonType.TableName{ - Schema: event.TableInfo.GetSchemaName(), - Table: event.TableInfo.GetTableName(), - TableID: event.PhysicalTableID, - IsPartition: event.TableInfo.IsPartitionTable(), - }, - TableInfoVersion: event.TableInfoVersion, - } - seq := atomic.AddUint64(&w.lastSeqNum, 1) - - w.statistics.RecordBatchExecution(func() (int, int64, error) { - // emit a TxnCallbackableEvent encoupled with a sequence number starting from one. 
- w.alive.msgCh.In() <- writer.NewEventFragment(seq, tbl, event) - return int(event.Len()), event.GetRowsSize(), nil - }) -} - -func (w *CloudStorageDMLWorker) Close() { - w.alive.Lock() - w.alive.isDead = true - w.alive.msgCh.CloseAndDrain() - w.alive.Unlock() - - for _, worker := range w.workers { - worker.Close() - } - - for _, writer := range w.writers { - writer.Close() - } -} diff --git a/downstreamadapter/worker/writer/worker.go b/downstreamadapter/worker/writer/worker.go deleted file mode 100644 index 17841545f..000000000 --- a/downstreamadapter/worker/writer/worker.go +++ /dev/null @@ -1,81 +0,0 @@ -// Copyright 2025 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package writer - -import ( - "context" - "sync/atomic" - - commonType "github.com/pingcap/ticdc/pkg/common" - "github.com/pingcap/ticdc/pkg/sink/codec/common" - "github.com/pingcap/tiflow/pkg/errors" -) - -// Worker denotes the worker responsible for encoding RowChangedEvents -// to messages formatted in the specific protocol. -type Worker struct { - id int - changeFeedID commonType.ChangeFeedID - encoder common.TxnEventEncoder - isClosed uint64 - inputCh <-chan EventFragment - outputCh chan<- EventFragment -} - -func NewWorker( - workerID int, - changefeedID commonType.ChangeFeedID, - encoder common.TxnEventEncoder, - inputCh <-chan EventFragment, - outputCh chan<- EventFragment, -) *Worker { - return &Worker{ - id: workerID, - changeFeedID: changefeedID, - encoder: encoder, - inputCh: inputCh, - outputCh: outputCh, - } -} - -func (w *Worker) Run(ctx context.Context) error { - for { - select { - case <-ctx.Done(): - return errors.Trace(ctx.Err()) - case frag, ok := <-w.inputCh: - if !ok || atomic.LoadUint64(&w.isClosed) == 1 { - return nil - } - err := w.encodeEvents(frag) - if err != nil { - return errors.Trace(err) - } - } - } -} - -func (w *Worker) encodeEvents(frag EventFragment) error { - w.encoder.AppendTxnEvent(frag.event) - frag.encodedMsgs = w.encoder.Build() - w.outputCh <- frag - - return nil -} - -func (w *Worker) Close() { - if !atomic.CompareAndSwapUint64(&w.isClosed, 0, 1) { - return - } -} diff --git a/downstreamadapter/worker/writer/worker_test.go b/downstreamadapter/worker/writer/worker_test.go deleted file mode 100644 index d7c0e3f3e..000000000 --- a/downstreamadapter/worker/writer/worker_test.go +++ /dev/null @@ -1,151 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. 
-package writer - -import ( - "context" - "fmt" - "net/url" - "sync" - "testing" - - "github.com/pingcap/ticdc/pkg/common" - commonEvent "github.com/pingcap/ticdc/pkg/common/event" - "github.com/pingcap/ticdc/pkg/config" - "github.com/pingcap/ticdc/pkg/sink/cloudstorage" - "github.com/pingcap/ticdc/pkg/sink/codec" - "github.com/pingcap/ticdc/pkg/sink/util" - "github.com/pingcap/ticdc/utils/chann" - timodel "github.com/pingcap/tidb/pkg/meta/model" - pmodel "github.com/pingcap/tidb/pkg/parser/model" - "github.com/pingcap/tidb/pkg/parser/mysql" - "github.com/pingcap/tidb/pkg/parser/types" - "github.com/pingcap/tidb/pkg/util/chunk" - "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" -) - -func testWorker( - t *testing.T, -) (*Worker, chan EventFragment, chan EventFragment) { - uri := fmt.Sprintf("file:///%s", t.TempDir()) - sinkURI, err := url.Parse(uri) - require.Nil(t, err) - - replicaConfig := config.GetDefaultReplicaConfig() - changefeedID := common.NewChangefeedID4Test("test", "table1") - encoderConfig, err := util.GetEncoderConfig(changefeedID, sinkURI, config.ProtocolCsv, - replicaConfig.Sink, config.DefaultMaxMessageBytes) - require.Nil(t, err) - encoder, err := codec.NewTxnEventEncoder(encoderConfig) - require.Nil(t, err) - - encodedCh := make(chan EventFragment) - msgCh := make(chan EventFragment, 1024) - return NewWorker(1, changefeedID, encoder, msgCh, encodedCh), msgCh, encodedCh -} - -func TestEncodeEvents(t *testing.T) { - t.Parallel() - - encodingWorker, _, encodedCh := testWorker(t) - ctx, cancel := context.WithCancel(context.Background()) - eg, egCtx := errgroup.WithContext(ctx) - outputChs := []*chann.DrainableChann[EventFragment]{chann.NewAutoDrainChann[EventFragment]()} - defragmenter := NewDefragmenter(encodedCh, outputChs) - eg.Go(func() error { - return defragmenter.Run(egCtx) - }) - - tidbTableInfo := &timodel.TableInfo{ - ID: 100, - Name: pmodel.NewCIStr("table1"), - Columns: []*timodel.ColumnInfo{ - {ID: 1, Name: pmodel.NewCIStr("c1"), FieldType: *types.NewFieldType(mysql.TypeLong)}, - {ID: 2, Name: pmodel.NewCIStr("c2"), FieldType: *types.NewFieldType(mysql.TypeVarchar)}, - }, - } - tableInfo := common.WrapTableInfo("test", tidbTableInfo) - err := encodingWorker.encodeEvents(EventFragment{ - versionedTable: cloudstorage.VersionedTableName{ - TableNameWithPhysicTableID: common.TableName{ - Schema: "test", - Table: "table1", - TableID: 100, - }, - TableInfoVersion: 33, - }, - seqNumber: 1, - event: &commonEvent.DMLEvent{ - PhysicalTableID: 100, - TableInfo: tableInfo, - Rows: chunk.MutRowFromValues(100, "hello world", 200, "你好,世界").ToRow().Chunk(), - }, - }) - require.Nil(t, err) - cancel() - require.ErrorIs(t, eg.Wait(), context.Canceled) -} - -func TestEncodingWorkerRun(t *testing.T) { - t.Parallel() - - encodingWorker, msgCh, encodedCh := testWorker(t) - ctx, cancel := context.WithCancel(context.Background()) - eg, egCtx := errgroup.WithContext(ctx) - outputChs := []*chann.DrainableChann[EventFragment]{chann.NewAutoDrainChann[EventFragment]()} - defragmenter := NewDefragmenter(encodedCh, outputChs) - eg.Go(func() error { - return defragmenter.Run(egCtx) - }) - - tidbTableInfo := &timodel.TableInfo{ - ID: 100, - Name: pmodel.NewCIStr("table1"), - Columns: []*timodel.ColumnInfo{ - {ID: 1, Name: pmodel.NewCIStr("c1"), FieldType: *types.NewFieldType(mysql.TypeLong)}, - {ID: 2, Name: pmodel.NewCIStr("c2"), FieldType: *types.NewFieldType(mysql.TypeVarchar)}, - }, - } - tableInfo := common.WrapTableInfo("test", tidbTableInfo) - - for i := 0; i < 3; i++ 
{ - frag := EventFragment{ - versionedTable: cloudstorage.VersionedTableName{ - TableNameWithPhysicTableID: common.TableName{ - Schema: "test", - Table: "table1", - TableID: 100, - }, - }, - seqNumber: uint64(i + 1), - event: &commonEvent.DMLEvent{ - PhysicalTableID: 100, - TableInfo: tableInfo, - Rows: chunk.MutRowFromValues(100, "hello world").ToRow().Chunk(), - }, - } - msgCh <- frag - } - - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - _ = encodingWorker.Run(ctx) - }() - - cancel() - encodingWorker.Close() - wg.Wait() -} diff --git a/pkg/sink/codec/canal/canal_json_txn_encoder.go b/pkg/sink/codec/canal/canal_json_txn_encoder.go index 5f5616de0..ea320d8d5 100644 --- a/pkg/sink/codec/canal/canal_json_txn_encoder.go +++ b/pkg/sink/codec/canal/canal_json_txn_encoder.go @@ -47,8 +47,8 @@ type JSONTxnEventEncoder struct { columnSelector columnselector.Selector } -// NewJSONTxnEventEncoderBuilder creates a new JSONTxnEventEncoder -func NewJSONTxnEventEncoderBuilder(config *common.Config) common.TxnEventEncoder { +// NewJSONTxnEventEncoder creates a new JSONTxnEventEncoder +func NewJSONTxnEventEncoder(config *common.Config) common.TxnEventEncoder { return &JSONTxnEventEncoder{ valueBuf: &bytes.Buffer{}, terminator: []byte(config.Terminator), diff --git a/pkg/sink/codec/encoder_builder.go b/pkg/sink/codec/encoder_builder.go index 59ecdbfcd..882eeceaa 100644 --- a/pkg/sink/codec/encoder_builder.go +++ b/pkg/sink/codec/encoder_builder.go @@ -52,7 +52,7 @@ func NewTxnEventEncoder( case config.ProtocolCsv: return csv.NewTxnEventEncoder(c), nil case config.ProtocolCanalJSON: - return canal.NewJSONTxnEventEncoderBuilder(c), nil + return canal.NewJSONTxnEventEncoder(c), nil default: return nil, errors.ErrSinkUnknownProtocol.GenWithStackByArgs(c.Protocol) } diff --git a/pkg/sink/mysql/mysql_writer.go b/pkg/sink/mysql/mysql_writer.go index 34657829a..f9bb99e40 100644 --- a/pkg/sink/mysql/mysql_writer.go +++ b/pkg/sink/mysql/mysql_writer.go @@ -105,9 +105,6 @@ func (w *Writer) SetTableSchemaStore(tableSchemaStore *util.TableSchemaStore) { func (w *Writer) FlushDDLEvent(event *commonEvent.DDLEvent) error { if w.cfg.DryRun { - for _, callback := range event.PostTxnFlushed { - callback() - } return nil } @@ -147,18 +144,11 @@ func (w *Writer) FlushDDLEvent(event *commonEvent.DDLEvent) error { return err } } - - for _, callback := range event.PostTxnFlushed { - callback() - } return nil } func (w *Writer) FlushSyncPointEvent(event *commonEvent.SyncPointEvent) error { if w.cfg.DryRun { - for _, callback := range event.PostTxnFlushed { - callback() - } return nil } @@ -192,10 +182,6 @@ func (w *Writer) FlushSyncPointEvent(event *commonEvent.SyncPointEvent) error { if err != nil { return err } - - for _, callback := range event.PostTxnFlushed { - callback() - } return nil }
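Note on the pattern the sink hunks above converge on: post-flush callback execution now has a single owner. Each sink's WriteBlockEvent calls event.PostFlush() exactly once after a successful flush, while the per-writer loops over event.PostTxnFlushed (in mysql_writer.go, the kafka and pulsar sendDDLEvent paths, and the blackhole sink) are removed. The sketch below is illustrative only; BlockEvent here is a simplified stand-in for commonEvent.BlockEvent, and flushDDL is a hypothetical placeholder for each sink's real flush path.

// Minimal sketch of the centralized post-flush contract.
// Assumptions: BlockEvent and flushDDL are illustrative stand-ins, not the
// repository's exact types.
package example

// BlockEvent is a simplified stand-in for commonEvent.BlockEvent.
type BlockEvent interface {
	// PostFlush runs the PostTxnFlushed callbacks registered by the dispatcher.
	PostFlush()
}

type exampleSink struct{}

// flushDDL stands in for a sink's real flush path (encode the event and
// write it to the downstream system).
func (s *exampleSink) flushDDL(event BlockEvent) error {
	_ = event
	return nil
}

// WriteBlockEvent flushes the event first and runs the callbacks only on
// success, so a failed flush never advances the dispatcher's progress.
func (s *exampleSink) WriteBlockEvent(event BlockEvent) error {
	if err := s.flushDDL(event); err != nil {
		return err
	}
	event.PostFlush()
	return nil
}

With this layout, early returns inside the flush path (for example the TiDBOnly case in the kafka and pulsar sendDDLEvent, or the dry-run branches in the mysql writer) no longer invoke the callbacks themselves: WriteBlockEvent still calls PostFlush on the way out, so callbacks fire exactly once per block event.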