codec: add debezium protocol #1103

Draft: wants to merge 16 commits into base: master

cmd/kafka-consumer/writer.go (4 additions, 1 deletion)
@@ -35,6 +35,7 @@ import (
"github.com/pingcap/tiflow/pkg/sink/codec"
"github.com/pingcap/tiflow/pkg/sink/codec/avro"
"github.com/pingcap/tiflow/pkg/sink/codec/canal"
"github.com/pingcap/tiflow/pkg/sink/codec/debezium"
"github.com/pingcap/tiflow/pkg/sink/codec/open"
"github.com/pingcap/tiflow/pkg/sink/codec/simple"
"go.uber.org/zap"
@@ -59,6 +60,8 @@ func NewDecoder(ctx context.Context, option *option, upstreamTiDB *sql.DB) (code
decoder = avro.NewDecoder(option.codecConfig, schemaM, option.topic, upstreamTiDB)
case config.ProtocolSimple:
decoder, err = simple.NewDecoder(ctx, option.codecConfig, upstreamTiDB)
case config.ProtocolDebezium:
decoder = debezium.NewDecoder(option.codecConfig, upstreamTiDB)
default:
log.Panic("Protocol not supported", zap.Any("Protocol", option.protocol))
}
@@ -471,7 +474,7 @@ func (w *writer) appendRow2Group(row *model.RowChangedEvent, progress *partition
return
}
switch w.option.protocol {
case config.ProtocolSimple, config.ProtocolOpen, config.ProtocolCanalJSON:
case config.ProtocolSimple, config.ProtocolOpen, config.ProtocolCanalJSON, config.ProtocolDebezium:
// The simple protocol sets the table ID for every row message, so the consumer can tell
// which table (including which partition of a partitioned table) a row message belongs to.
// The open protocol sets the partition table ID if the table is partitioned.
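The Debezium branch follows the same shape as the Avro and Simple cases: the decoder receives the codec configuration plus an upstream TiDB handle it can use for type lookups. Below is a minimal sketch of that dispatch, assuming the tiflow import paths shown in the hunk above; the helper name buildDecoder is hypothetical, and the codec.RowEventDecoder return type is an assumption because the NewDecoder signature above is truncated.

```go
package consumer

import (
	"database/sql"
	"fmt"

	"github.com/pingcap/tiflow/pkg/config"
	"github.com/pingcap/tiflow/pkg/sink/codec"
	"github.com/pingcap/tiflow/pkg/sink/codec/common"
	"github.com/pingcap/tiflow/pkg/sink/codec/debezium"
)

// buildDecoder is a hypothetical, trimmed-down variant of the consumer's NewDecoder
// that keeps only the branch added by this PR.
func buildDecoder(protocol config.Protocol, codecConfig *common.Config, upstreamTiDB *sql.DB) (codec.RowEventDecoder, error) {
	switch protocol {
	case config.ProtocolDebezium:
		// Same call as in writer.go: the upstream TiDB handle lets the decoder
		// resolve column type information that the message alone may not carry.
		return debezium.NewDecoder(codecConfig, upstreamTiDB), nil
	default:
		return nil, fmt.Errorf("protocol %v is not covered by this sketch", protocol)
	}
}
```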

downstreamadapter/worker/cloudstorage_ddl_worker.go (7 additions, 0 deletions)
@@ -27,6 +27,7 @@ import (
"github.com/pingcap/ticdc/pkg/sink/cloudstorage"
"github.com/pingcap/ticdc/pkg/sink/util"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/robfig/cron"
"go.uber.org/zap"
@@ -83,6 +84,12 @@ func (w *CloudStorageDDLWorker) WriteBlockEvent(event *commonEvent.DDLEvent) err
return err
}
}
if event.GetDDLType() == model.ActionExchangeTablePartition {
// For exchange partition, we need to write the schema of the source table.
var sourceTableDef cloudstorage.TableDefinition
sourceTableDef.FromTableInfo(event.ExtraSchemaName, event.ExtraTableName, event.MultipleTableInfos[1], event.GetCommitTs(), w.cfg.OutputColumnID)
return w.writeFile(event, sourceTableDef)
}
event.PostFlush()
return nil
}
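ALTER TABLE ... EXCHANGE PARTITION swaps data between a partition of one table and a plain table, so two table definitions are involved: the partitioned target table (SchemaName/TableName, MultipleTableInfos[0]) and the plain source table (ExtraSchemaName/ExtraTableName, MultipleTableInfos[1]). The new branch writes a second schema file for the source table. A self-contained sketch of that convention, using a stand-in struct rather than the real commonEvent.DDLEvent:

```go
package main

import "fmt"

// exchangeEvent models only the fields the cloud storage DDL worker reads for
// model.ActionExchangeTablePartition; it is a stand-in for commonEvent.DDLEvent.
type exchangeEvent struct {
	SchemaName      string // partitioned (target) table
	TableName       string
	ExtraSchemaName string // plain (source) table that is exchanged in
	ExtraTableName  string
}

func main() {
	// ALTER TABLE test.p EXCHANGE PARTITION p0 WITH TABLE test.t
	e := exchangeEvent{
		SchemaName: "test", TableName: "p",
		ExtraSchemaName: "test", ExtraTableName: "t",
	}
	// One schema file per table involved in the exchange: the target table through
	// the regular path, the source table through the branch added above.
	fmt.Printf("write schema for %s.%s (target) and %s.%s (source)\n",
		e.SchemaName, e.TableName, e.ExtraSchemaName, e.ExtraTableName)
}
```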

downstreamadapter/worker/mq_ddl_worker.go (6 additions, 0 deletions)
@@ -113,6 +113,12 @@ func (w *MQDDLWorker) WriteBlockEvent(ctx context.Context, event *event.DDLEvent
if err != nil {
return errors.Trace(err)
}
if message == nil {
log.Info("Skip ddl event", zap.Uint64("commitTs", e.GetCommitTs()),
zap.String("query", e.Query),
zap.String("changefeed", w.changeFeedID.String()))
continue
}
topic := w.eventRouter.GetTopicForDDL(e)
// Notice: We must call GetPartitionNum here,
// which will be responsible for automatically creating topics when they don't exist.
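The worker now tolerates a nil message from the DDL encoder, presumably because the Debezium encoder can emit nothing for DDL types it does not represent; the event is logged and skipped instead of failing the whole block event. A minimal sketch of that skip-and-continue pattern, with a stand-in type rather than the real encoder output:

```go
package main

import "log"

// encodedDDL stands in for the message produced by the DDL event encoder;
// nil means the codec intentionally emitted nothing for this DDL.
type encodedDDL struct {
	query   string
	payload []byte
}

func sendDDLMessages(msgs []*encodedDDL) {
	for _, m := range msgs {
		if m == nil {
			// Mirrors the new branch in WriteBlockEvent: log and move on.
			log.Println("skip ddl event: encoder produced no message")
			continue
		}
		log.Printf("send ddl %q (%d bytes)", m.query, len(m.payload))
	}
}

func main() {
	sendDDLMessages([]*encodedDDL{
		{query: "CREATE TABLE t (a INT)", payload: []byte("{...}")},
		nil, // e.g. a DDL the codec chose to drop
	})
}
```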

go.mod (1 addition, 0 deletions)
@@ -57,6 +57,7 @@ require (
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.10.0
github.com/thanhpk/randstr v1.0.6
github.com/tikv/client-go/v2 v2.0.8-0.20241209094930-06d7f4b9233b
github.com/tikv/pd v1.1.0-beta.0.20240407022249-7179657d129b
github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31

logservice/schemastore/persist_storage_ddl_handlers.go (5 additions, 1 deletion)
@@ -1334,7 +1334,11 @@ func extractTableInfoFuncForExchangeTablePartition(event *PersistedDDLEvent, tab
pmodel.NewCIStr(event.TableName).O,
tableID,
false,
columnSchema)
columnSchema,
event.TableInfo.Charset,
event.TableInfo.Collate,
event.TableInfo.Comment,
)
return tableInfo, false
}

pkg/common/event/ddl_event.go (0 additions, 30 deletions)
@@ -146,36 +146,6 @@ func (d *DDLEvent) GetEvents() []*DDLEvent {
// Some DDL events are multi-events and need to be split into multiple messages,
// e.g. rename table test.table1 to test.table10, test.table2 to test.table20.
switch model.ActionType(d.Type) {
case model.ActionExchangeTablePartition:
if len(d.MultipleTableInfos) != 2 {
log.Panic("multipleTableInfos length should be equal to 2", zap.Any("multipleTableInfos", d.MultipleTableInfos))
}
return []*DDLEvent{
// partition table before exchange
{
Version: d.Version,
Type: d.Type,
// SchemaID: d.SchemaID,
// TableID: d.TableID,
SchemaName: d.SchemaName,
TableName: d.TableName,
TableInfo: d.MultipleTableInfos[0],
Query: d.Query,
FinishedTs: d.FinishedTs,
},
// normal table before exchange(TODO: this may be wrong)
{
Version: d.Version,
Type: d.Type,
// SchemaID: d.TableInfo.SchemaID,
// TableID: d.TableInfo.TableName.TableID,
TableInfo: d.MultipleTableInfos[1],
SchemaName: d.ExtraSchemaName,
TableName: d.ExtraTableName,
Query: d.Query,
FinishedTs: d.FinishedTs,
},
}
case model.ActionCreateTables, model.ActionRenameTables:
events := make([]*DDLEvent, 0, len(d.MultipleTableInfos))
queries, err := SplitQueries(d.Query)
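With the exchange-partition split removed here (presumably because downstream components such as the cloud storage DDL worker above now handle the source table explicitly), GetEvents only splits genuine multi-table DDLs such as CREATE TABLES and RENAME TABLES, pairing each sub-query with the matching entry of MultipleTableInfos. The sketch below illustrates that pairing with stand-in types; splitQueries is a hypothetical, simplified splitter, not the real SplitQueries used above:

```go
package main

import (
	"fmt"
	"strings"
)

// miniDDLEvent keeps only the fields needed to show the split; it is a stand-in
// for the real DDLEvent type.
type miniDDLEvent struct {
	Query      string
	TableNames []string // stands in for MultipleTableInfos, one entry per sub-DDL
}

// splitQueries is a hypothetical, simplified splitter: it cuts a multi-table
// RENAME TABLE statement into one statement per table pair.
func splitQueries(query string) []string {
	parts := strings.Split(strings.TrimPrefix(query, "RENAME TABLE "), ", ")
	out := make([]string, 0, len(parts))
	for _, p := range parts {
		out = append(out, "RENAME TABLE "+p)
	}
	return out
}

func main() {
	d := miniDDLEvent{
		Query:      "RENAME TABLE test.table1 TO test.table10, test.table2 TO test.table20",
		TableNames: []string{"table10", "table20"},
	}
	// One sub-event per table info, each with its own query, mirroring the
	// surviving ActionCreateTables / ActionRenameTables branch above.
	for i, q := range splitQueries(d.Query) {
		fmt.Printf("sub-event %d: table=%s query=%q\n", i, d.TableNames[i], q)
	}
}
```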

pkg/common/table_info.go (8 additions, 3 deletions)
@@ -76,6 +76,9 @@ type TableInfo struct {
// record the logical ID from the DDL event(job.BinlogInfo.TableInfo).
// So be careful when using the TableInfo.
TableName TableName `json:"table-name"`
Charset string `json:"charset"`
Collate string `json:"collate"`
Comment string `json:"comment"`

columnSchema *columnSchema `json:"-"`

@@ -407,8 +410,11 @@ func newTableInfo(schema, table string, tableID int64, isPartition bool, columnS
return ti
}

func NewTableInfo(schemaName string, tableName string, tableID int64, isPartition bool, columnSchema *columnSchema) *TableInfo {
func NewTableInfo(schemaName string, tableName string, tableID int64, isPartition bool, columnSchema *columnSchema, charset, collate, comment string) *TableInfo {
ti := newTableInfo(schemaName, tableName, tableID, isPartition, columnSchema)
ti.Charset = charset
ti.Collate = collate
ti.Comment = comment

// When this tableInfo is released, we need to decrease the reference count of its columnSchema.
// This hook should always be registered where the tableInfo is created, so the two appear as a pair.
@@ -424,8 +430,7 @@ func WrapTableInfo(schemaName string, info *model.TableInfo) *TableInfo {
// search column schema object
sharedColumnSchemaStorage := GetSharedColumnSchemaStorage()
columnSchema := sharedColumnSchemaStorage.GetOrSetColumnSchema(info)

return NewTableInfo(schemaName, info.Name.O, info.ID, info.GetPartitionInfo() != nil, columnSchema)
return NewTableInfo(schemaName, info.Name.O, info.ID, info.GetPartitionInfo() != nil, columnSchema, info.Charset, info.Collate, info.Comment)
}

// NewTableInfo4Decoder is only used by the codec decoder for testing purposes,
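The three new fields are copied from the TiDB model.TableInfo in WrapTableInfo and stored on common.TableInfo with JSON tags, so table-level charset, collation, and comment metadata becomes available to the codecs (presumably for the Debezium payload, given the PR title). A sketch of what that extra metadata looks like when serialized, using a local stand-in struct with the same JSON tags rather than the real TableInfo:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// tableMeta mirrors only the fields added to common.TableInfo by this PR.
type tableMeta struct {
	Charset string `json:"charset"`
	Collate string `json:"collate"`
	Comment string `json:"comment"`
}

func main() {
	m := tableMeta{
		Charset: "utf8mb4",
		Collate: "utf8mb4_bin",
		Comment: "orders fact table",
	}
	out, err := json.Marshal(m)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // {"charset":"utf8mb4","collate":"utf8mb4_bin","comment":"orders fact table"}
}
```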

pkg/errors/error.go (4 additions, 0 deletions)
@@ -54,6 +54,10 @@ var (
)

// codec related errors
ErrDDLUnsupportType = errors.Normalize(
"unsupport ddl type %s, query %s",
errors.RFCCodeText("CDC:ErrDDLUnsupportType"),
)
ErrEncodeFailed = errors.Normalize(
"encode failed",
errors.RFCCodeText("CDC:ErrEncodeFailed"),
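ErrDDLUnsupportType gives codecs a typed, RFC-coded error to return for DDL statements the wire format cannot express, as opposed to the skip-and-log path in the MQ DDL worker above. A usage sketch, assuming the github.com/pingcap/ticdc module path used elsewhere in this diff and the GenWithStackByArgs helper that pingcap's errors.Normalize values provide; the guard condition itself is hypothetical:

```go
package main

import (
	"fmt"

	cerror "github.com/pingcap/ticdc/pkg/errors"
)

// encodeDDLQuery is a hypothetical encoder hook showing how the new error is raised.
func encodeDDLQuery(ddlType, query string) ([]byte, error) {
	// Hypothetical guard: pretend sequences cannot be represented in the target format.
	if ddlType == "create sequence" {
		return nil, cerror.ErrDDLUnsupportType.GenWithStackByArgs(ddlType, query)
	}
	return []byte(query), nil
}

func main() {
	_, err := encodeDDLQuery("create sequence", "CREATE SEQUENCE test.seq")
	// err carries the RFC code, e.g. "[CDC:ErrDDLUnsupportType]unsupport ddl type ..."
	fmt.Println(err)
}
```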

pkg/sink/codec/canal/canal_json_encoder.go (2 additions, 2 deletions)
@@ -204,8 +204,8 @@ func newJSONMessageForDML(
out.String("")
}

valueMap := make(map[int64]string, columnLen) // colId -> value
javaTypeMap := make(map[int64]JavaSQLType, columnLen) // colId -> javaType
valueMap := make(map[int64]string, columnLen) // colId -> value
javaTypeMap := make(map[int64]common.JavaSQLType, columnLen) // colId -> javaType

row := e.GetRows()
if e.IsDelete() {