Skip to content

Commit 510f44f

Browse files
committed
Merge branch 'master' into debezium
2 parents 597f318 + 86fee60 commit 510f44f

File tree

32 files changed

+351
-110
lines changed

32 files changed

+351
-110
lines changed

downstreamadapter/dispatcher/dispatcher.go

Lines changed: 54 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.Dispat
235235
// deal with the dispatcher action
236236
action := dispatcherStatus.GetAction()
237237
if action != nil {
238-
pendingEvent, blockStatus := d.blockEventStatus.getEventAndStage()
238+
pendingEvent := d.blockEventStatus.getEvent()
239239
if pendingEvent == nil && action.CommitTs > d.GetResolvedTs() {
240240
// we have not received the block event, and the action is for the future event, so just ignore
241241
log.Info("pending event is nil, and the action's commit is larger than dispatchers resolvedTs",
@@ -245,7 +245,7 @@ func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.Dispat
245245
// we have not received the block event, and the action is for the future event, so just ignore
246246
return
247247
}
248-
if pendingEvent != nil && action.CommitTs == pendingEvent.GetCommitTs() && blockStatus == heartbeatpb.BlockStage_WAITING {
248+
if d.blockEventStatus.actionMatchs(action) {
249249
log.Info("pending event get the action",
250250
zap.Any("action", action),
251251
zap.Stringer("dispatcher", d.id),
@@ -404,8 +404,9 @@ func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeCallba
404404
syncPoint := event.(*commonEvent.SyncPointEvent)
405405
log.Info("dispatcher receive sync point event",
406406
zap.Stringer("dispatcher", d.id),
407-
zap.Uint64("commitTs", event.GetCommitTs()),
407+
zap.Any("commitTsList", syncPoint.GetCommitTsList()),
408408
zap.Uint64("seq", event.GetSeq()))
409+
409410
syncPoint.AddPostFlushFunc(func() {
410411
wakeCallback()
411412
})
@@ -546,25 +547,57 @@ func (d *Dispatcher) dealWithBlockEvent(event commonEvent.BlockEvent) {
546547
}
547548
} else {
548549
d.blockEventStatus.setBlockEvent(event, heartbeatpb.BlockStage_WAITING)
549-
message := &heartbeatpb.TableSpanBlockStatus{
550-
ID: d.id.ToPB(),
551-
State: &heartbeatpb.State{
552-
IsBlocked: true,
553-
BlockTs: event.GetCommitTs(),
554-
BlockTables: event.GetBlockedTables().ToPB(),
555-
NeedDroppedTables: event.GetNeedDroppedTables().ToPB(),
556-
NeedAddedTables: commonEvent.ToTablesPB(event.GetNeedAddedTables()),
557-
UpdatedSchemas: commonEvent.ToSchemaIDChangePB(event.GetUpdatedSchemas()), // only exists for rename table and rename tables
558-
IsSyncPoint: event.GetType() == commonEvent.TypeSyncPointEvent, // sync point event must should block
559-
Stage: heartbeatpb.BlockStage_WAITING,
560-
},
561-
}
562-
identifier := BlockEventIdentifier{
563-
CommitTs: event.GetCommitTs(),
564-
IsSyncPoint: event.GetType() == commonEvent.TypeSyncPointEvent,
550+
551+
if event.GetType() == commonEvent.TypeSyncPointEvent {
552+
// deal with multi sync point commit ts in one Sync Point Event
553+
// make each commitTs as a single message for maintainer
554+
// because different dispatchers may batch different sets of commitTs into one sync point event.
555+
commitTsList := event.(*commonEvent.SyncPointEvent).GetCommitTsList()
556+
blockTables := event.GetBlockedTables().ToPB()
557+
needDroppedTables := event.GetNeedDroppedTables().ToPB()
558+
needAddedTables := commonEvent.ToTablesPB(event.GetNeedAddedTables())
559+
for _, commitTs := range commitTsList {
560+
message := &heartbeatpb.TableSpanBlockStatus{
561+
ID: d.id.ToPB(),
562+
State: &heartbeatpb.State{
563+
IsBlocked: true,
564+
BlockTs: commitTs,
565+
BlockTables: blockTables,
566+
NeedDroppedTables: needDroppedTables,
567+
NeedAddedTables: needAddedTables,
568+
UpdatedSchemas: nil,
569+
IsSyncPoint: true,
570+
Stage: heartbeatpb.BlockStage_WAITING,
571+
},
572+
}
573+
identifier := BlockEventIdentifier{
574+
CommitTs: commitTs,
575+
IsSyncPoint: true,
576+
}
577+
d.resendTaskMap.Set(identifier, newResendTask(message, d, nil))
578+
d.blockStatusesChan <- message
579+
}
580+
} else {
581+
message := &heartbeatpb.TableSpanBlockStatus{
582+
ID: d.id.ToPB(),
583+
State: &heartbeatpb.State{
584+
IsBlocked: true,
585+
BlockTs: event.GetCommitTs(),
586+
BlockTables: event.GetBlockedTables().ToPB(),
587+
NeedDroppedTables: event.GetNeedDroppedTables().ToPB(),
588+
NeedAddedTables: commonEvent.ToTablesPB(event.GetNeedAddedTables()),
589+
UpdatedSchemas: commonEvent.ToSchemaIDChangePB(event.GetUpdatedSchemas()), // only exists for rename table and rename tables
590+
IsSyncPoint: false,
591+
Stage: heartbeatpb.BlockStage_WAITING,
592+
},
593+
}
594+
identifier := BlockEventIdentifier{
595+
CommitTs: event.GetCommitTs(),
596+
IsSyncPoint: false,
597+
}
598+
d.resendTaskMap.Set(identifier, newResendTask(message, d, nil))
599+
d.blockStatusesChan <- message
565600
}
566-
d.resendTaskMap.Set(identifier, newResendTask(message, d, nil))
567-
d.blockStatusesChan <- message
568601
}
569602

570603
// dealing with events which update schema ids

downstreamadapter/dispatcher/dispatcher_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ func TestDispatcherHandleEvents(t *testing.T) {
363363
// ===== sync point event =====
364364

365365
syncPointEvent := &commonEvent.SyncPointEvent{
366-
CommitTs: 6,
366+
CommitTsList: []uint64{6},
367367
}
368368
block = dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&nodeID, syncPointEvent)}, callback)
369369
require.Equal(t, true, block)
@@ -380,7 +380,7 @@ func TestDispatcherHandleEvents(t *testing.T) {
380380
// receive the ack info
381381
dispatcherStatus = &heartbeatpb.DispatcherStatus{
382382
Ack: &heartbeatpb.ACK{
383-
CommitTs: syncPointEvent.CommitTs,
383+
CommitTs: syncPointEvent.GetCommitTs(),
384384
IsSyncPoint: true,
385385
},
386386
}
@@ -394,7 +394,7 @@ func TestDispatcherHandleEvents(t *testing.T) {
394394
dispatcherStatus = &heartbeatpb.DispatcherStatus{
395395
Action: &heartbeatpb.DispatcherAction{
396396
Action: heartbeatpb.Action_Pass,
397-
CommitTs: syncPointEvent.CommitTs,
397+
CommitTs: syncPointEvent.GetCommitTs(),
398398
IsSyncPoint: true,
399399
},
400400
}

downstreamadapter/dispatcher/helper.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ type BlockEventStatus struct {
7474
mutex sync.Mutex
7575
blockPendingEvent commonEvent.BlockEvent
7676
blockStage heartbeatpb.BlockStage
77+
// record all the commitTs of this pending event
78+
// mainly for the batch sync point event
79+
blockCommitTsMap map[uint64]struct{}
7780
}
7881

7982
func (b *BlockEventStatus) clear() {
@@ -82,6 +85,7 @@ func (b *BlockEventStatus) clear() {
8285

8386
b.blockPendingEvent = nil
8487
b.blockStage = heartbeatpb.BlockStage_NONE
88+
b.blockCommitTsMap = make(map[uint64]struct{})
8589
}
8690

8791
func (b *BlockEventStatus) setBlockEvent(event commonEvent.BlockEvent, blockStage heartbeatpb.BlockStage) {
@@ -90,6 +94,15 @@ func (b *BlockEventStatus) setBlockEvent(event commonEvent.BlockEvent, blockStag
9094

9195
b.blockPendingEvent = event
9296
b.blockStage = blockStage
97+
b.blockCommitTsMap = make(map[uint64]struct{})
98+
99+
if event.GetType() == commonEvent.TypeSyncPointEvent {
100+
for _, ts := range event.(*commonEvent.SyncPointEvent).GetCommitTsList() {
101+
b.blockCommitTsMap[ts] = struct{}{}
102+
}
103+
} else {
104+
b.blockCommitTsMap[event.GetCommitTs()] = struct{}{}
105+
}
93106
}
94107

95108
func (b *BlockEventStatus) updateBlockStage(blockStage heartbeatpb.BlockStage) {
@@ -98,13 +111,47 @@ func (b *BlockEventStatus) updateBlockStage(blockStage heartbeatpb.BlockStage) {
98111
b.blockStage = blockStage
99112
}
100113

114+
func (b *BlockEventStatus) getEvent() commonEvent.BlockEvent {
115+
b.mutex.Lock()
116+
defer b.mutex.Unlock()
117+
118+
return b.blockPendingEvent
119+
}
120+
101121
func (b *BlockEventStatus) getEventAndStage() (commonEvent.BlockEvent, heartbeatpb.BlockStage) {
102122
b.mutex.Lock()
103123
defer b.mutex.Unlock()
104124

105125
return b.blockPendingEvent, b.blockStage
106126
}
107127

128+
// actionMatchs checks whether the action is for the current pending ddl event.
129+
// Most of time, the pending event only have one commitTs, so when the commitTs of the action meets the pending event's commitTs, it is enough.
130+
// While if the pending event is a sync point event with multiple commitTs, we only can do the action
131+
// when all the commitTs have been received.
132+
func (b *BlockEventStatus) actionMatchs(action *heartbeatpb.DispatcherAction) bool {
133+
b.mutex.Lock()
134+
defer b.mutex.Unlock()
135+
136+
if b.blockPendingEvent == nil {
137+
return false
138+
}
139+
140+
if b.blockStage != heartbeatpb.BlockStage_WAITING {
141+
return false
142+
}
143+
144+
_, ok := b.blockCommitTsMap[action.CommitTs]
145+
if ok {
146+
delete(b.blockCommitTsMap, action.CommitTs)
147+
}
148+
149+
if len(b.blockCommitTsMap) == 0 {
150+
return true
151+
}
152+
return false
153+
}
154+
108155
type SchemaIDToDispatchers struct {
109156
mutex sync.RWMutex
110157
m map[int64]map[common.DispatcherID]interface{}

downstreamadapter/dispatcher/table_progress.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,21 @@ func NewTableProgress() *TableProgress {
6262

6363
// Add inserts a new event into the TableProgress.
6464
func (p *TableProgress) Add(event commonEvent.FlushableEvent) {
65-
ts := Ts{startTs: event.GetStartTs(), commitTs: event.GetCommitTs()}
65+
commitTs := event.GetCommitTs()
66+
if event.GetType() == commonEvent.TypeSyncPointEvent {
67+
// if the event is a sync point event, we use the last commitTs(the largest commitTs in the event) to calculate the progress.
68+
// because a sync point event with multiple commitTs means there are no DDLs / DMLs between those commitTs values.
69+
// So we can just use the largest commitTs in the sync point event to calculate the progress.
70+
commitTsList := event.(*commonEvent.SyncPointEvent).GetCommitTsList()
71+
commitTs = commitTsList[len(commitTsList)-1]
72+
}
73+
74+
ts := Ts{startTs: event.GetStartTs(), commitTs: commitTs}
6675
p.rwMutex.Lock()
6776
defer p.rwMutex.Unlock()
6877
elem := p.list.PushBack(ts)
6978
p.elemMap[ts] = elem
70-
p.maxCommitTs = event.GetCommitTs()
79+
p.maxCommitTs = commitTs
7180
event.PushFrontFlushFunc(func() {
7281
p.Remove(event)
7382
})
@@ -98,7 +107,16 @@ func (p *TableProgress) Empty() bool {
98107
func (p *TableProgress) Pass(event commonEvent.BlockEvent) {
99108
p.rwMutex.Lock()
100109
defer p.rwMutex.Unlock()
110+
101111
p.maxCommitTs = event.GetCommitTs()
112+
113+
if event.GetType() == commonEvent.TypeSyncPointEvent {
114+
// if the event is a sync point event, we use the last commitTs(the largest commitTs in the event) to calculate the progress.
115+
// because a sync point event with multiple commitTs means there is no ddl / dmls between these commitTses.
116+
// So we can just use the largest commitTs in the sync point event to calculate the progress.
117+
commitTsList := event.(*commonEvent.SyncPointEvent).GetCommitTsList()
118+
p.maxCommitTs = commitTsList[len(commitTsList)-1]
119+
}
102120
}
103121

104122
// GetCheckpointTs returns the current checkpoint timestamp for the table span.

downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package dispatcherorchestrator
1616
import (
1717
"context"
1818
"encoding/json"
19+
"sync"
1920
"time"
2021

2122
"github.com/pingcap/log"
@@ -36,6 +37,7 @@ import (
3637
// for different change feeds based on maintainer bootstrap messages.
3738
type DispatcherOrchestrator struct {
3839
mc messaging.MessageCenter
40+
mutex sync.Mutex // protect dispatcherManagers
3941
dispatcherManagers map[common.ChangeFeedID]*dispatchermanager.EventDispatcherManager
4042
}
4143

@@ -81,7 +83,10 @@ func (m *DispatcherOrchestrator) handleBootstrapRequest(
8183
return err
8284
}
8385

86+
m.mutex.Lock()
8487
manager, exists := m.dispatcherManagers[cfId]
88+
m.mutex.Unlock()
89+
8590
var err error
8691
var startTs uint64
8792
if !exists {
@@ -112,7 +117,9 @@ func (m *DispatcherOrchestrator) handleBootstrapRequest(
112117
}
113118
return m.sendResponse(from, messaging.MaintainerManagerTopic, response)
114119
}
120+
m.mutex.Lock()
115121
m.dispatcherManagers[cfId] = manager
122+
m.mutex.Unlock()
116123
metrics.EventDispatcherManagerGauge.WithLabelValues(cfId.Namespace(), cfId.Name()).Inc()
117124
} else {
118125
// Check and potentially add a table trigger event dispatcher.
@@ -164,7 +171,11 @@ func (m *DispatcherOrchestrator) handlePostBootstrapRequest(
164171
req *heartbeatpb.MaintainerPostBootstrapRequest,
165172
) error {
166173
cfId := common.NewChangefeedIDFromPB(req.ChangefeedID)
174+
175+
m.mutex.Lock()
167176
manager, exists := m.dispatcherManagers[cfId]
177+
m.mutex.Unlock()
178+
168179
if !exists || manager.GetTableTriggerEventDispatcher() == nil {
169180
log.Error("Receive post bootstrap request but there is no table trigger event dispatcher",
170181
zap.Any("changefeedID", cfId.Name()))
@@ -220,6 +231,7 @@ func (m *DispatcherOrchestrator) handleCloseRequest(
220231
Success: true,
221232
}
222233

234+
m.mutex.Lock()
223235
if manager, ok := m.dispatcherManagers[cfId]; ok {
224236
if closed := manager.TryClose(req.Removed); closed {
225237
delete(m.dispatcherManagers, cfId)
@@ -229,6 +241,7 @@ func (m *DispatcherOrchestrator) handleCloseRequest(
229241
response.Success = false
230242
}
231243
}
244+
m.mutex.Unlock()
232245

233246
log.Info("try close dispatcher manager",
234247
zap.String("changefeed", cfId.Name()), zap.Bool("success", response.Success))
@@ -275,6 +288,17 @@ func (m *DispatcherOrchestrator) sendResponse(to node.ID, topic string, msg mess
275288
func (m *DispatcherOrchestrator) Close() {
276289
log.Info("dispatcher orchestrator is closing")
277290
m.mc.DeRegisterHandler(messaging.DispatcherManagerManagerTopic)
291+
292+
m.mutex.Lock()
293+
defer m.mutex.Unlock()
294+
for len(m.dispatcherManagers) > 0 {
295+
for id, manager := range m.dispatcherManagers {
296+
ok := manager.TryClose(false)
297+
if ok {
298+
delete(m.dispatcherManagers, id)
299+
}
300+
}
301+
}
278302
log.Info("dispatcher orchestrator closed")
279303
}
280304

0 commit comments

Comments
 (0)