From 84e80c837718969cfee0b420d6532266405bb3b5 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Wed, 2 Apr 2025 13:56:08 +0800 Subject: [PATCH 1/5] WIP: optimization scheduling --- downstreamadapter/dispatcher/dispatcher.go | 44 +++- downstreamadapter/dispatcher/helper.go | 7 + .../event_dispatcher_manager.go | 48 ++-- downstreamadapter/dispatchermanager/helper.go | 6 - heartbeatpb/heartbeat.pb.go | 248 +++++++++--------- heartbeatpb/heartbeat.proto | 1 + maintainer/maintainer_controller.go | 4 +- maintainer/replica/replication_span.go | 6 +- maintainer/split/region_count_splitter.go | 61 ++--- maintainer/split/splitter.go | 35 ++- pkg/scheduler/balance.go | 16 +- pkg/scheduler/basic.go | 118 ++++++++- pkg/scheduler/replica/replication.go | 33 +++ pkg/scheduler/replica/replication_group.go | 5 + 14 files changed, 414 insertions(+), 218 deletions(-) diff --git a/downstreamadapter/dispatcher/dispatcher.go b/downstreamadapter/dispatcher/dispatcher.go index 2213c71de..b899c0498 100644 --- a/downstreamadapter/dispatcher/dispatcher.go +++ b/downstreamadapter/dispatcher/dispatcher.go @@ -107,6 +107,9 @@ type Dispatcher struct { // shared by the event dispatcher manager sink sink.Sink + // statusesChan is used to store the status of dispatchers when status changed + // and push to heartbeatRequestQueue + statusesChan chan TableSpanStatusWithSeq // blockStatusesChan use to collector block status of ddl/sync point event to Maintainer // shared by the event dispatcher manager blockStatusesChan chan *heartbeatpb.TableSpanBlockStatus @@ -147,6 +150,7 @@ type Dispatcher struct { errCh chan error bdrMode bool + seq uint64 } func NewDispatcher( @@ -155,6 +159,7 @@ func NewDispatcher( tableSpan *heartbeatpb.TableSpan, sink sink.Sink, startTs uint64, + statusesChan chan TableSpanStatusWithSeq, blockStatusesChan chan *heartbeatpb.TableSpanBlockStatus, schemaID int64, schemaIDToDispatchers *SchemaIDToDispatchers, @@ -172,9 +177,10 @@ func NewDispatcher( sink: sink, startTs: startTs, startTsIsSyncpoint: startTsIsSyncpoint, + statusesChan: statusesChan, blockStatusesChan: blockStatusesChan, syncPointConfig: syncPointConfig, - componentStatus: newComponentStateWithMutex(heartbeatpb.ComponentState_Working), + componentStatus: newComponentStateWithMutex(heartbeatpb.ComponentState_Initializing), resolvedTs: startTs, filterConfig: filterConfig, isRemoving: atomic.Bool{}, @@ -331,6 +337,12 @@ func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeCallba continue } + // only when we receive the first event, we can regard the dispatcher begin syncing data + // then turning into working status. + if d.isFirstEvent(event) { + d.updateComponentStatus() + } + switch event.GetType() { case commonEvent.TypeResolvedEvent: atomic.StoreUint64(&d.resolvedTs, event.(commonEvent.ResolvedEvent).ResolvedTs) @@ -815,3 +827,33 @@ func (d *Dispatcher) HandleCheckpointTs(checkpointTs uint64) { func (d *Dispatcher) IsTableTriggerEventDispatcher() bool { return d.tableSpan == heartbeatpb.DDLSpan } + +func (d *Dispatcher) SetSeq(seq uint64) { + d.seq = seq +} + +func (d *Dispatcher) isFirstEvent(event commonEvent.Event) bool { + if d.componentStatus.Get() == heartbeatpb.ComponentState_Initializing { + switch event.GetType() { + case commonEvent.TypeResolvedEvent, commonEvent.TypeDMLEvent, commonEvent.TypeDDLEvent, commonEvent.TypeSyncPointEvent: + if event.GetCommitTs() > d.startTs { + return true + } + } + } + return false +} + +func (d *Dispatcher) updateComponentStatus() { + d.componentStatus.Set(heartbeatpb.ComponentState_Working) + d.statusesChan <- TableSpanStatusWithSeq{ + TableSpanStatus: &heartbeatpb.TableSpanStatus{ + ID: d.id.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + }, + CheckpointTs: d.GetCheckpointTs(), + ResolvedTs: d.GetResolvedTs(), + Seq: d.seq, + } + +} diff --git a/downstreamadapter/dispatcher/helper.go b/downstreamadapter/dispatcher/helper.go index d7de0e087..9baf28cde 100644 --- a/downstreamadapter/dispatcher/helper.go +++ b/downstreamadapter/dispatcher/helper.go @@ -227,6 +227,13 @@ func (s *ComponentStateWithMutex) Get() heartbeatpb.ComponentState { return s.componentStatus } +type TableSpanStatusWithSeq struct { + *heartbeatpb.TableSpanStatus + CheckpointTs uint64 + ResolvedTs uint64 + Seq uint64 +} + /* HeartBeatInfo is used to collect the message for HeartBeatRequest for each dispatcher. Mainly about the progress of each dispatcher: diff --git a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go index e967fda1d..877aefb64 100644 --- a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go +++ b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go @@ -83,7 +83,7 @@ type EventDispatcherManager struct { // statusesChan is used to store the status of dispatchers when status changed // and push to heartbeatRequestQueue - statusesChan chan TableSpanStatusWithSeq + statusesChan chan dispatcher.TableSpanStatusWithSeq // heartbeatRequestQueue is used to store the heartbeat request from all the dispatchers. // heartbeat collector will consume the heartbeat request from the queue and send the response to each dispatcher. heartbeatRequestQueue *HeartbeatRequestQueue @@ -151,7 +151,7 @@ func NewEventDispatcherManager( dispatcherMap: newDispatcherMap(), changefeedID: changefeedID, pdClock: pdClock, - statusesChan: make(chan TableSpanStatusWithSeq, 8192), + statusesChan: make(chan dispatcher.TableSpanStatusWithSeq, 8192), blockStatusesChan: make(chan *heartbeatpb.TableSpanBlockStatus, 1024*1024), errCh: make(chan error, 1), cancel: cancel, @@ -499,6 +499,7 @@ func (e *EventDispatcherManager) newDispatchers(infos []dispatcherCreateInfo, re e.changefeedID, id, tableSpans[idx], e.sink, uint64(newStartTsList[idx]), + e.statusesChan, e.blockStatusesChan, schemaIds[idx], e.schemaIDToDispatchers, @@ -524,14 +525,15 @@ func (e *EventDispatcherManager) newDispatchers(infos []dispatcherCreateInfo, re } seq := e.dispatcherMap.Set(id, d) - e.statusesChan <- TableSpanStatusWithSeq{ - TableSpanStatus: &heartbeatpb.TableSpanStatus{ - ID: id.ToPB(), - ComponentStatus: heartbeatpb.ComponentState_Working, - }, - StartTs: uint64(newStartTsList[idx]), - Seq: seq, - } + d.SetSeq(seq) + // e.statusesChan <- TableSpanStatusWithSeq{ + // TableSpanStatus: &heartbeatpb.TableSpanStatus{ + // ID: id.ToPB(), + // ComponentStatus: heartbeatpb.ComponentState_Working, + // }, + // StartTs: uint64(newStartTsList[idx]), + // Seq: seq, + // } if d.IsTableTriggerEventDispatcher() { e.metricTableTriggerEventDispatcherCount.Inc() @@ -651,11 +653,11 @@ func (e *EventDispatcherManager) collectComponentStatusWhenChanged(ctx context.C case tableSpanStatus := <-e.statusesChan: statusMessage = append(statusMessage, tableSpanStatus.TableSpanStatus) newWatermark.Seq = tableSpanStatus.Seq - if tableSpanStatus.StartTs != 0 && tableSpanStatus.StartTs < newWatermark.CheckpointTs { - newWatermark.CheckpointTs = tableSpanStatus.StartTs + if tableSpanStatus.CheckpointTs != 0 && tableSpanStatus.CheckpointTs < newWatermark.CheckpointTs { + newWatermark.CheckpointTs = tableSpanStatus.CheckpointTs } - if tableSpanStatus.StartTs != 0 && tableSpanStatus.StartTs < newWatermark.ResolvedTs { - newWatermark.ResolvedTs = tableSpanStatus.StartTs + if tableSpanStatus.ResolvedTs != 0 && tableSpanStatus.ResolvedTs < newWatermark.ResolvedTs { + newWatermark.ResolvedTs = tableSpanStatus.ResolvedTs } delay := time.NewTimer(10 * time.Millisecond) loop: @@ -666,11 +668,11 @@ func (e *EventDispatcherManager) collectComponentStatusWhenChanged(ctx context.C if newWatermark.Seq < tableSpanStatus.Seq { newWatermark.Seq = tableSpanStatus.Seq } - if tableSpanStatus.StartTs != 0 && tableSpanStatus.StartTs < newWatermark.CheckpointTs { - newWatermark.CheckpointTs = tableSpanStatus.StartTs + if tableSpanStatus.CheckpointTs != 0 && tableSpanStatus.CheckpointTs < newWatermark.CheckpointTs { + newWatermark.CheckpointTs = tableSpanStatus.CheckpointTs } - if tableSpanStatus.StartTs != 0 && tableSpanStatus.StartTs < newWatermark.ResolvedTs { - newWatermark.ResolvedTs = tableSpanStatus.StartTs + if tableSpanStatus.ResolvedTs != 0 && tableSpanStatus.ResolvedTs < newWatermark.ResolvedTs { + newWatermark.ResolvedTs = tableSpanStatus.ResolvedTs } case <-delay.C: break loop @@ -768,22 +770,22 @@ func (e *EventDispatcherManager) aggregateDispatcherHeartbeats(needCompleteStatu } func (e *EventDispatcherManager) removeDispatcher(id common.DispatcherID) { - dispatcher, ok := e.dispatcherMap.Get(id) + dispatcherItem, ok := e.dispatcherMap.Get(id) if ok { - if dispatcher.GetRemovingStatus() { + if dispatcherItem.GetRemovingStatus() { return } appcontext.GetService[*eventcollector.EventCollector](appcontext.EventCollector).RemoveDispatcher(dispatcher) // for non-mysql class sink, only the event dispatcher manager with table trigger event dispatcher need to receive the checkpointTs message. - if dispatcher.IsTableTriggerEventDispatcher() && e.sink.SinkType() != common.MysqlSinkType { + if dispatcherItem.IsTableTriggerEventDispatcher() && e.sink.SinkType() != common.MysqlSinkType { err := appcontext.GetService[*HeartBeatCollector](appcontext.HeartbeatCollector).RemoveCheckpointTsMessage(e.changefeedID) log.Error("remove checkpointTs message ds failed", zap.Error(err)) } - dispatcher.Remove() + dispatcherItem.Remove() } else { - e.statusesChan <- TableSpanStatusWithSeq{ + e.statusesChan <- dispatcher.TableSpanStatusWithSeq{ TableSpanStatus: &heartbeatpb.TableSpanStatus{ ID: id.ToPB(), ComponentStatus: heartbeatpb.ComponentState_Stopped, diff --git a/downstreamadapter/dispatchermanager/helper.go b/downstreamadapter/dispatchermanager/helper.go index d62f3be11..ae626850d 100644 --- a/downstreamadapter/dispatchermanager/helper.go +++ b/downstreamadapter/dispatchermanager/helper.go @@ -137,12 +137,6 @@ func toEventFilterRulePB(rule *config.EventFilterRule) *eventpb.EventFilterRule return eventFilterPB } -type TableSpanStatusWithSeq struct { - *heartbeatpb.TableSpanStatus - StartTs uint64 - Seq uint64 -} - type Watermark struct { mutex sync.Mutex *heartbeatpb.Watermark diff --git a/heartbeatpb/heartbeat.pb.go b/heartbeatpb/heartbeat.pb.go index b7b8b1913..998c3d42d 100644 --- a/heartbeatpb/heartbeat.pb.go +++ b/heartbeatpb/heartbeat.pb.go @@ -135,21 +135,24 @@ func (InfluenceType) EnumDescriptor() ([]byte, []int) { type ComponentState int32 const ( - ComponentState_Working ComponentState = 0 - ComponentState_Stopped ComponentState = 1 - ComponentState_Removed ComponentState = 2 + ComponentState_Working ComponentState = 0 + ComponentState_Stopped ComponentState = 1 + ComponentState_Removed ComponentState = 2 + ComponentState_Initializing ComponentState = 3 ) var ComponentState_name = map[int32]string{ 0: "Working", 1: "Stopped", 2: "Removed", + 3: "Initializing", } var ComponentState_value = map[string]int32{ - "Working": 0, - "Stopped": 1, - "Removed": 2, + "Working": 0, + "Stopped": 1, + "Removed": 2, + "Initializing": 3, } func (x ComponentState) String() string { @@ -2438,122 +2441,123 @@ func init() { func init() { proto.RegisterFile("heartbeatpb/heartbeat.proto", fileDescriptor_6d584080fdadb670) } var fileDescriptor_6d584080fdadb670 = []byte{ - // 1838 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x19, 0x4b, 0x6f, 0x23, 0x49, - 0x39, 0xdd, 0x6d, 0x3b, 0xf6, 0xe7, 0x24, 0xd3, 0x53, 0xb3, 0x33, 0xe3, 0xc9, 0xc3, 0x9b, 0x2d, - 0x40, 0x0a, 0x59, 0x48, 0x34, 0xd9, 0x1d, 0x2d, 0x20, 0x96, 0x25, 0xb1, 0xc3, 0xae, 0x15, 0x8d, - 0x37, 0x2a, 0x07, 0x0d, 0xcb, 0xc5, 0x6a, 0x77, 0x57, 0x9c, 0x56, 0xec, 0xee, 0x9e, 0xae, 0x76, - 0x92, 0x59, 0x89, 0x13, 0x57, 0x0e, 0x1c, 0x39, 0xac, 0x84, 0xf6, 0x08, 0x7f, 0x04, 0x8e, 0x73, - 0x02, 0x0e, 0x1c, 0xd0, 0x8c, 0xf8, 0x03, 0x5c, 0xb8, 0xa2, 0xaa, 0xee, 0xea, 0x97, 0xdb, 0x49, - 0x46, 0xb1, 0x38, 0xb9, 0xea, 0xab, 0xef, 0x55, 0xdf, 0xbb, 0xda, 0xb0, 0x76, 0x46, 0x0d, 0x3f, - 0x18, 0x50, 0x23, 0xf0, 0x06, 0xbb, 0xf1, 0x7a, 0xc7, 0xf3, 0xdd, 0xc0, 0x45, 0xf5, 0xd4, 0x21, - 0xfe, 0x0a, 0x6a, 0x27, 0xc6, 0x60, 0x44, 0x7b, 0x9e, 0xe1, 0xa0, 0x06, 0x2c, 0x8a, 0x4d, 0xa7, - 0xdd, 0x50, 0x36, 0x95, 0x2d, 0x8d, 0xc8, 0x2d, 0x5a, 0x85, 0x6a, 0x2f, 0x30, 0xfc, 0xe0, 0x88, - 0xbe, 0x6a, 0xa8, 0x9b, 0xca, 0xd6, 0x12, 0x89, 0xf7, 0xe8, 0x11, 0x54, 0x0e, 0x1d, 0x8b, 0x9f, - 0x68, 0xe2, 0x24, 0xda, 0xe1, 0x3f, 0xa8, 0xa0, 0x7f, 0xc1, 0x45, 0x1d, 0x50, 0x23, 0x20, 0xf4, - 0xe5, 0x84, 0xb2, 0x00, 0x7d, 0x0a, 0x4b, 0xe6, 0x99, 0xe1, 0x0c, 0xe9, 0x29, 0xa5, 0x56, 0x24, - 0xa7, 0xbe, 0xf7, 0x64, 0x27, 0xa5, 0xd3, 0x4e, 0x2b, 0x85, 0x40, 0x32, 0xe8, 0xe8, 0x63, 0xa8, - 0x5d, 0x1a, 0x01, 0xf5, 0xc7, 0x86, 0x7f, 0x2e, 0x14, 0xa9, 0xef, 0x3d, 0xca, 0xd0, 0xbe, 0x90, - 0xa7, 0x24, 0x41, 0x44, 0x3f, 0x82, 0x2a, 0x0b, 0x8c, 0x60, 0xc2, 0x28, 0x6b, 0x68, 0x9b, 0xda, - 0x56, 0x7d, 0x6f, 0x3d, 0x43, 0x14, 0x5b, 0xa0, 0x27, 0xb0, 0x48, 0x8c, 0x8d, 0xb6, 0xe0, 0x9e, - 0xe9, 0x8e, 0x3d, 0x3a, 0xa2, 0x01, 0x0d, 0x0f, 0x1b, 0xa5, 0x4d, 0x65, 0xab, 0x4a, 0xf2, 0x60, - 0xf4, 0x21, 0x68, 0xd4, 0xf7, 0x1b, 0xe5, 0x82, 0xfb, 0x90, 0x89, 0xe3, 0xd8, 0xce, 0xf0, 0xd0, - 0xf7, 0x5d, 0x9f, 0x70, 0x2c, 0x6c, 0x40, 0x2d, 0x56, 0x14, 0x61, 0x6e, 0x12, 0x6a, 0x9e, 0x7b, - 0xae, 0xed, 0x04, 0x27, 0x4c, 0x98, 0xa4, 0x44, 0x32, 0x30, 0xd4, 0x04, 0xf0, 0x29, 0x73, 0x47, - 0x17, 0xd4, 0x3a, 0x61, 0xe2, 0xe2, 0x25, 0x92, 0x82, 0x20, 0x1d, 0x34, 0x46, 0x5f, 0x0a, 0x07, - 0x94, 0x08, 0x5f, 0xe2, 0xdf, 0x80, 0xde, 0xb6, 0x99, 0x67, 0x04, 0xe6, 0x19, 0xf5, 0xf7, 0xcd, - 0xc0, 0x76, 0x1d, 0xf4, 0x21, 0x54, 0x0c, 0xb1, 0x12, 0x32, 0x56, 0xf6, 0x1e, 0x64, 0xd4, 0x0c, - 0x91, 0x48, 0x84, 0xc2, 0x5d, 0xde, 0x72, 0xc7, 0x63, 0x3b, 0x88, 0x05, 0xc6, 0x7b, 0xb4, 0x09, - 0xf5, 0x0e, 0xeb, 0xbd, 0x72, 0xcc, 0x63, 0xae, 0x9f, 0x10, 0x5b, 0x25, 0x69, 0x10, 0x6e, 0x81, - 0xb6, 0xdf, 0x3a, 0xca, 0x30, 0x51, 0xae, 0x67, 0xa2, 0x4e, 0x33, 0xf9, 0xad, 0x0a, 0x0f, 0x3b, - 0xce, 0xe9, 0x68, 0x42, 0x1d, 0x93, 0x5a, 0xc9, 0x75, 0x18, 0xfa, 0x39, 0x2c, 0xc7, 0x07, 0x27, - 0xaf, 0x3c, 0x1a, 0x5d, 0x68, 0x35, 0x73, 0xa1, 0x0c, 0x06, 0xc9, 0x12, 0xa0, 0xcf, 0x60, 0x39, - 0x61, 0xd8, 0x69, 0xf3, 0x3b, 0x6a, 0x53, 0x9e, 0x4b, 0x63, 0x90, 0x2c, 0xbe, 0x48, 0x09, 0xf3, - 0x8c, 0x8e, 0x8d, 0x4e, 0x5b, 0x18, 0x40, 0x23, 0xf1, 0x1e, 0x1d, 0xc1, 0x03, 0x7a, 0x65, 0x8e, - 0x26, 0x16, 0x4d, 0xd1, 0x58, 0x22, 0x74, 0xae, 0x15, 0x51, 0x44, 0x85, 0xff, 0xa2, 0xa4, 0x5d, - 0x19, 0x85, 0xdb, 0xaf, 0xe0, 0xa1, 0x5d, 0x64, 0x99, 0x28, 0xa1, 0x70, 0xb1, 0x21, 0xd2, 0x98, - 0xa4, 0x98, 0x01, 0x7a, 0x16, 0x07, 0x49, 0x98, 0x5f, 0x1b, 0x33, 0xd4, 0xcd, 0x85, 0x0b, 0x06, - 0xcd, 0x30, 0xcf, 0x85, 0x25, 0xea, 0x7b, 0x7a, 0x36, 0xb0, 0x5a, 0x47, 0x84, 0x1f, 0xe2, 0x6f, - 0x15, 0xb8, 0x9f, 0xaa, 0x08, 0xcc, 0x73, 0x1d, 0x46, 0xef, 0x5a, 0x12, 0x9e, 0x03, 0xb2, 0x72, - 0xd6, 0xa1, 0xd2, 0x9b, 0xb3, 0x74, 0x8f, 0xf2, 0xbc, 0x80, 0x10, 0x5f, 0xc1, 0x83, 0x56, 0x2a, - 0xf3, 0x9e, 0x53, 0xc6, 0x8c, 0xe1, 0x9d, 0x95, 0xcc, 0xe7, 0xb8, 0x3a, 0x9d, 0xe3, 0xf8, 0xef, - 0x19, 0x3f, 0xb7, 0x5c, 0xe7, 0xd4, 0x1e, 0xa2, 0x6d, 0x28, 0x31, 0xcf, 0x70, 0x22, 0x79, 0x8f, - 0x8a, 0xcb, 0x16, 0x11, 0x38, 0xbc, 0x7c, 0x33, 0x5e, 0x94, 0x63, 0xfe, 0x72, 0xcb, 0xb5, 0xb7, - 0x52, 0x71, 0x16, 0x79, 0xe9, 0x9a, 0x40, 0xcc, 0xa0, 0xf3, 0x50, 0x67, 0x32, 0xd4, 0x4b, 0x61, - 0xa8, 0xcb, 0x3d, 0xc2, 0xb0, 0x6c, 0x4e, 0x7c, 0x9f, 0x3a, 0x41, 0xdf, 0xb3, 0xfa, 0x01, 0x13, - 0x15, 0xb0, 0x44, 0xea, 0x11, 0xf0, 0xd8, 0x3a, 0x61, 0xf8, 0x6f, 0x0a, 0x3c, 0xe1, 0xb9, 0x61, - 0x4d, 0x46, 0xa9, 0xd0, 0x9e, 0x53, 0x4b, 0x78, 0x06, 0x15, 0x53, 0xd8, 0xea, 0x86, 0x78, 0x0d, - 0x0d, 0x4a, 0x22, 0x64, 0xd4, 0x82, 0x15, 0x16, 0xa9, 0x14, 0x46, 0xb2, 0x30, 0xca, 0xca, 0xde, - 0x5a, 0x86, 0xbc, 0x97, 0x41, 0x21, 0x39, 0x12, 0x7c, 0x0c, 0x0f, 0x9e, 0x1b, 0xb6, 0x13, 0x18, - 0xb6, 0x43, 0xfd, 0x2f, 0x24, 0x1d, 0xfa, 0x71, 0xaa, 0xdf, 0x28, 0x05, 0x81, 0x98, 0xd0, 0xe4, - 0x1b, 0x0e, 0xfe, 0x46, 0x05, 0x3d, 0x7f, 0x7c, 0x57, 0x0b, 0x6d, 0x00, 0xf0, 0x55, 0x9f, 0x0b, - 0xa1, 0xc2, 0x4a, 0x35, 0x52, 0xe3, 0x10, 0xce, 0x9e, 0xa2, 0xa7, 0x50, 0x0e, 0x4f, 0x8a, 0x0c, - 0xd0, 0x72, 0xc7, 0x9e, 0xeb, 0x50, 0x27, 0x10, 0xb8, 0x24, 0xc4, 0x44, 0xdf, 0x81, 0xe5, 0x24, - 0x74, 0xb9, 0xd3, 0x4b, 0x05, 0x3d, 0x2b, 0xee, 0x88, 0xda, 0xcd, 0x1d, 0x11, 0x7d, 0x0f, 0x56, - 0x06, 0xae, 0x1b, 0xb0, 0xc0, 0x37, 0xbc, 0xbe, 0xe5, 0x3a, 0xb4, 0x51, 0x11, 0xfd, 0x60, 0x39, - 0x86, 0xb6, 0x5d, 0x87, 0xe2, 0x4f, 0x60, 0xad, 0xe5, 0xba, 0xbe, 0x65, 0x3b, 0x46, 0xe0, 0xfa, - 0x07, 0xf2, 0x4c, 0x86, 0x52, 0x03, 0x16, 0x2f, 0xa8, 0xcf, 0x64, 0x87, 0xd3, 0x88, 0xdc, 0xe2, - 0xaf, 0x60, 0xbd, 0x98, 0x30, 0x2a, 0x42, 0x77, 0x70, 0xd9, 0x9f, 0x15, 0x78, 0x6f, 0xdf, 0xb2, - 0x12, 0x0c, 0xa9, 0xcd, 0xf7, 0x41, 0xb5, 0xad, 0x9b, 0x9d, 0xa5, 0xda, 0x16, 0x9f, 0xa1, 0x52, - 0x41, 0xbc, 0x14, 0x47, 0xe9, 0x94, 0xa1, 0xb5, 0x02, 0x43, 0x6f, 0xc3, 0x7d, 0x9b, 0xf5, 0x1d, - 0x7a, 0xd9, 0x4f, 0xdc, 0x2e, 0xc7, 0x14, 0x9b, 0x75, 0xe9, 0x65, 0x22, 0x0e, 0x5f, 0xc1, 0x63, - 0x42, 0xc7, 0xee, 0x05, 0xbd, 0x93, 0xba, 0x0d, 0x58, 0x34, 0x0d, 0x66, 0x1a, 0x16, 0x8d, 0xda, - 0xb6, 0xdc, 0xf2, 0x13, 0x5f, 0xf0, 0xb7, 0xa2, 0xa9, 0x40, 0x6e, 0xf1, 0x1f, 0x55, 0x58, 0x4d, - 0x84, 0x4e, 0xb9, 0xee, 0x8e, 0x31, 0x3e, 0xcb, 0x80, 0x4f, 0x84, 0x5f, 0xfd, 0x94, 0xed, 0xe2, - 0xa2, 0x68, 0xc2, 0x07, 0x01, 0xaf, 0xa0, 0xfd, 0xc0, 0xb7, 0x87, 0x43, 0xea, 0xf7, 0xe9, 0x05, - 0xaf, 0x62, 0x49, 0xe5, 0xeb, 0xdb, 0xb7, 0x68, 0xd9, 0x1b, 0x82, 0xc7, 0x49, 0xc8, 0xe2, 0x90, - 0x73, 0x48, 0x37, 0xef, 0x62, 0xdf, 0x94, 0x8b, 0x7d, 0xf3, 0x6f, 0x05, 0xd6, 0x0a, 0x2d, 0x34, - 0x9f, 0x46, 0xf9, 0x0c, 0xca, 0xbc, 0x4d, 0xc8, 0xde, 0xf8, 0x7e, 0x86, 0x2e, 0x96, 0x96, 0x34, - 0x95, 0x10, 0x5b, 0xa6, 0xb1, 0x76, 0x9b, 0xc1, 0xf6, 0x56, 0x85, 0x01, 0xff, 0x57, 0x81, 0x66, - 0x72, 0xcf, 0x63, 0x97, 0x05, 0xf3, 0x8e, 0x86, 0x5b, 0xb9, 0x56, 0xbd, 0xa3, 0x6b, 0x9f, 0xc2, - 0x62, 0xd8, 0x05, 0xe5, 0xa3, 0xe2, 0xf1, 0x54, 0xeb, 0x18, 0x1b, 0x1d, 0xe7, 0xd4, 0x25, 0x12, - 0x0f, 0xff, 0x47, 0x81, 0xf7, 0x67, 0xde, 0x7c, 0x3e, 0x5e, 0xfe, 0xbf, 0x5c, 0xfd, 0x5d, 0x62, - 0x02, 0x5f, 0x01, 0x24, 0xb6, 0xc8, 0x8c, 0xcd, 0x4a, 0x6e, 0x6c, 0x6e, 0x4a, 0xcc, 0xae, 0x31, - 0x96, 0x8d, 0x2a, 0x05, 0x41, 0x3b, 0x50, 0x11, 0xe1, 0x29, 0x0d, 0x5e, 0x30, 0x0e, 0x09, 0x7b, - 0x47, 0x58, 0xb8, 0x15, 0x3d, 0x6e, 0x85, 0xe0, 0xd9, 0x8f, 0xdb, 0xf5, 0x08, 0x2d, 0x25, 0x35, - 0x01, 0xe0, 0x3f, 0xa9, 0x80, 0xa6, 0xb3, 0x83, 0x57, 0xcb, 0x19, 0xce, 0xc9, 0x18, 0x52, 0x8d, - 0x1e, 0xcf, 0xf2, 0xca, 0x6a, 0xee, 0xca, 0x72, 0xbe, 0xd3, 0x6e, 0x31, 0xdf, 0xfd, 0x02, 0x74, - 0x53, 0xb6, 0xe3, 0x3e, 0x4b, 0x5e, 0xa3, 0x37, 0xf4, 0xec, 0x7b, 0x66, 0x7a, 0x3f, 0x61, 0xd3, - 0x49, 0x5a, 0x2e, 0x68, 0x2a, 0x1f, 0x41, 0x7d, 0x30, 0x72, 0xcd, 0xf3, 0x68, 0x6a, 0xa8, 0x08, - 0xfd, 0x50, 0x36, 0xc2, 0x05, 0x7b, 0x10, 0x68, 0x62, 0x8d, 0x5f, 0xc2, 0xa3, 0x24, 0xbc, 0x5b, - 0x23, 0x97, 0xd1, 0x39, 0x25, 0x74, 0xaa, 0xad, 0xa8, 0xd9, 0xb6, 0xe2, 0xc3, 0xe3, 0x29, 0x91, - 0xf3, 0xc9, 0x24, 0x3e, 0x4e, 0x4f, 0x4c, 0x93, 0x32, 0x26, 0x65, 0x46, 0x5b, 0xfc, 0x3b, 0x05, - 0xf4, 0xe4, 0x4d, 0x15, 0x06, 0xdb, 0x1c, 0x9e, 0xa4, 0xab, 0x50, 0x8d, 0x42, 0x32, 0xac, 0xd1, - 0x1a, 0x89, 0xf7, 0xd7, 0xbd, 0x36, 0xf1, 0xa7, 0x50, 0x16, 0x78, 0x37, 0x7c, 0xbf, 0x99, 0x11, - 0x82, 0xd8, 0x81, 0x15, 0xb9, 0x0e, 0xad, 0x71, 0x0d, 0x9f, 0x4d, 0xa8, 0x7f, 0x39, 0xb2, 0x72, - 0xac, 0xd2, 0x20, 0x8e, 0xd1, 0xa5, 0x97, 0x39, 0x5d, 0xd3, 0x20, 0xfc, 0xad, 0x06, 0xe5, 0x70, - 0xf2, 0x5c, 0x87, 0x5a, 0x87, 0x1d, 0xf0, 0xf0, 0xa1, 0xe1, 0xe0, 0x51, 0x25, 0x09, 0x80, 0x6b, - 0x21, 0x96, 0xc9, 0x73, 0x26, 0xda, 0xa2, 0xcf, 0xa0, 0x1e, 0x2e, 0x65, 0x31, 0x98, 0x9e, 0xfb, - 0xf3, 0xee, 0x21, 0x69, 0x0a, 0x74, 0x04, 0xf7, 0xbb, 0x94, 0x5a, 0x6d, 0xdf, 0xf5, 0x3c, 0x89, - 0x11, 0xb5, 0xfa, 0x1b, 0xd8, 0x4c, 0xd3, 0xa1, 0x9f, 0xc2, 0x3d, 0x0e, 0xdc, 0xb7, 0xac, 0x98, - 0x55, 0x38, 0xf3, 0xa2, 0xe9, 0x6c, 0x26, 0x79, 0x54, 0xfe, 0x0e, 0xf9, 0xa5, 0x67, 0x19, 0x01, - 0x8d, 0x4c, 0xc8, 0x1a, 0x15, 0x41, 0xbc, 0x56, 0xd4, 0x4c, 0x22, 0x07, 0x91, 0x1c, 0x49, 0xfe, - 0x53, 0xca, 0xe2, 0xd4, 0xa7, 0x14, 0xf4, 0x43, 0x31, 0xe4, 0x0f, 0x69, 0xa3, 0x2a, 0xa2, 0x32, - 0xdb, 0xaa, 0x0e, 0xa2, 0x0c, 0x1e, 0x86, 0x03, 0xfe, 0x90, 0xe2, 0x73, 0x78, 0x2f, 0xae, 0x3e, - 0xf2, 0x94, 0x97, 0x8e, 0x77, 0xa8, 0x7a, 0x5b, 0xf2, 0x59, 0xa1, 0xce, 0x2c, 0x1d, 0x21, 0x02, - 0xfe, 0xa7, 0x02, 0xf7, 0x72, 0x9f, 0xe0, 0xde, 0x45, 0x50, 0x51, 0x59, 0x54, 0xe7, 0x51, 0x16, - 0x8b, 0x66, 0xed, 0xa7, 0xf0, 0x30, 0x6c, 0xa8, 0xcc, 0xfe, 0x9a, 0xf6, 0x3d, 0xea, 0xf7, 0x19, - 0x35, 0x5d, 0x27, 0x1c, 0x14, 0x55, 0x82, 0xc4, 0x61, 0xcf, 0xfe, 0x9a, 0x1e, 0x53, 0xbf, 0x27, - 0x4e, 0xf0, 0x37, 0x0a, 0xa0, 0x94, 0x0d, 0xe7, 0x54, 0x11, 0x3f, 0x87, 0xe5, 0x41, 0xc2, 0x34, - 0xfe, 0xe2, 0xf1, 0x41, 0x71, 0x07, 0x49, 0xcb, 0xcf, 0xd2, 0x61, 0x0b, 0x96, 0xd2, 0x3d, 0x1b, - 0x21, 0x28, 0x05, 0xf6, 0x38, 0x2c, 0x5f, 0x35, 0x22, 0xd6, 0x1c, 0xe6, 0xb8, 0x96, 0x6c, 0x8e, - 0x62, 0xcd, 0x61, 0x26, 0x87, 0x69, 0x21, 0x8c, 0xaf, 0x79, 0xca, 0x8e, 0xc3, 0x0f, 0x26, 0xc2, - 0x1e, 0x35, 0x22, 0xb7, 0xf8, 0x63, 0x58, 0x4a, 0x3b, 0x8e, 0x53, 0x9f, 0xd9, 0xc3, 0xb3, 0xe8, - 0xa3, 0xa0, 0x58, 0x23, 0x1d, 0xb4, 0x91, 0x7b, 0x19, 0x25, 0x3b, 0x5f, 0xe2, 0x53, 0x58, 0x4a, - 0x9b, 0xe0, 0x76, 0x54, 0x42, 0x5b, 0xde, 0xca, 0x23, 0xcd, 0xf8, 0x9a, 0x97, 0x1a, 0xfe, 0xcb, - 0x3c, 0xc3, 0x94, 0xba, 0x25, 0x80, 0xed, 0x0d, 0xa8, 0x44, 0x9f, 0x48, 0x6b, 0x50, 0x7e, 0xe1, - 0xdb, 0x01, 0xd5, 0x17, 0x50, 0x15, 0x4a, 0xc7, 0x06, 0x63, 0xba, 0xb2, 0xbd, 0x15, 0x56, 0xc8, - 0xe4, 0xe1, 0x8f, 0x00, 0x2a, 0x2d, 0x9f, 0x1a, 0x02, 0x0f, 0xa0, 0x12, 0x3e, 0xa9, 0x74, 0x65, - 0xfb, 0x27, 0x00, 0x49, 0x32, 0x71, 0x0e, 0xdd, 0x2f, 0xbb, 0x87, 0xfa, 0x02, 0xaa, 0xc3, 0xe2, - 0x8b, 0xfd, 0xce, 0x49, 0xa7, 0xfb, 0xb9, 0xae, 0x88, 0x0d, 0x09, 0x37, 0x2a, 0xc7, 0x69, 0x73, - 0x1c, 0x6d, 0xfb, 0x07, 0xb9, 0x06, 0x82, 0x16, 0x41, 0xdb, 0x1f, 0x8d, 0xf4, 0x05, 0x54, 0x01, - 0xb5, 0x7d, 0xa0, 0x2b, 0x5c, 0x52, 0xd7, 0xf5, 0xc7, 0xc6, 0x48, 0x57, 0xb7, 0x3f, 0x81, 0x95, - 0x6c, 0x40, 0x0b, 0xb6, 0xae, 0x7f, 0x6e, 0x3b, 0xc3, 0x50, 0x60, 0x2f, 0x10, 0x55, 0x2a, 0x14, - 0x18, 0x6a, 0x68, 0xe9, 0xea, 0xc1, 0xcf, 0xfe, 0xfa, 0xa6, 0xa9, 0xbc, 0x7e, 0xd3, 0x54, 0xfe, - 0xf5, 0xa6, 0xa9, 0xfc, 0xfe, 0x6d, 0x73, 0xe1, 0xf5, 0xdb, 0xe6, 0xc2, 0x3f, 0xde, 0x36, 0x17, - 0x7e, 0xfd, 0xdd, 0xa1, 0x1d, 0x9c, 0x4d, 0x06, 0x3b, 0xa6, 0x3b, 0xde, 0xf5, 0x6c, 0x67, 0x68, - 0x1a, 0xde, 0x6e, 0x60, 0x9b, 0x96, 0xb9, 0x9b, 0x8a, 0xa9, 0x41, 0x45, 0xfc, 0x8b, 0xf0, 0xd1, - 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0x1f, 0x8f, 0xc0, 0xf4, 0x64, 0x18, 0x00, 0x00, + // 1851 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x19, 0x4b, 0x6f, 0x1b, 0xc7, + 0x59, 0xbb, 0x4b, 0x51, 0xe2, 0x47, 0x49, 0x5e, 0x8f, 0x63, 0x9b, 0xb6, 0x6c, 0x46, 0x99, 0xb6, + 0x80, 0xaa, 0xb4, 0x32, 0xac, 0xc4, 0xe8, 0x03, 0x4d, 0x53, 0x89, 0x72, 0x13, 0x42, 0xb0, 0x22, + 0x8c, 0x54, 0xb8, 0xe9, 0x85, 0x18, 0xee, 0x8e, 0xa8, 0x85, 0xc8, 0xdd, 0xf5, 0xce, 0xd0, 0x92, + 0x0d, 0xf4, 0xd4, 0x6b, 0x0f, 0x3d, 0xf6, 0x10, 0xa0, 0xc8, 0xb1, 0xfd, 0x23, 0xed, 0x31, 0xa7, + 0xb6, 0x87, 0x1e, 0x0a, 0x1b, 0xfd, 0x03, 0xbd, 0xf4, 0x5a, 0xcc, 0xec, 0xce, 0xbe, 0xb8, 0x94, + 0x64, 0x88, 0xc8, 0x89, 0x33, 0xdf, 0x7c, 0xaf, 0xf9, 0xde, 0xb3, 0x84, 0xd5, 0x13, 0x46, 0x23, + 0xd1, 0x67, 0x54, 0x84, 0xfd, 0x47, 0xe9, 0x7a, 0x33, 0x8c, 0x02, 0x11, 0xa0, 0x66, 0xee, 0x10, + 0x7f, 0x09, 0x8d, 0x23, 0xda, 0x1f, 0xb2, 0xc3, 0x90, 0xfa, 0xa8, 0x05, 0x0b, 0x6a, 0xd3, 0xdd, + 0x6d, 0x19, 0x6b, 0xc6, 0xba, 0x45, 0xf4, 0x16, 0xdd, 0x87, 0xc5, 0x43, 0x41, 0x23, 0xb1, 0xc7, + 0x5e, 0xb5, 0xcc, 0x35, 0x63, 0x7d, 0x89, 0xa4, 0x7b, 0x74, 0x07, 0xea, 0x4f, 0x7d, 0x57, 0x9e, + 0x58, 0xea, 0x24, 0xd9, 0xe1, 0x3f, 0x9a, 0x60, 0x7f, 0x2e, 0x45, 0xed, 0x30, 0x2a, 0x08, 0x7b, + 0x31, 0x66, 0x5c, 0xa0, 0x4f, 0x60, 0xc9, 0x39, 0xa1, 0xfe, 0x80, 0x1d, 0x33, 0xe6, 0x26, 0x72, + 0x9a, 0x5b, 0xf7, 0x36, 0x73, 0x3a, 0x6d, 0x76, 0x72, 0x08, 0xa4, 0x80, 0x8e, 0x3e, 0x86, 0xc6, + 0x19, 0x15, 0x2c, 0x1a, 0xd1, 0xe8, 0x54, 0x29, 0xd2, 0xdc, 0xba, 0x53, 0xa0, 0x7d, 0xae, 0x4f, + 0x49, 0x86, 0x88, 0x7e, 0x0c, 0x8b, 0x5c, 0x50, 0x31, 0xe6, 0x8c, 0xb7, 0xac, 0x35, 0x6b, 0xbd, + 0xb9, 0xf5, 0xa0, 0x40, 0x94, 0x5a, 0xe0, 0x50, 0x61, 0x91, 0x14, 0x1b, 0xad, 0xc3, 0x0d, 0x27, + 0x18, 0x85, 0x6c, 0xc8, 0x04, 0x8b, 0x0f, 0x5b, 0xb5, 0x35, 0x63, 0x7d, 0x91, 0x94, 0xc1, 0xe8, + 0x43, 0xb0, 0x58, 0x14, 0xb5, 0xe6, 0x2b, 0xee, 0x43, 0xc6, 0xbe, 0xef, 0xf9, 0x83, 0xa7, 0x51, + 0x14, 0x44, 0x44, 0x62, 0x61, 0x0a, 0x8d, 0x54, 0x51, 0x84, 0xa5, 0x49, 0x98, 0x73, 0x1a, 0x06, + 0x9e, 0x2f, 0x8e, 0xb8, 0x32, 0x49, 0x8d, 0x14, 0x60, 0xa8, 0x0d, 0x10, 0x31, 0x1e, 0x0c, 0x5f, + 0x32, 0xf7, 0x88, 0xab, 0x8b, 0xd7, 0x48, 0x0e, 0x82, 0x6c, 0xb0, 0x38, 0x7b, 0xa1, 0x1c, 0x50, + 0x23, 0x72, 0x89, 0x7f, 0x0b, 0xf6, 0xae, 0xc7, 0x43, 0x2a, 0x9c, 0x13, 0x16, 0x6d, 0x3b, 0xc2, + 0x0b, 0x7c, 0xf4, 0x21, 0xd4, 0xa9, 0x5a, 0x29, 0x19, 0x2b, 0x5b, 0xb7, 0x0a, 0x6a, 0xc6, 0x48, + 0x24, 0x41, 0x91, 0x2e, 0xef, 0x04, 0xa3, 0x91, 0x27, 0x52, 0x81, 0xe9, 0x1e, 0xad, 0x41, 0xb3, + 0xcb, 0x0f, 0x5f, 0xf9, 0xce, 0x81, 0xd4, 0x4f, 0x89, 0x5d, 0x24, 0x79, 0x10, 0xee, 0x80, 0xb5, + 0xdd, 0xd9, 0x2b, 0x30, 0x31, 0x2e, 0x66, 0x62, 0x4e, 0x32, 0xf9, 0x9d, 0x09, 0xb7, 0xbb, 0xfe, + 0xf1, 0x70, 0xcc, 0x7c, 0x87, 0xb9, 0xd9, 0x75, 0x38, 0xfa, 0x05, 0x2c, 0xa7, 0x07, 0x47, 0xaf, + 0x42, 0x96, 0x5c, 0xe8, 0x7e, 0xe1, 0x42, 0x05, 0x0c, 0x52, 0x24, 0x40, 0x9f, 0xc2, 0x72, 0xc6, + 0xb0, 0xbb, 0x2b, 0xef, 0x68, 0x4d, 0x78, 0x2e, 0x8f, 0x41, 0x8a, 0xf8, 0x2a, 0x25, 0x9c, 0x13, + 0x36, 0xa2, 0xdd, 0x5d, 0x65, 0x00, 0x8b, 0xa4, 0x7b, 0xb4, 0x07, 0xb7, 0xd8, 0xb9, 0x33, 0x1c, + 0xbb, 0x2c, 0x47, 0xe3, 0xaa, 0xd0, 0xb9, 0x50, 0x44, 0x15, 0x15, 0xfe, 0xab, 0x91, 0x77, 0x65, + 0x12, 0x6e, 0xbf, 0x86, 0xdb, 0x5e, 0x95, 0x65, 0x92, 0x84, 0xc2, 0xd5, 0x86, 0xc8, 0x63, 0x92, + 0x6a, 0x06, 0xe8, 0x49, 0x1a, 0x24, 0x71, 0x7e, 0x3d, 0x9c, 0xa2, 0x6e, 0x29, 0x5c, 0x30, 0x58, + 0xd4, 0x39, 0x55, 0x96, 0x68, 0x6e, 0xd9, 0xc5, 0xc0, 0xea, 0xec, 0x11, 0x79, 0x88, 0xbf, 0x36, + 0xe0, 0x66, 0xae, 0x22, 0xf0, 0x30, 0xf0, 0x39, 0xbb, 0x6e, 0x49, 0x78, 0x06, 0xc8, 0x2d, 0x59, + 0x87, 0x69, 0x6f, 0x4e, 0xd3, 0x3d, 0xc9, 0xf3, 0x0a, 0x42, 0x7c, 0x0e, 0xb7, 0x3a, 0xb9, 0xcc, + 0x7b, 0xc6, 0x38, 0xa7, 0x83, 0x6b, 0x2b, 0x59, 0xce, 0x71, 0x73, 0x32, 0xc7, 0xf1, 0x3f, 0x0a, + 0x7e, 0xee, 0x04, 0xfe, 0xb1, 0x37, 0x40, 0x1b, 0x50, 0xe3, 0x21, 0xf5, 0x13, 0x79, 0x77, 0xaa, + 0xcb, 0x16, 0x51, 0x38, 0xb2, 0x7c, 0x73, 0x59, 0x94, 0x53, 0xfe, 0x7a, 0x2b, 0xb5, 0x77, 0x73, + 0x71, 0x96, 0x78, 0xe9, 0x82, 0x40, 0x2c, 0xa0, 0xcb, 0x50, 0xe7, 0x3a, 0xd4, 0x6b, 0x71, 0xa8, + 0xeb, 0x3d, 0xc2, 0xb0, 0xec, 0x8c, 0xa3, 0x88, 0xf9, 0xa2, 0x17, 0xba, 0x3d, 0xc1, 0x55, 0x05, + 0xac, 0x91, 0x66, 0x02, 0x3c, 0x70, 0x8f, 0x38, 0xfe, 0xbb, 0x01, 0xf7, 0x64, 0x6e, 0xb8, 0xe3, + 0x61, 0x2e, 0xb4, 0x67, 0xd4, 0x12, 0x9e, 0x40, 0xdd, 0x51, 0xb6, 0xba, 0x24, 0x5e, 0x63, 0x83, + 0x92, 0x04, 0x19, 0x75, 0x60, 0x85, 0x27, 0x2a, 0xc5, 0x91, 0xac, 0x8c, 0xb2, 0xb2, 0xb5, 0x5a, + 0x20, 0x3f, 0x2c, 0xa0, 0x90, 0x12, 0x09, 0x3e, 0x80, 0x5b, 0xcf, 0xa8, 0xe7, 0x0b, 0xea, 0xf9, + 0x2c, 0xfa, 0x5c, 0xd3, 0xa1, 0x9f, 0xe4, 0xfa, 0x8d, 0x51, 0x11, 0x88, 0x19, 0x4d, 0xb9, 0xe1, + 0xe0, 0xaf, 0x4c, 0xb0, 0xcb, 0xc7, 0xd7, 0xb5, 0xd0, 0x43, 0x00, 0xb9, 0xea, 0x49, 0x21, 0x4c, + 0x59, 0xa9, 0x41, 0x1a, 0x12, 0x22, 0xd9, 0x33, 0xf4, 0x18, 0xe6, 0xe3, 0x93, 0x2a, 0x03, 0x74, + 0x82, 0x51, 0x18, 0xf8, 0xcc, 0x17, 0x0a, 0x97, 0xc4, 0x98, 0xe8, 0x3b, 0xb0, 0x9c, 0x85, 0xae, + 0x74, 0x7a, 0xad, 0xa2, 0x67, 0xa5, 0x1d, 0xd1, 0xba, 0xbc, 0x23, 0xa2, 0xef, 0xc1, 0x4a, 0x3f, + 0x08, 0x04, 0x17, 0x11, 0x0d, 0x7b, 0x6e, 0xe0, 0xb3, 0x56, 0x5d, 0xf5, 0x83, 0xe5, 0x14, 0xba, + 0x1b, 0xf8, 0x0c, 0xff, 0x08, 0x56, 0x3b, 0x41, 0x10, 0xb9, 0x9e, 0x4f, 0x45, 0x10, 0xed, 0xe8, + 0x33, 0x1d, 0x4a, 0x2d, 0x58, 0x78, 0xc9, 0x22, 0xae, 0x3b, 0x9c, 0x45, 0xf4, 0x16, 0x7f, 0x09, + 0x0f, 0xaa, 0x09, 0x93, 0x22, 0x74, 0x0d, 0x97, 0xfd, 0xc5, 0x80, 0xf7, 0xb6, 0x5d, 0x37, 0xc3, + 0xd0, 0xda, 0x7c, 0x1f, 0x4c, 0xcf, 0xbd, 0xdc, 0x59, 0xa6, 0xe7, 0xca, 0x19, 0x2a, 0x17, 0xc4, + 0x4b, 0x69, 0x94, 0x4e, 0x18, 0xda, 0xaa, 0x30, 0xf4, 0x06, 0xdc, 0xf4, 0x78, 0xcf, 0x67, 0x67, + 0xbd, 0xcc, 0xed, 0x7a, 0x4c, 0xf1, 0xf8, 0x3e, 0x3b, 0xcb, 0xc4, 0xe1, 0x73, 0xb8, 0x4b, 0xd8, + 0x28, 0x78, 0xc9, 0xae, 0xa5, 0x6e, 0x0b, 0x16, 0x1c, 0xca, 0x1d, 0xea, 0xb2, 0xa4, 0x6d, 0xeb, + 0xad, 0x3c, 0x89, 0x14, 0x7f, 0x37, 0x99, 0x0a, 0xf4, 0x16, 0xff, 0xc9, 0x84, 0xfb, 0x99, 0xd0, + 0x09, 0xd7, 0x5d, 0x33, 0xc6, 0xa7, 0x19, 0xf0, 0x9e, 0xf2, 0x6b, 0x94, 0xb3, 0x5d, 0x5a, 0x14, + 0x1d, 0xf8, 0x40, 0xc8, 0x0a, 0xda, 0x13, 0x91, 0x37, 0x18, 0xb0, 0xa8, 0xc7, 0x5e, 0xca, 0x2a, + 0x96, 0x55, 0xbe, 0x9e, 0x77, 0x85, 0x96, 0xfd, 0x50, 0xf1, 0x38, 0x8a, 0x59, 0x3c, 0x95, 0x1c, + 0xf2, 0xcd, 0xbb, 0xda, 0x37, 0xf3, 0xd5, 0xbe, 0xf9, 0x8f, 0x01, 0xab, 0x95, 0x16, 0x9a, 0x4d, + 0xa3, 0x7c, 0x02, 0xf3, 0xb2, 0x4d, 0xe8, 0xde, 0xf8, 0x7e, 0x81, 0x2e, 0x95, 0x96, 0x35, 0x95, + 0x18, 0x5b, 0xa7, 0xb1, 0x75, 0x95, 0xc1, 0xf6, 0x4a, 0x85, 0x01, 0xff, 0xcf, 0x80, 0x76, 0x76, + 0xcf, 0x83, 0x80, 0x8b, 0x59, 0x47, 0xc3, 0x95, 0x5c, 0x6b, 0x5e, 0xd3, 0xb5, 0x8f, 0x61, 0x21, + 0xee, 0x82, 0xfa, 0x51, 0x71, 0x77, 0xa2, 0x75, 0x8c, 0x68, 0xd7, 0x3f, 0x0e, 0x88, 0xc6, 0xc3, + 0xff, 0x35, 0xe0, 0xfd, 0xa9, 0x37, 0x9f, 0x8d, 0x97, 0xbf, 0x95, 0xab, 0xbf, 0x4b, 0x4c, 0xe0, + 0x73, 0x80, 0xcc, 0x16, 0x85, 0xb1, 0xd9, 0x28, 0x8d, 0xcd, 0x6d, 0x8d, 0xb9, 0x4f, 0x47, 0xba, + 0x51, 0xe5, 0x20, 0x68, 0x13, 0xea, 0x2a, 0x3c, 0xb5, 0xc1, 0x2b, 0xc6, 0x21, 0x65, 0xef, 0x04, + 0x0b, 0x77, 0x92, 0xc7, 0xad, 0x12, 0x3c, 0xfd, 0x71, 0xfb, 0x20, 0x41, 0xcb, 0x49, 0xcd, 0x00, + 0xf8, 0xcf, 0x26, 0xa0, 0xc9, 0xec, 0x90, 0xd5, 0x72, 0x8a, 0x73, 0x0a, 0x86, 0x34, 0x93, 0xc7, + 0xb3, 0xbe, 0xb2, 0x59, 0xba, 0xb2, 0x9e, 0xef, 0xac, 0x2b, 0xcc, 0x77, 0xbf, 0x04, 0xdb, 0xd1, + 0xed, 0xb8, 0xc7, 0xb3, 0xd7, 0xe8, 0x25, 0x3d, 0xfb, 0x86, 0x93, 0xdf, 0x8f, 0xf9, 0x64, 0x92, + 0xce, 0x57, 0x34, 0x95, 0x8f, 0xa0, 0xd9, 0x1f, 0x06, 0xce, 0x69, 0x32, 0x35, 0xd4, 0x95, 0x7e, + 0xa8, 0x18, 0xe1, 0x8a, 0x3d, 0x28, 0x34, 0xb5, 0xc6, 0x2f, 0xe0, 0x4e, 0x16, 0xde, 0x9d, 0x61, + 0xc0, 0xd9, 0x8c, 0x12, 0x3a, 0xd7, 0x56, 0xcc, 0x62, 0x5b, 0x89, 0xe0, 0xee, 0x84, 0xc8, 0xd9, + 0x64, 0x92, 0x1c, 0xa7, 0xc7, 0x8e, 0xc3, 0x38, 0xd7, 0x32, 0x93, 0x2d, 0xfe, 0xbd, 0x01, 0x76, + 0xf6, 0xa6, 0x8a, 0x83, 0x6d, 0x06, 0x4f, 0xd2, 0xfb, 0xb0, 0x98, 0x84, 0x64, 0x5c, 0xa3, 0x2d, + 0x92, 0xee, 0x2f, 0x7a, 0x6d, 0xe2, 0x4f, 0x60, 0x5e, 0xe1, 0x5d, 0xf2, 0xfd, 0x66, 0x4a, 0x08, + 0x62, 0x1f, 0x56, 0xf4, 0x3a, 0xb6, 0xc6, 0x05, 0x7c, 0xd6, 0xa0, 0xf9, 0xc5, 0xd0, 0x2d, 0xb1, + 0xca, 0x83, 0x24, 0xc6, 0x3e, 0x3b, 0x2b, 0xe9, 0x9a, 0x07, 0xe1, 0xaf, 0x2d, 0x98, 0x8f, 0x27, + 0xcf, 0x07, 0xd0, 0xe8, 0xf2, 0x1d, 0x19, 0x3e, 0x2c, 0x1e, 0x3c, 0x16, 0x49, 0x06, 0x90, 0x5a, + 0xa8, 0x65, 0xf6, 0x9c, 0x49, 0xb6, 0xe8, 0x53, 0x68, 0xc6, 0x4b, 0x5d, 0x0c, 0x26, 0xe7, 0xfe, + 0xb2, 0x7b, 0x48, 0x9e, 0x02, 0xed, 0xc1, 0xcd, 0x7d, 0xc6, 0xdc, 0xdd, 0x28, 0x08, 0x43, 0x8d, + 0x91, 0xb4, 0xfa, 0x4b, 0xd8, 0x4c, 0xd2, 0xa1, 0x9f, 0xc1, 0x0d, 0x09, 0xdc, 0x76, 0xdd, 0x94, + 0x55, 0x3c, 0xf3, 0xa2, 0xc9, 0x6c, 0x26, 0x65, 0x54, 0xf9, 0x0e, 0xf9, 0x55, 0xe8, 0x52, 0xc1, + 0x12, 0x13, 0xf2, 0x56, 0x5d, 0x11, 0xaf, 0x56, 0x35, 0x93, 0xc4, 0x41, 0xa4, 0x44, 0x52, 0xfe, + 0x94, 0xb2, 0x30, 0xf1, 0x29, 0x05, 0xfd, 0x50, 0x0d, 0xf9, 0x03, 0xd6, 0x5a, 0x54, 0x51, 0x59, + 0x6c, 0x55, 0x3b, 0x49, 0x06, 0x0f, 0xe2, 0x01, 0x7f, 0xc0, 0xf0, 0x29, 0xbc, 0x97, 0x56, 0x1f, + 0x7d, 0x2a, 0x4b, 0xc7, 0x3b, 0x54, 0xbd, 0x75, 0xfd, 0xac, 0x30, 0xa7, 0x96, 0x8e, 0x18, 0x01, + 0xff, 0xcb, 0x80, 0x1b, 0xa5, 0x4f, 0x70, 0xef, 0x22, 0xa8, 0xaa, 0x2c, 0x9a, 0xb3, 0x28, 0x8b, + 0x55, 0xb3, 0xf6, 0x63, 0xb8, 0x1d, 0x37, 0x54, 0xee, 0xbd, 0x66, 0xbd, 0x90, 0x45, 0x3d, 0xce, + 0x9c, 0xc0, 0x8f, 0x07, 0x45, 0x93, 0x20, 0x75, 0x78, 0xe8, 0xbd, 0x66, 0x07, 0x2c, 0x3a, 0x54, + 0x27, 0xf8, 0x2b, 0x03, 0x50, 0xce, 0x86, 0x33, 0xaa, 0x88, 0x9f, 0xc1, 0x72, 0x3f, 0x63, 0x9a, + 0x7e, 0xf1, 0xf8, 0xa0, 0xba, 0x83, 0xe4, 0xe5, 0x17, 0xe9, 0xb0, 0x0b, 0x4b, 0xf9, 0x9e, 0x8d, + 0x10, 0xd4, 0x84, 0x37, 0x8a, 0xcb, 0x57, 0x83, 0xa8, 0xb5, 0x84, 0xf9, 0x81, 0xab, 0x9b, 0xa3, + 0x5a, 0x4b, 0x98, 0x23, 0x61, 0x56, 0x0c, 0x93, 0x6b, 0x99, 0xb2, 0xa3, 0xf8, 0x83, 0x89, 0xb2, + 0x47, 0x83, 0xe8, 0x2d, 0xfe, 0x18, 0x96, 0xf2, 0x8e, 0x93, 0xd4, 0x27, 0xde, 0xe0, 0x24, 0xf9, + 0x28, 0xa8, 0xd6, 0xc8, 0x06, 0x6b, 0x18, 0x9c, 0x25, 0xc9, 0x2e, 0x97, 0xf8, 0x18, 0x96, 0xf2, + 0x26, 0xb8, 0x1a, 0x95, 0xd2, 0x56, 0xb6, 0xf2, 0x44, 0x33, 0xb9, 0x96, 0xa5, 0x46, 0xfe, 0xf2, + 0x90, 0x3a, 0x5a, 0xb7, 0x0c, 0xb0, 0xf1, 0x10, 0xea, 0xc9, 0x27, 0xd2, 0x06, 0xcc, 0x3f, 0x8f, + 0x3c, 0xc1, 0xec, 0x39, 0xb4, 0x08, 0xb5, 0x03, 0xca, 0xb9, 0x6d, 0x6c, 0xac, 0xc7, 0x15, 0x32, + 0x7b, 0xf8, 0x23, 0x80, 0x7a, 0x27, 0x62, 0x54, 0xe1, 0x01, 0xd4, 0xe3, 0x27, 0x95, 0x6d, 0x6c, + 0xfc, 0x14, 0x20, 0x4b, 0x26, 0xc9, 0x61, 0xff, 0x8b, 0xfd, 0xa7, 0xf6, 0x1c, 0x6a, 0xc2, 0xc2, + 0xf3, 0xed, 0xee, 0x51, 0x77, 0xff, 0x33, 0xdb, 0x50, 0x1b, 0x12, 0x6f, 0x4c, 0x89, 0xb3, 0x2b, + 0x71, 0xac, 0x8d, 0x1f, 0x94, 0x1a, 0x08, 0x5a, 0x00, 0x6b, 0x7b, 0x38, 0xb4, 0xe7, 0x50, 0x1d, + 0xcc, 0xdd, 0x1d, 0xdb, 0x90, 0x92, 0xf6, 0x83, 0x68, 0x44, 0x87, 0xb6, 0xb9, 0xd1, 0x85, 0x95, + 0x62, 0x40, 0x2b, 0xb6, 0x41, 0x74, 0xea, 0xf9, 0x83, 0x58, 0xe0, 0xa1, 0x50, 0x55, 0x2a, 0x16, + 0x18, 0x6b, 0xe8, 0xda, 0x26, 0xb2, 0x61, 0xa9, 0xeb, 0x7b, 0xc2, 0xa3, 0x43, 0xef, 0xb5, 0xc4, + 0xb5, 0x76, 0x7e, 0xfe, 0xb7, 0x37, 0x6d, 0xe3, 0x9b, 0x37, 0x6d, 0xe3, 0xdf, 0x6f, 0xda, 0xc6, + 0x1f, 0xde, 0xb6, 0xe7, 0xbe, 0x79, 0xdb, 0x9e, 0xfb, 0xe7, 0xdb, 0xf6, 0xdc, 0x6f, 0xbe, 0x3b, + 0xf0, 0xc4, 0xc9, 0xb8, 0xbf, 0xe9, 0x04, 0xa3, 0x47, 0xa1, 0xe7, 0x0f, 0x1c, 0x1a, 0x3e, 0x12, + 0x9e, 0xe3, 0x3a, 0x8f, 0x72, 0x51, 0xd6, 0xaf, 0xab, 0xff, 0x15, 0x3e, 0xfa, 0x7f, 0x00, 0x00, + 0x00, 0xff, 0xff, 0xba, 0xd1, 0xbd, 0x4a, 0x76, 0x18, 0x00, 0x00, } func (m *TableSpan) Marshal() (dAtA []byte, err error) { diff --git a/heartbeatpb/heartbeat.proto b/heartbeatpb/heartbeat.proto index db926732e..9675b8081 100644 --- a/heartbeatpb/heartbeat.proto +++ b/heartbeatpb/heartbeat.proto @@ -250,6 +250,7 @@ enum ComponentState { Working = 0; Stopped = 1; Removed = 2; + Initializing = 3; } message RunningError { diff --git a/maintainer/maintainer_controller.go b/maintainer/maintainer_controller.go index bd17680f5..9cbabd8e3 100644 --- a/maintainer/maintainer_controller.go +++ b/maintainer/maintainer_controller.go @@ -181,8 +181,8 @@ func (c *Controller) AddNewTable(table commonEvent.Table, startTs uint64) { } tableSpans := []*heartbeatpb.TableSpan{tableSpan} if c.enableTableAcrossNodes { - // split the whole table span base on the configuration, todo: background split table - tableSpans = c.splitter.SplitSpans(context.Background(), tableSpan, len(c.nodeManager.GetAliveNodes())) + // split the whole table span base on region count if table region count is exceed the limit + tableSpans = c.splitter.SplitSpansByRegion(context.Background(), tableSpan, len(c.nodeManager.GetAliveNodes())) } c.addNewSpans(table.SchemaID, tableSpans, startTs) } diff --git a/maintainer/replica/replication_span.go b/maintainer/replica/replication_span.go index e4cd2f656..e33ac7c77 100644 --- a/maintainer/replica/replication_span.go +++ b/maintainer/replica/replication_span.go @@ -57,7 +57,7 @@ func NewSpanReplication(cfID common.ChangeFeedID, span *heartbeatpb.TableSpan, checkpointTs uint64, ) *SpanReplication { - r := newSpanReplication(cfID, id, pdClock, SchemaID, span, checkpointTs) + r := newSpanReplication(cfID, id, pdClock, SchemaID, span) r.initStatus(&heartbeatpb.TableSpanStatus{ ID: id.ToPB(), CheckpointTs: checkpointTs, @@ -82,7 +82,7 @@ func NewWorkingSpanReplication( status *heartbeatpb.TableSpanStatus, nodeID node.ID, ) *SpanReplication { - r := newSpanReplication(cfID, id, pdClock, SchemaID, span, status.CheckpointTs) + r := newSpanReplication(cfID, id, pdClock, SchemaID, span) // Must set Node ID when creating a working span replication r.SetNodeID(nodeID) r.initStatus(status) @@ -100,7 +100,7 @@ func NewWorkingSpanReplication( return r } -func newSpanReplication(cfID common.ChangeFeedID, id common.DispatcherID, pdClock pdutil.Clock, SchemaID int64, span *heartbeatpb.TableSpan, checkpointTs uint64) *SpanReplication { +func newSpanReplication(cfID common.ChangeFeedID, id common.DispatcherID, pdClock pdutil.Clock, SchemaID int64, span *heartbeatpb.TableSpan) *SpanReplication { r := &SpanReplication{ ID: id, pdClock: pdClock, diff --git a/maintainer/split/region_count_splitter.go b/maintainer/split/region_count_splitter.go index bb2e60240..03025a40e 100644 --- a/maintainer/split/region_count_splitter.go +++ b/maintainer/split/region_count_splitter.go @@ -16,7 +16,6 @@ package split import ( "bytes" "context" - "math" "github.com/pingcap/log" "github.com/pingcap/ticdc/heartbeatpb" @@ -25,10 +24,14 @@ import ( "go.uber.org/zap" ) +// regionCountSplitter is a splitter that splits spans by region count. +// It is used to split spans when add new table when initialize the maintainer and enable enableTableAcrossNodesS +// regionCountSplitter will split a table span into multiple spans, each span contains at most k regions. type regionCountSplitter struct { - changefeedID common.ChangeFeedID - regionCache RegionCache - regionThreshold int + changefeedID common.ChangeFeedID + regionCache RegionCache + regionThreshold int + regionCountPerSpan int // the max number of regions in each span, which is set by configuration } func newRegionCountSplitter( @@ -63,9 +66,7 @@ func (m *regionCountSplitter) split( return []*heartbeatpb.TableSpan{span} } - stepper := newEvenlySplitStepper( - getSpansNumber(len(regions), captureNum), - len(regions)) + stepper := newEvenlySplitStepper(len(regions), m.regionCountPerSpan) spans := make([]*heartbeatpb.TableSpan, 0, stepper.SpanCount()) start, end := 0, stepper.Step() @@ -110,10 +111,11 @@ func (m *regionCountSplitter) split( } start = end step := stepper.Step() - if end+step < len(regions) { + // should not happen + if end+step <= len(regions) { end = end + step } else { - end = len(regions) + log.Panic("Unexpected stepper step", zap.Any("end", end), zap.Any("step", step), zap.Any("len of regions", len(regions))) } } // Make sure spans does not exceed [startKey, endKey). @@ -131,31 +133,26 @@ func (m *regionCountSplitter) split( } type evenlySplitStepper struct { - spanCount int - regionPerSpan int - extraRegionPerSpan int - remain int + spanCount int + regionPerSpan int + remain int // the number of spans that have the regionPerSpan + 1 region count } -func newEvenlySplitStepper(pages int, totalRegion int) evenlySplitStepper { - extraRegionPerSpan := 0 - regionPerSpan, remain := totalRegion/pages, totalRegion%pages - if regionPerSpan == 0 { - regionPerSpan = 1 - extraRegionPerSpan = 0 - pages = totalRegion - } else if remain != 0 { - // Evenly distributes the remaining regions. - extraRegionPerSpan = int(math.Ceil(float64(remain) / float64(pages))) +func newEvenlySplitStepper(totalRegion int, maxRegionPerSpan int) evenlySplitStepper { + if totalRegion%maxRegionPerSpan == 0 { + return evenlySplitStepper{ + regionPerSpan: maxRegionPerSpan, + spanCount: totalRegion / maxRegionPerSpan, + remain: 0, + } } - res := evenlySplitStepper{ - regionPerSpan: regionPerSpan, - spanCount: pages, - extraRegionPerSpan: extraRegionPerSpan, - remain: remain, + spanCount := totalRegion/maxRegionPerSpan + 1 + regionPerSpan := totalRegion / spanCount + return evenlySplitStepper{ + regionPerSpan: regionPerSpan, + spanCount: spanCount, + remain: totalRegion - regionPerSpan*spanCount, } - log.Info("evenly split stepper", zap.Any("regionPerSpan", regionPerSpan), zap.Any("spanCount", pages), zap.Any("extraRegionPerSpan", extraRegionPerSpan)) - return res } func (e *evenlySplitStepper) SpanCount() int { @@ -166,6 +163,6 @@ func (e *evenlySplitStepper) Step() int { if e.remain <= 0 { return e.regionPerSpan } - e.remain = e.remain - e.extraRegionPerSpan - return e.regionPerSpan + e.extraRegionPerSpan + e.remain = e.remain - 1 + return e.regionPerSpan + 1 } diff --git a/maintainer/split/splitter.go b/maintainer/split/splitter.go index 81ed062eb..8ed989f3a 100644 --- a/maintainer/split/splitter.go +++ b/maintainer/split/splitter.go @@ -58,7 +58,9 @@ type splitter interface { } type Splitter struct { - splitters []splitter + regionCounterSplitter *regionCountSplitter + writeKeySplitter *writeSplitter + //splitters []splitter changefeedID common.ChangeFeedID } @@ -75,25 +77,32 @@ func NewSplitter( } log.Info("baseSpanNumberCoefficient", zap.Any("ChangefeedID", changefeedID.Name()), zap.Any("baseSpanNumberCoefficient", baseSpanNumberCoefficient)) return &Splitter{ - changefeedID: changefeedID, - splitters: []splitter{ - // write splitter has the highest priority. - newWriteSplitter(changefeedID, pdapi, config.WriteKeyThreshold), - newRegionCountSplitter(changefeedID, regionCache, config.RegionThreshold), - }, + changefeedID: changefeedID, + regionCounterSplitter: newRegionCountSplitter(changefeedID, regionCache, config.RegionThreshold), + writeKeySplitter: newWriteSplitter(changefeedID, pdapi, config.WriteKeyThreshold), } } -func (s *Splitter) SplitSpans(ctx context.Context, +func (s *Splitter) SplitSpansByRegion(ctx context.Context, span *heartbeatpb.TableSpan, totalCaptures int, ) []*heartbeatpb.TableSpan { spans := []*heartbeatpb.TableSpan{span} - for _, sp := range s.splitters { - spans = sp.split(ctx, span, totalCaptures) - if len(spans) > 1 { - return spans - } + spans = s.regionCounterSplitter.split(ctx, span, totalCaptures) + if len(spans) > 1 { + return spans + } + return spans +} + +func (s *Splitter) SplitSpansByWriteKey(ctx context.Context, + span *heartbeatpb.TableSpan, + totalCaptures int, +) []*heartbeatpb.TableSpan { + spans := []*heartbeatpb.TableSpan{span} + spans = s.writeKeySplitter.split(ctx, span, totalCaptures) + if len(spans) > 1 { + return spans } return spans } diff --git a/pkg/scheduler/balance.go b/pkg/scheduler/balance.go index d2e3bc23f..3471c9f67 100644 --- a/pkg/scheduler/balance.go +++ b/pkg/scheduler/balance.go @@ -80,6 +80,7 @@ func (s *balanceScheduler[T, S, R]) Execute() time.Time { return now.Add(s.checkBalanceInterval) }) + // TODO: 这个后续要优化掉,拆表的 basic 调度不应该影响普通表的调度,但不着急先不改。 if s.operatorController.OperatorSize() > 0 || s.db.GetAbsentSize() > 0 { // not in stable schedule state, skip balance return now.Add(s.checkBalanceInterval) @@ -100,6 +101,10 @@ func (s *balanceScheduler[T, S, R]) Execute() time.Time { func (s *balanceScheduler[T, S, R]) schedulerGroup(nodes map[node.ID]*node.Info) int { availableSize, totalMoved := s.batchSize, 0 for _, group := range s.db.GetGroups() { + // 先不调度任何拆表的组 + if group == replica.DefaultGroupID { + continue + } // fast path, check the balance status moveSize := CheckBalanceStatus(s.db.GetTaskSizePerNodeByGroup(group), nodes) if moveSize <= 0 { @@ -135,7 +140,11 @@ func (s *balanceScheduler[T, S, R]) schedulerGlobal(nodes map[node.ID]*node.Info // complexity note: len(nodes) * len(groups) totalTasks := 0 sizePerNode := make(map[node.ID]int, len(nodes)) - for _, nodeTasks := range groupNodetasks { + for groupID, nodeTasks := range groupNodetasks { + // TODO: not balance the split table now. + if groupID != replica.DefaultGroupID { + continue + } for id, task := range nodeTasks { if task != zero { totalTasks++ @@ -153,7 +162,10 @@ func (s *balanceScheduler[T, S, R]) schedulerGlobal(nodes map[node.ID]*node.Info } moved := 0 - for _, nodeTasks := range groupNodetasks { + for groupID, nodeTasks := range groupNodetasks { + if groupID != replica.DefaultGroupID { + continue + } availableNodes, victims, nextVictim := []node.ID{}, []node.ID{}, 0 for id, task := range nodeTasks { if task != zero && sizePerNode[id] > lowerLimitPerNode { diff --git a/pkg/scheduler/basic.go b/pkg/scheduler/basic.go index 0eaada12b..24f72d54c 100644 --- a/pkg/scheduler/basic.go +++ b/pkg/scheduler/basic.go @@ -24,6 +24,8 @@ import ( "github.com/pingcap/ticdc/utils/heap" ) +const schedulingTaskCountPerNode = 6 + // basicScheduler generates operators for the spans, and push them to the operator controller // it generates add operator for the absent spans, and move operator for the unbalanced replicating spans // currently, it only supports balance the spans by size @@ -58,9 +60,14 @@ func NewBasicScheduler[T replica.ReplicationID, S replica.ReplicationStatus, R r } // Execute periodically execute the operator +// 区分 split table 和 非 split table func (s *basicScheduler[T, S, R]) Execute() time.Time { availableSize := s.batchSize - s.operatorController.OperatorSize() - if s.db.GetAbsentSize() <= 0 || availableSize <= 0 { + + totalAbsentSize := s.db.GetAbsentSize() + defaultGroupAbsentSize := s.db.GetAbsentSizeForGroup(replica.DefaultGroupID) + + if totalAbsentSize <= 0 || availableSize <= 0 { // can not schedule more operators, skip return time.Now().Add(time.Millisecond * 500) } @@ -69,31 +76,114 @@ func (s *basicScheduler[T, S, R]) Execute() time.Time { return time.Now().Add(time.Millisecond * 100) } - for _, id := range s.db.GetGroups() { - availableSize -= s.schedule(id, availableSize) - if availableSize <= 0 { - break + // 分配一下 availableSize 的使用, 首先 non-default 的总开销不能大于 batchSize / 2,避免把常规表饿死 + // 每个节点对一个 split 表的空间设为 k(比如10个),有空余了才会花掉一点。 + // 可以先做个暴力的分发,一人一半,如果 前面的没这么多需要用的,后面最大额度也就是 batchSize / 2 + + if defaultGroupAbsentSize == totalAbsentSize { + // only have absent replicas in the default group + s.schedule(replica.DefaultGroupID, availableSize) + } else if defaultGroupAbsentSize == 0 { + // only have absent replicas in non-default group + maxAvailableSize := max(availableSize, s.batchSize/2) + for _, id := range s.db.GetGroups() { + if id == replica.DefaultGroupID { + continue + } + maxAvailableSize -= s.schedule(id, maxAvailableSize) + if maxAvailableSize <= 0 { + break + } + } + } else { + availableSizeForNonDefault := min(min(max(availableSize/2, availableSize-defaultGroupAbsentSize), totalAbsentSize-defaultGroupAbsentSize), s.batchSize/2) + availableSizeForDefault := min(defaultGroupAbsentSize, availableSize-defaultGroupAbsentSize) + for _, id := range s.db.GetGroups() { + if id == replica.DefaultGroupID { + s.schedule(id, availableSizeForDefault) + } + availableSizeForNonDefault -= s.schedule(id, availableSize) + if availableSizeForNonDefault <= 0 { + break + } } } return time.Now().Add(time.Millisecond * 500) } func (s *basicScheduler[T, S, R]) schedule(id replica.GroupID, availableSize int) (scheduled int) { + if id == replica.DefaultGroupID { + absent := s.db.GetAbsentByGroup(id, availableSize) + // 如果是非 default 组,看一下每个节点有几个 absent 任务, k - absent 任务就是能安排的量 + nodeSize := s.db.GetTaskSizePerNodeByGroup(id) + // add the absent node to the node size map + for id := range s.nodeManager.GetAliveNodes() { + if _, ok := nodeSize[id]; !ok { + nodeSize[id] = 0 + } + } + // what happens if the some node removed when scheduling? + BasicSchedule(availableSize, absent, nodeSize, func(replication R, id node.ID) bool { + op := s.newAddOperator(replication, id) + return s.operatorController.AddOperator(op) + }) + scheduled = len(absent) + s.absent = absent[:0] + return + } + scheduled = 0 absent := s.db.GetAbsentByGroup(id, availableSize) - nodeSize := s.db.GetTaskSizePerNodeByGroup(id) - // add the absent node to the node size map + nodeSize := s.db.GetScheduleTaskSizePerNodeByGroup(id) + spaceCount := 0 for id := range s.nodeManager.GetAliveNodes() { if _, ok := nodeSize[id]; !ok { nodeSize[id] = 0 } + spaceCount += schedulingTaskCountPerNode - nodeSize[id] + } + + if spaceCount <= len(absent) { + // 给每个node 安排 schedulingTaskCountPerNode - nodeSize[id] + for id, size := range nodeSize { + taskList := absent[:schedulingTaskCountPerNode-size] + count := 0 + for idx, task := range taskList { + count = idx + op := s.newAddOperator(task, id) + if !s.operatorController.AddOperator(op) { + count -= 1 + break + } else { + scheduled += 1 + } + } + absent = absent[count+1:] + } + } else { + // 那就尽可能给每个节点均匀一点 + updated := false + for len(absent) != 0 { + for id, size := range nodeSize { + if len(absent) == 0 { + return + } + if size < schedulingTaskCountPerNode { + op := s.newAddOperator(absent[0], id) + if !s.operatorController.AddOperator(op) { + continue + } else { + absent = absent[1:] + nodeSize[id] += 1 + updated = true + scheduled += 1 + } + } + } + if !updated { + break + } + } } - // what happens if the some node removed when scheduling? - BasicSchedule(availableSize, absent, nodeSize, func(replication R, id node.ID) bool { - op := s.newAddOperator(replication, id) - return s.operatorController.AddOperator(op) - }) - scheduled = len(absent) - s.absent = absent[:0] return } diff --git a/pkg/scheduler/replica/replication.go b/pkg/scheduler/replica/replication.go index 1e9f92066..09668bb78 100644 --- a/pkg/scheduler/replica/replication.go +++ b/pkg/scheduler/replica/replication.go @@ -49,6 +49,7 @@ type Replication[T ReplicationID] interface { type ScheduleGroup[T ReplicationID, R Replication[T]] interface { GetAbsentSize() int GetAbsent() []R + GetAbsentSizeForGroup(groupID GroupID) int GetSchedulingSize() int GetScheduling() []R GetReplicatingSize() int @@ -67,6 +68,7 @@ type ScheduleGroup[T ReplicationID, R Replication[T]] interface { GetTaskSizePerNode() map[node.ID]int GetImbalanceGroupNodeTask(nodes map[node.ID]*node.Info) (groups map[GroupID]map[node.ID]R, valid bool) GetTaskSizePerNodeByGroup(groupID GroupID) map[node.ID]int + GetScheduleTaskSizePerNodeByGroup(groupID GroupID) map[node.ID]int GetGroupChecker(groupID GroupID) GroupChecker[T, R] GetCheckerStat() string @@ -158,6 +160,15 @@ func (db *replicationDB[T, R]) GetAbsentSize() int { return size } +func (db *replicationDB[T, R]) GetAbsentSizeForGroup(groupID GroupID) int { + size := 0 + db.withRLock(func() { + g := db.mustGetGroup(groupID) + size = g.GetAbsentSize() + }) + return size +} + func (db *replicationDB[T, R]) GetAbsentByGroup(id GroupID, batch int) []R { buffer := make([]R, 0, batch) db.withRLock(func() { @@ -332,6 +343,28 @@ func (db *replicationDB[T, R]) GetTaskSizeByNodeID(id node.ID) (size int) { return } +func (db *replicationDB[T, R]) GetScheduleTaskSizePerNodeByGroup(id GroupID) (sizeMap map[node.ID]int) { + db.withRLock(func() { + sizeMap = db.getScheduleTaskSizePerNodeByGroup(id) + }) + return +} + +func (db *replicationDB[T, R]) getScheduleTaskSizePerNodeByGroup(id GroupID) (sizeMap map[node.ID]int) { + sizeMap = make(map[node.ID]int) + replicationGroup := db.mustGetGroup(id) + for nodeID, tasks := range replicationGroup.GetNodeTasks() { + count := 0 + for taskID, _ := range tasks { + if replicationGroup.scheduling.Find(taskID) { + count++ + } + } + sizeMap[nodeID] = count + } + return +} + func (db *replicationDB[T, R]) GetTaskSizePerNodeByGroup(id GroupID) (sizeMap map[node.ID]int) { db.withRLock(func() { sizeMap = db.getTaskSizePerNodeByGroup(id) diff --git a/pkg/scheduler/replica/replication_group.go b/pkg/scheduler/replica/replication_group.go index 819e9e757..987ec3a01 100644 --- a/pkg/scheduler/replica/replication_group.go +++ b/pkg/scheduler/replica/replication_group.go @@ -253,6 +253,11 @@ func newIMap[T ReplicationID, R Replication[T]]() *iMap[T, R] { return &iMap[T, R]{inner: sync.Map{}} } +func (m *iMap[T, R]) Find(key T) bool { + _, exists := m.inner.Load(key) + return exists +} + func (m *iMap[T, R]) Get(key T) (R, bool) { var value R v, exists := m.inner.Load(key) From e961287ebd189f2d12d4b4b021f9396e7504f649 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Wed, 2 Apr 2025 14:13:35 +0800 Subject: [PATCH 2/5] update --- .../dispatchermanager/event_dispatcher_manager.go | 10 +--------- maintainer/scheduler.go | 2 +- maintainer/split/region_count_splitter.go | 2 +- pkg/scheduler/basic.go | 2 ++ 4 files changed, 5 insertions(+), 11 deletions(-) diff --git a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go index 877aefb64..0935cd1ac 100644 --- a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go +++ b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go @@ -526,14 +526,6 @@ func (e *EventDispatcherManager) newDispatchers(infos []dispatcherCreateInfo, re seq := e.dispatcherMap.Set(id, d) d.SetSeq(seq) - // e.statusesChan <- TableSpanStatusWithSeq{ - // TableSpanStatus: &heartbeatpb.TableSpanStatus{ - // ID: id.ToPB(), - // ComponentStatus: heartbeatpb.ComponentState_Working, - // }, - // StartTs: uint64(newStartTsList[idx]), - // Seq: seq, - // } if d.IsTableTriggerEventDispatcher() { e.metricTableTriggerEventDispatcherCount.Inc() @@ -775,7 +767,7 @@ func (e *EventDispatcherManager) removeDispatcher(id common.DispatcherID) { if dispatcherItem.GetRemovingStatus() { return } - appcontext.GetService[*eventcollector.EventCollector](appcontext.EventCollector).RemoveDispatcher(dispatcher) + appcontext.GetService[*eventcollector.EventCollector](appcontext.EventCollector).RemoveDispatcher(dispatcherItem) // for non-mysql class sink, only the event dispatcher manager with table trigger event dispatcher need to receive the checkpointTs message. if dispatcherItem.IsTableTriggerEventDispatcher() && e.sink.SinkType() != common.MysqlSinkType { diff --git a/maintainer/scheduler.go b/maintainer/scheduler.go index d2ae02b80..13d65fb82 100644 --- a/maintainer/scheduler.go +++ b/maintainer/scheduler.go @@ -162,7 +162,7 @@ func (s *splitScheduler) doCheck(ret pkgReplica.GroupCheckResult, start time.Tim case replica.OpMergeAndSplit: log.Info("Into OP MergeAndSplit") // expectedSpanNum := split.NextExpectedSpansNumber(len(ret.Replications)) - spans := s.splitter.SplitSpans(context.Background(), totalSpan, len(s.nodeManager.GetAliveNodes())) + spans := s.splitter.SplitSpansByWriteKey(context.Background(), totalSpan, len(s.nodeManager.GetAliveNodes())) if len(spans) > 1 { log.Info("split span", zap.String("changefeed", s.changefeedID.Name()), diff --git a/maintainer/split/region_count_splitter.go b/maintainer/split/region_count_splitter.go index 03025a40e..a056b99fe 100644 --- a/maintainer/split/region_count_splitter.go +++ b/maintainer/split/region_count_splitter.go @@ -111,10 +111,10 @@ func (m *regionCountSplitter) split( } start = end step := stepper.Step() - // should not happen if end+step <= len(regions) { end = end + step } else { + // should not happen log.Panic("Unexpected stepper step", zap.Any("end", end), zap.Any("step", step), zap.Any("len of regions", len(regions))) } } diff --git a/pkg/scheduler/basic.go b/pkg/scheduler/basic.go index 24f72d54c..75079096f 100644 --- a/pkg/scheduler/basic.go +++ b/pkg/scheduler/basic.go @@ -113,6 +113,7 @@ func (s *basicScheduler[T, S, R]) Execute() time.Time { func (s *basicScheduler[T, S, R]) schedule(id replica.GroupID, availableSize int) (scheduled int) { if id == replica.DefaultGroupID { + // default 的话,就按照目标是每个节点总的 dispatcher 数目量一致来处理 absent := s.db.GetAbsentByGroup(id, availableSize) // 如果是非 default 组,看一下每个节点有几个 absent 任务, k - absent 任务就是能安排的量 nodeSize := s.db.GetTaskSizePerNodeByGroup(id) @@ -131,6 +132,7 @@ func (s *basicScheduler[T, S, R]) schedule(id replica.GroupID, availableSize int s.absent = absent[:0] return } + // 非 default 的话,就按照目标是每个节点总的 scheding 数目维持一致来处理 scheduled = 0 absent := s.db.GetAbsentByGroup(id, availableSize) nodeSize := s.db.GetScheduleTaskSizePerNodeByGroup(id) From a464b8833b9d94e374a53945a42a5a689a64b33a Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Wed, 2 Apr 2025 17:31:28 +0800 Subject: [PATCH 3/5] update --- maintainer/split/region_count_splitter.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/maintainer/split/region_count_splitter.go b/maintainer/split/region_count_splitter.go index a056b99fe..ffdf8e174 100644 --- a/maintainer/split/region_count_splitter.go +++ b/maintainer/split/region_count_splitter.go @@ -38,9 +38,10 @@ func newRegionCountSplitter( changefeedID common.ChangeFeedID, regionCache RegionCache, regionThreshold int, ) *regionCountSplitter { return ®ionCountSplitter{ - changefeedID: changefeedID, - regionCache: regionCache, - regionThreshold: regionThreshold, + changefeedID: changefeedID, + regionCache: regionCache, + regionThreshold: regionThreshold, + regionCountPerSpan: 100, } } From 65cf9bf0582df165742222512ac74a58cbcc3e33 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Thu, 3 Apr 2025 10:19:54 +0800 Subject: [PATCH 4/5] update --- pkg/scheduler/basic.go | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/pkg/scheduler/basic.go b/pkg/scheduler/basic.go index 75079096f..4a5d2c48d 100644 --- a/pkg/scheduler/basic.go +++ b/pkg/scheduler/basic.go @@ -147,19 +147,21 @@ func (s *basicScheduler[T, S, R]) schedule(id replica.GroupID, availableSize int if spaceCount <= len(absent) { // 给每个node 安排 schedulingTaskCountPerNode - nodeSize[id] for id, size := range nodeSize { - taskList := absent[:schedulingTaskCountPerNode-size] - count := 0 - for idx, task := range taskList { - count = idx - op := s.newAddOperator(task, id) - if !s.operatorController.AddOperator(op) { - count -= 1 - break - } else { - scheduled += 1 + if schedulingTaskCountPerNode-size > 0 { + taskList := absent[:schedulingTaskCountPerNode-size] + count := 0 + for idx, task := range taskList { + count = idx + op := s.newAddOperator(task, id) + if !s.operatorController.AddOperator(op) { + count -= 1 + break + } else { + scheduled += 1 + } } + absent = absent[count+1:] } - absent = absent[count+1:] } } else { // 那就尽可能给每个节点均匀一点 From c3bd09b5658783972f47c3a5b1846c287b93d466 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Thu, 3 Apr 2025 10:58:56 +0800 Subject: [PATCH 5/5] update --- pkg/scheduler/basic.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/scheduler/basic.go b/pkg/scheduler/basic.go index 4a5d2c48d..65c8f8b08 100644 --- a/pkg/scheduler/basic.go +++ b/pkg/scheduler/basic.go @@ -60,7 +60,7 @@ func NewBasicScheduler[T replica.ReplicationID, S replica.ReplicationStatus, R r } // Execute periodically execute the operator -// 区分 split table 和 非 split table +// 把 maintainer 的调度和 dispatcher的分开 func (s *basicScheduler[T, S, R]) Execute() time.Time { availableSize := s.batchSize - s.operatorController.OperatorSize()