
maintainer: Add checker tests and remove stale codes #731

Closed
2 changes: 0 additions & 2 deletions maintainer/maintainer.go
@@ -605,8 +605,6 @@ func (m *Maintainer) onMaintainerCloseResponse(from node.ID, response *heartbeat
if response.Success {
m.nodesClosed[from] = struct{}{}
}
// check if all nodes have sent response
m.onRemoveMaintainer(m.cascadeRemoving, m.changefeedRemoved)
}

func (m *Maintainer) handleResendMessage() {
53 changes: 30 additions & 23 deletions maintainer/maintainer_controller_test.go
@@ -645,7 +645,7 @@ func TestDynamicSplitTableBasic(t *testing.T) {
require.Equal(t, 2, s.replicationDB.GetReplicatingSize())

for _, task := range replicas {
for cnt := 0; cnt < replica.HotSpanScoreThreshold; cnt++ {
for cnt := 0; cnt < replica.DefaultHotSpanScoreThreshold; cnt++ {
s.replicationDB.UpdateStatus(task, &heartbeatpb.TableSpanStatus{
ID: task.ID.ToPB(),
ComponentStatus: heartbeatpb.ComponentState_Working,
@@ -674,12 +674,7 @@ func TestDynamicSplitTableBasic(t *testing.T) {
require.Equal(t, 7, s.replicationDB.GetAbsentSize())
}

func TestDynamiSplitTableWhenScaleOut(t *testing.T) {
t.Skip("skip unimplemented test")
}

func TestDynamicMergeAndSplitTable(t *testing.T) {
t.Skip("skip flaky test")
func TestDynamicRebalanceTable(t *testing.T) {
pdAPI := &mockPdAPI{
regions: make(map[int64][]pdutil.RegionInfo),
}
@@ -710,6 +705,7 @@ func TestDynamicMergeAndSplitTable(t *testing.T) {
for i := 1; i <= totalTables; i++ {
totalSpan := spanz.TableIDToComparableSpan(int64(i))
partialSpans := []*heartbeatpb.TableSpan{
// total tasks < total nodes * replica.MinSpanNumberCoefficient
{TableID: int64(i), StartKey: totalSpan.StartKey, EndKey: appendNew(totalSpan.StartKey, 'a')},
{TableID: int64(i), StartKey: appendNew(totalSpan.StartKey, 'a'), EndKey: appendNew(totalSpan.StartKey, 'b')},
{TableID: int64(i), StartKey: appendNew(totalSpan.StartKey, 'b'), EndKey: totalSpan.EndKey},
@@ -730,44 +726,55 @@ func TestDynamicMergeAndSplitTable(t *testing.T) {
EventSizePerSecond: replica.HotSpanWriteThreshold,
}, node.ID(fmt.Sprintf("node%d", idx%2+1)))
if idx == 0 {
spanReplica.GetStatus().EventSizePerSecond = replica.HotSpanWriteThreshold * 100
spanReplica.GetStatus().EventSizePerSecond = replica.HotSpanWriteThreshold * 2
}
s.replicationDB.AddReplicatingSpan(spanReplica)
}

// new split regions
pdAPI.regions[1] = []pdutil.RegionInfo{
pdAPI.regions[int64(i)] = []pdutil.RegionInfo{
pdutil.NewTestRegionInfo(1, totalSpan.StartKey, appendNew(totalSpan.StartKey, 'a'), uint64(1)),
pdutil.NewTestRegionInfo(2, appendNew(totalSpan.StartKey, 'a'), totalSpan.EndKey, uint64(1)),
pdutil.NewTestRegionInfo(2, appendNew(totalSpan.StartKey, 'a'), appendNew(totalSpan.StartKey, 'b'), uint64(1)),
pdutil.NewTestRegionInfo(2, appendNew(totalSpan.StartKey, 'b'), appendNew(totalSpan.StartKey, 'c'), uint64(1)),
pdutil.NewTestRegionInfo(2, appendNew(totalSpan.StartKey, 'c'), totalSpan.EndKey, uint64(1)),
}
}
expected := (totalTables - 1) * 3
replicas := s.replicationDB.GetReplicating()
require.Equal(t, totalTables*3-1, s.replicationDB.GetReplicatingSize())
require.Equal(t, expected+2, s.replicationDB.GetReplicatingSize())

scheduler := s.schedulerController.GetScheduler(scheduler.SplitScheduler)
scheduler.Execute()
require.Equal(t, 0, s.replicationDB.GetSchedulingSize())
require.Equal(t, totalTables*3-1, s.operatorController.OperatorSize())
finishedCnt := 0
require.Equal(t, 2, s.replicationDB.GetReplicatingSize())
require.Equal(t, expected, s.replicationDB.GetSchedulingSize())
require.Equal(t, expected, s.operatorController.OperatorSize())

primarys := make(map[int64]pkgOpearator.Operator[common.DispatcherID, *heartbeatpb.TableSpanStatus])
for _, task := range replicas {
op := s.operatorController.GetOperator(task.ID)
if op == nil {
require.Equal(t, int64(victim), task.Span.GetTableID())
continue
}
op.Schedule()
op.Check("node1", &heartbeatpb.TableSpanStatus{
op.Check(task.GetNodeID(), &heartbeatpb.TableSpanStatus{
ID: op.ID().ToPB(),
ComponentStatus: heartbeatpb.ComponentState_Stopped,
CheckpointTs: 10,
})
if op.IsFinished() {
op.PostFinish()
finishedCnt++
} else {
primarys[task.Span.GetTableID()] = op
}
}
require.Less(t, finishedCnt, totalTables*3-1)
for _, op := range primarys {
finished := op.IsFinished()
require.True(t, finished)
op.PostFinish()
}

//total 7 regions,
// table 1: split to 4 spans, will be inserted to absent
// table 2: split to 3 spans, will be inserted to absent
require.Equal(t, 7, s.replicationDB.GetAbsentSize())
require.Equal(t, (totalTables-1)*4, s.replicationDB.GetAbsentSize())
}

func TestDynamicMergeTableBasic(t *testing.T) {
@@ -832,7 +839,7 @@ func TestDynamicMergeTableBasic(t *testing.T) {
require.Equal(t, expected+victimExpected, s.replicationDB.GetReplicatingSize())

scheduler := s.schedulerController.GetScheduler(scheduler.SplitScheduler)
for i := 0; i < replica.DefaultScoreThreshold; i++ {
for i := 0; i < replica.DefaultImbalanceScoreThreshold; i++ {
scheduler.Execute()
}
scheduler.Execute() // dummy execute does not take effect
@@ -879,7 +886,7 @@ func TestDynamicMergeTableBasic(t *testing.T) {
s.replicationDB.AddReplicatingSpan(spanReplica)
replicas = s.replicationDB.GetReplicating()
require.Equal(t, 3, len(replicas))
for i := 0; i < replica.DefaultScoreThreshold; i++ {
for i := 0; i < replica.DefaultImbalanceScoreThreshold; i++ {
scheduler.Execute()
}
require.Equal(t, 0, s.replicationDB.GetReplicatingSize())
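The comment added to the test setup above ("total tasks < total nodes * replica.MinSpanNumberCoefficient") hints at the condition under which a table's spans are considered too few and remain candidates for further splitting. A minimal sketch of that check, assuming the coefficient of 2 declared in checker.go and simplifying the surrounding logic for illustration:

```go
package main

import "fmt"

// minSpanNumberCoefficient mirrors replica.MinSpanNumberCoefficient (2 in checker.go).
const minSpanNumberCoefficient = 2

// needsMoreSpans is an illustrative guess at the condition the test comment
// refers to: with too few spans relative to the node count, a hot table is
// split further so traffic can be spread across nodes.
func needsMoreSpans(spanCount, nodeCount int) bool {
	return spanCount < nodeCount*minSpanNumberCoefficient
}

func main() {
	// The test creates 3 spans per table on 2 nodes: 3 < 2*2, so splitting can trigger.
	fmt.Println(needsMoreSpans(3, 2)) // true
	fmt.Println(needsMoreSpans(4, 2)) // false
}
```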
6 changes: 0 additions & 6 deletions maintainer/operator/operator_controller.go
@@ -297,12 +297,6 @@ func (oc *Controller) NewRemoveOperator(replicaSet *replica.SpanReplication) ope
}
}

func (oc *Controller) NewSplitOperator(
replicaSet *replica.SpanReplication, originNode node.ID, splitSpans []*heartbeatpb.TableSpan,
) operator.Operator[common.DispatcherID, *heartbeatpb.TableSpanStatus] {
return NewSplitDispatcherOperator(oc.replicationDB, replicaSet, originNode, splitSpans)
}

// AddMergeSplitOperator adds a merge split operator to the controller.
// 1. Merge Operator: len(affectedReplicaSets) > 1, len(splitSpans) == 1
// 2. Split Operator: len(affectedReplicaSets) == 1, len(splitSpans) > 1
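With the dedicated split operator gone, the surviving AddMergeSplitOperator doc comment distinguishes merge from split purely by the shape of its arguments. A self-contained sketch of that classification rule, using placeholder types rather than the repo's actual API:

```go
package main

import "fmt"

// Placeholder stand-ins for the repo's SpanReplication and TableSpan types.
type spanReplication struct{ id string }
type tableSpan struct{ start, end string }

// classify mirrors the rule in the AddMergeSplitOperator comment:
// merge when several replica sets collapse into one span,
// split when one replica set fans out into several spans.
// The default branch (many-to-many) is an assumption for illustration.
func classify(affected []*spanReplication, splitSpans []*tableSpan) string {
	switch {
	case len(affected) > 1 && len(splitSpans) == 1:
		return "merge"
	case len(affected) == 1 && len(splitSpans) > 1:
		return "split"
	default:
		return "merge-split"
	}
}

func main() {
	fmt.Println(classify(
		[]*spanReplication{{id: "a"}, {id: "b"}},
		[]*tableSpan{{start: "t1_a", end: "t1_z"}},
	)) // merge
	fmt.Println(classify(
		[]*spanReplication{{id: "a"}},
		[]*tableSpan{{start: "t1_a", end: "t1_m"}, {start: "t1_m", end: "t1_z"}},
	)) // split
}
```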
141 changes: 0 additions & 141 deletions maintainer/operator/operator_split.go

This file was deleted.

35 changes: 15 additions & 20 deletions maintainer/replica/checker.go
@@ -40,11 +40,12 @@ const (

const (
HotSpanWriteThreshold = 1024 * 1024 // 1MB per second
HotSpanScoreThreshold = 3 // TODO: bump to 10 befroe release
DefaultScoreThreshold = 3

defaultHardImbalanceThreshold = float64(1.35) // used to trigger the rebalance
clearTimeout = 300 // seconds
DefaultHotSpanScoreThreshold = 10 // 10 * status report interval = 100s
DefaultImbalanceScoreThreshold = 3 // 3 * check interval = 180s

defaultImbalanceThreshold = float64(1.35) // used to trigger the rebalance
clearTimeout = 300 // seconds
)

var MinSpanNumberCoefficient = 2
@@ -100,7 +101,7 @@ func newHotSpanChecker(cfID common.ChangeFeedID) *hotSpanChecker {
changefeedID: cfID,
hotTasks: make(map[common.DispatcherID]*hotSpanStatus),
writeThreshold: HotSpanWriteThreshold,
scoreThreshold: HotSpanScoreThreshold,
scoreThreshold: DefaultHotSpanScoreThreshold,
}
}

@@ -231,16 +232,17 @@ type rebalanceChecker struct {
func newImbalanceChecker(cfID common.ChangeFeedID) *rebalanceChecker {
nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName)
return &rebalanceChecker{
changefeedID: cfID,
allTasks: make(map[common.DispatcherID]*hotSpanStatus),
nodeManager: nodeManager,
hardWriteThreshold: 10 * HotSpanWriteThreshold,
hardImbalanceThreshold: defaultHardImbalanceThreshold,
changefeedID: cfID,
allTasks: make(map[common.DispatcherID]*hotSpanStatus),
nodeManager: nodeManager,
// FIXME: maybe remove hard check since it could be triggered unexpectedly in scale out cases.
hardWriteThreshold: 64 * HotSpanWriteThreshold,
hardImbalanceThreshold: 2 * defaultImbalanceThreshold,

softWriteThreshold: 3 * HotSpanWriteThreshold,
softImbalanceThreshold: 1.2, // 2 * defaultHardImbalanceThreshold,
softRebalanceScoreThreshold: DefaultScoreThreshold,
softMergeScoreThreshold: DefaultScoreThreshold,
softImbalanceThreshold: defaultImbalanceThreshold,
softRebalanceScoreThreshold: DefaultImbalanceScoreThreshold,
softMergeScoreThreshold: DefaultImbalanceScoreThreshold,
}
}

@@ -364,10 +366,3 @@ func (s *rebalanceChecker) Stat() string {
res.WriteString(fmt.Sprintf("softScore: [rebalance: %d, merge: %d]", s.softRebalanceScore, s.softMergeScore))
return res.String()
}

// TODO: implement the dynamic merge and split checker
type dynamicMergeSplitChecker struct {
changefeedID common.ChangeFeedID
allTasks map[common.DispatcherID]*hotSpanStatus
nodeManager *watcher.NodeManager
}
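The renamed constants encode how long a condition must persist before the checker reacts: DefaultHotSpanScoreThreshold = 10 consecutive status reports (roughly 100s) and DefaultImbalanceScoreThreshold = 3 scheduler checks (roughly 180s). A rough sketch of that score-accumulation pattern, simplified and not the PR's actual hotSpanChecker implementation:

```go
package main

import "fmt"

const (
	hotSpanWriteThreshold        = 1024 * 1024 // 1MB per second, as in checker.go
	defaultHotSpanScoreThreshold = 10          // consecutive hot reports before acting
)

// hotSpan accumulates a score each time a status report shows the span
// writing above the threshold.
type hotSpan struct{ score int }

// update returns true once the span has stayed hot long enough to be worth
// splitting. Resetting the score on a quiet report is an assumption made for
// illustration, not necessarily what the real checker does.
func (h *hotSpan) update(eventSizePerSecond float64) bool {
	if eventSizePerSecond < hotSpanWriteThreshold {
		h.score = 0
		return false
	}
	h.score++
	return h.score >= defaultHotSpanScoreThreshold
}

func main() {
	span := &hotSpan{}
	for report := 1; report <= defaultHotSpanScoreThreshold; report++ {
		if span.update(hotSpanWriteThreshold) {
			fmt.Printf("span flagged as hot after %d reports\n", report)
		}
	}
}
```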
2 changes: 1 addition & 1 deletion maintainer/replica/checker_test.go
@@ -63,7 +63,7 @@ func TestHotSpanChecker(t *testing.T) {
EventSizePerSecond: HotSpanWriteThreshold,
})
require.Equal(t, 1, len(checker.hotTasks))
for i := 0; i < HotSpanScoreThreshold; i++ {
for i := 0; i < DefaultHotSpanScoreThreshold; i++ {
db.UpdateStatus(replicaSpan, &heartbeatpb.TableSpanStatus{
CheckpointTs: 9,
EventSizePerSecond: HotSpanWriteThreshold,