Skip to content

WIP: optimization scheduling #1204

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 43 additions & 1 deletion downstreamadapter/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ type Dispatcher struct {
// shared by the event dispatcher manager
sink sink.Sink

// statusesChan is used to store the status of dispatchers when status changed
// and push to heartbeatRequestQueue
statusesChan chan TableSpanStatusWithSeq
// blockStatusesChan use to collector block status of ddl/sync point event to Maintainer
// shared by the event dispatcher manager
blockStatusesChan chan *heartbeatpb.TableSpanBlockStatus
Expand Down Expand Up @@ -147,6 +150,7 @@ type Dispatcher struct {
errCh chan error

bdrMode bool
seq uint64
}

func NewDispatcher(
Expand All @@ -155,6 +159,7 @@ func NewDispatcher(
tableSpan *heartbeatpb.TableSpan,
sink sink.Sink,
startTs uint64,
statusesChan chan TableSpanStatusWithSeq,
blockStatusesChan chan *heartbeatpb.TableSpanBlockStatus,
schemaID int64,
schemaIDToDispatchers *SchemaIDToDispatchers,
Expand All @@ -172,9 +177,10 @@ func NewDispatcher(
sink: sink,
startTs: startTs,
startTsIsSyncpoint: startTsIsSyncpoint,
statusesChan: statusesChan,
blockStatusesChan: blockStatusesChan,
syncPointConfig: syncPointConfig,
componentStatus: newComponentStateWithMutex(heartbeatpb.ComponentState_Working),
componentStatus: newComponentStateWithMutex(heartbeatpb.ComponentState_Initializing),
resolvedTs: startTs,
filterConfig: filterConfig,
isRemoving: atomic.Bool{},
Expand Down Expand Up @@ -331,6 +337,12 @@ func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeCallba
continue
}

// only when we receive the first event, we can regard the dispatcher begin syncing data
// then turning into working status.
if d.isFirstEvent(event) {
d.updateComponentStatus()
}

switch event.GetType() {
case commonEvent.TypeResolvedEvent:
atomic.StoreUint64(&d.resolvedTs, event.(commonEvent.ResolvedEvent).ResolvedTs)
Expand Down Expand Up @@ -815,3 +827,33 @@ func (d *Dispatcher) HandleCheckpointTs(checkpointTs uint64) {
func (d *Dispatcher) IsTableTriggerEventDispatcher() bool {
return d.tableSpan == heartbeatpb.DDLSpan
}

func (d *Dispatcher) SetSeq(seq uint64) {
d.seq = seq
}

func (d *Dispatcher) isFirstEvent(event commonEvent.Event) bool {
if d.componentStatus.Get() == heartbeatpb.ComponentState_Initializing {
switch event.GetType() {
case commonEvent.TypeResolvedEvent, commonEvent.TypeDMLEvent, commonEvent.TypeDDLEvent, commonEvent.TypeSyncPointEvent:
if event.GetCommitTs() > d.startTs {
return true
}
}
}
return false
}

func (d *Dispatcher) updateComponentStatus() {
d.componentStatus.Set(heartbeatpb.ComponentState_Working)
d.statusesChan <- TableSpanStatusWithSeq{
TableSpanStatus: &heartbeatpb.TableSpanStatus{
ID: d.id.ToPB(),
ComponentStatus: heartbeatpb.ComponentState_Working,
},
CheckpointTs: d.GetCheckpointTs(),
ResolvedTs: d.GetResolvedTs(),
Seq: d.seq,
}

}
7 changes: 7 additions & 0 deletions downstreamadapter/dispatcher/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,13 @@ func (s *ComponentStateWithMutex) Get() heartbeatpb.ComponentState {
return s.componentStatus
}

type TableSpanStatusWithSeq struct {
*heartbeatpb.TableSpanStatus
CheckpointTs uint64
ResolvedTs uint64
Seq uint64
}

/*
HeartBeatInfo is used to collect the message for HeartBeatRequest for each dispatcher.
Mainly about the progress of each dispatcher:
Expand Down
42 changes: 18 additions & 24 deletions downstreamadapter/dispatchermanager/event_dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type EventDispatcherManager struct {

// statusesChan is used to store the status of dispatchers when status changed
// and push to heartbeatRequestQueue
statusesChan chan TableSpanStatusWithSeq
statusesChan chan dispatcher.TableSpanStatusWithSeq
// heartbeatRequestQueue is used to store the heartbeat request from all the dispatchers.
// heartbeat collector will consume the heartbeat request from the queue and send the response to each dispatcher.
heartbeatRequestQueue *HeartbeatRequestQueue
Expand Down Expand Up @@ -151,7 +151,7 @@ func NewEventDispatcherManager(
dispatcherMap: newDispatcherMap(),
changefeedID: changefeedID,
pdClock: pdClock,
statusesChan: make(chan TableSpanStatusWithSeq, 8192),
statusesChan: make(chan dispatcher.TableSpanStatusWithSeq, 8192),
blockStatusesChan: make(chan *heartbeatpb.TableSpanBlockStatus, 1024*1024),
errCh: make(chan error, 1),
cancel: cancel,
Expand Down Expand Up @@ -499,6 +499,7 @@ func (e *EventDispatcherManager) newDispatchers(infos []dispatcherCreateInfo, re
e.changefeedID,
id, tableSpans[idx], e.sink,
uint64(newStartTsList[idx]),
e.statusesChan,
e.blockStatusesChan,
schemaIds[idx],
e.schemaIDToDispatchers,
Expand All @@ -524,14 +525,7 @@ func (e *EventDispatcherManager) newDispatchers(infos []dispatcherCreateInfo, re
}

seq := e.dispatcherMap.Set(id, d)
e.statusesChan <- TableSpanStatusWithSeq{
TableSpanStatus: &heartbeatpb.TableSpanStatus{
ID: id.ToPB(),
ComponentStatus: heartbeatpb.ComponentState_Working,
},
StartTs: uint64(newStartTsList[idx]),
Seq: seq,
}
d.SetSeq(seq)

if d.IsTableTriggerEventDispatcher() {
e.metricTableTriggerEventDispatcherCount.Inc()
Expand Down Expand Up @@ -651,11 +645,11 @@ func (e *EventDispatcherManager) collectComponentStatusWhenChanged(ctx context.C
case tableSpanStatus := <-e.statusesChan:
statusMessage = append(statusMessage, tableSpanStatus.TableSpanStatus)
newWatermark.Seq = tableSpanStatus.Seq
if tableSpanStatus.StartTs != 0 && tableSpanStatus.StartTs < newWatermark.CheckpointTs {
newWatermark.CheckpointTs = tableSpanStatus.StartTs
if tableSpanStatus.CheckpointTs != 0 && tableSpanStatus.CheckpointTs < newWatermark.CheckpointTs {
newWatermark.CheckpointTs = tableSpanStatus.CheckpointTs
}
if tableSpanStatus.StartTs != 0 && tableSpanStatus.StartTs < newWatermark.ResolvedTs {
newWatermark.ResolvedTs = tableSpanStatus.StartTs
if tableSpanStatus.ResolvedTs != 0 && tableSpanStatus.ResolvedTs < newWatermark.ResolvedTs {
newWatermark.ResolvedTs = tableSpanStatus.ResolvedTs
}
delay := time.NewTimer(10 * time.Millisecond)
loop:
Expand All @@ -666,11 +660,11 @@ func (e *EventDispatcherManager) collectComponentStatusWhenChanged(ctx context.C
if newWatermark.Seq < tableSpanStatus.Seq {
newWatermark.Seq = tableSpanStatus.Seq
}
if tableSpanStatus.StartTs != 0 && tableSpanStatus.StartTs < newWatermark.CheckpointTs {
newWatermark.CheckpointTs = tableSpanStatus.StartTs
if tableSpanStatus.CheckpointTs != 0 && tableSpanStatus.CheckpointTs < newWatermark.CheckpointTs {
newWatermark.CheckpointTs = tableSpanStatus.CheckpointTs
}
if tableSpanStatus.StartTs != 0 && tableSpanStatus.StartTs < newWatermark.ResolvedTs {
newWatermark.ResolvedTs = tableSpanStatus.StartTs
if tableSpanStatus.ResolvedTs != 0 && tableSpanStatus.ResolvedTs < newWatermark.ResolvedTs {
newWatermark.ResolvedTs = tableSpanStatus.ResolvedTs
}
case <-delay.C:
break loop
Expand Down Expand Up @@ -768,22 +762,22 @@ func (e *EventDispatcherManager) aggregateDispatcherHeartbeats(needCompleteStatu
}

func (e *EventDispatcherManager) removeDispatcher(id common.DispatcherID) {
dispatcher, ok := e.dispatcherMap.Get(id)
dispatcherItem, ok := e.dispatcherMap.Get(id)
if ok {
if dispatcher.GetRemovingStatus() {
if dispatcherItem.GetRemovingStatus() {
return
}
appcontext.GetService[*eventcollector.EventCollector](appcontext.EventCollector).RemoveDispatcher(dispatcher)
appcontext.GetService[*eventcollector.EventCollector](appcontext.EventCollector).RemoveDispatcher(dispatcherItem)

// for non-mysql class sink, only the event dispatcher manager with table trigger event dispatcher need to receive the checkpointTs message.
if dispatcher.IsTableTriggerEventDispatcher() && e.sink.SinkType() != common.MysqlSinkType {
if dispatcherItem.IsTableTriggerEventDispatcher() && e.sink.SinkType() != common.MysqlSinkType {
err := appcontext.GetService[*HeartBeatCollector](appcontext.HeartbeatCollector).RemoveCheckpointTsMessage(e.changefeedID)
log.Error("remove checkpointTs message ds failed", zap.Error(err))
}

dispatcher.Remove()
dispatcherItem.Remove()
} else {
e.statusesChan <- TableSpanStatusWithSeq{
e.statusesChan <- dispatcher.TableSpanStatusWithSeq{
TableSpanStatus: &heartbeatpb.TableSpanStatus{
ID: id.ToPB(),
ComponentStatus: heartbeatpb.ComponentState_Stopped,
Expand Down
6 changes: 0 additions & 6 deletions downstreamadapter/dispatchermanager/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,6 @@ func toEventFilterRulePB(rule *config.EventFilterRule) *eventpb.EventFilterRule
return eventFilterPB
}

type TableSpanStatusWithSeq struct {
*heartbeatpb.TableSpanStatus
StartTs uint64
Seq uint64
}

type Watermark struct {
mutex sync.Mutex
*heartbeatpb.Watermark
Expand Down
Loading