Skip to content

Commit d31429c

Browse files
committed
Refactor the kv client.
1 parent 6a3a176 commit d31429c

File tree

4 files changed

+64
-76
lines changed

4 files changed

+64
-76
lines changed

api/http.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,14 @@
1414
package api
1515

1616
import (
17+
"net/http/pprof"
18+
1719
"github.com/gin-gonic/gin"
1820
v1 "github.com/pingcap/ticdc/api/v1"
1921
v2 "github.com/pingcap/ticdc/api/v2"
2022
"github.com/pingcap/ticdc/pkg/server"
2123
"github.com/prometheus/client_golang/prometheus"
2224
"github.com/prometheus/client_golang/prometheus/promhttp"
23-
"net/http/pprof"
2425
)
2526

2627
// RegisterRoutes creates a router for OpenAPI

cmd/cdc/server/server.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func (o *options) run(cmd *cobra.Command) error {
102102

103103
ctx, cancel := context.WithCancel(context.Background())
104104
defer cancel()
105-
105+
106106
util.LogHTTPProxies()
107107
svr, err := server.New(o.serverConfig, o.pdEndpoints)
108108
if err != nil {

logservice/logpuller/region_request_worker.go

+50-64
Original file line numberDiff line numberDiff line change
@@ -76,35 +76,26 @@ func newRegionRequestWorker(
7676
workerID: workerIDGen.Add(1),
7777
client: client,
7878
store: store,
79-
requestsCh: make(chan regionInfo, 1024), // 256 is an arbitrary number.
79+
requestsCh: make(chan regionInfo, 256), // 256 is an arbitrary number.
8080

8181
requestHeader: &cdcpb.Header{ClusterId: client.clusterID, TicdcVersion: version.ReleaseSemver()},
8282
}
8383
worker.requestedRegions.subscriptions = make(map[SubscriptionID]regionFeedStates)
8484

85-
waitForPreFetching := func() {
86-
if worker.preFetchForConnecting != nil {
87-
log.Panic("preFetchForConnecting should be nil",
88-
zap.Uint64("workerID", worker.workerID),
89-
zap.Uint64("storeID", store.storeID),
90-
zap.String("addr", store.storeAddr))
91-
}
92-
for regionRequest := range worker.requestsCh {
93-
if !regionRequest.isStopped() {
94-
worker.preFetchForConnecting = &regionRequest
95-
return
96-
}
97-
}
98-
}
99-
10085
g.Go(func() error {
10186
for {
87+
// Fetch the first region request so that the gRPC stream can be established.
88+
for regionRequest := range worker.requestsCh {
89+
if !regionRequest.isStopped() {
90+
worker.preFetchForConnecting = &regionRequest
91+
break
92+
}
93+
}
10294
select {
10395
case <-ctx.Done():
10496
return ctx.Err()
10597
default:
10698
}
107-
waitForPreFetching()
10899
var regionErr error
109100
if err := version.CheckStoreVersion(ctx, worker.client.pd, worker.store.storeID); err != nil {
110101
if errors.Cause(err) == context.Canceled {
@@ -129,21 +120,15 @@ func newRegionRequestWorker(
129120
for subID, m := range worker.clearRegionStates() {
130121
for _, state := range m {
131122
state.markStopped(regionErr)
132-
regionEvent := regionEvent{
123+
event := regionEvent{
133124
state: state,
134125
worker: worker,
135126
}
136-
worker.client.pushRegionEventToDS(subID, regionEvent)
127+
worker.client.pushRegionEventToDS(subID, event)
137128
}
138129
}
139130
// The store may fail forever, so we need to try to re-schedule all pending regions.
140-
for _, region := range worker.clearPendingRegions() {
141-
if region.isStopped() {
142-
// It means it's a special task for stopping the table.
143-
continue
144-
}
145-
client.onRegionFail(newRegionErrorInfo(region, regionErr))
146-
}
131+
worker.rescheduleRegions(regionErr)
147132
if err := util.Hang(ctx, time.Second); err != nil {
148133
return err
149134
}
@@ -199,7 +184,7 @@ func (s *regionRequestWorker) run(ctx context.Context) (canceled bool) {
199184
timer := time.After(10 * time.Second)
200185
g.Go(func() error {
201186
<-timer
202-
err := errors.New("inject force reconnect")
187+
err = errors.New("inject force reconnect")
203188
log.Info("inject force reconnect", zap.Error(err))
204189
return err
205190
})
@@ -341,19 +326,11 @@ func (s *regionRequestWorker) processRegionSendTask(
341326
return nil
342327
}
343328

344-
fetchMoreReq := func() (regionInfo, error) {
345-
for {
346-
var region regionInfo
347-
select {
348-
case <-ctx.Done():
349-
return region, ctx.Err()
350-
case region = <-s.requestsCh:
351-
return region, nil
352-
}
353-
}
354-
}
355-
356-
region := *s.preFetchForConnecting
329+
var (
330+
err error
331+
region regionInfo
332+
)
333+
region = *s.preFetchForConnecting
357334
s.preFetchForConnecting = nil
358335
for {
359336
// TODO: can region be nil?
@@ -367,23 +344,17 @@ func (s *regionRequestWorker) processRegionSendTask(
367344

368345
// It means it's a special task for stopping the table.
369346
if region.isStopped() {
370-
req := cdcpb.ChangeDataRequest{
371-
Header: s.requestHeader,
372-
RequestId: uint64(subID),
373-
Request: &cdcpb.ChangeDataRequest_Deregister_{
374-
Deregister: &cdcpb.ChangeDataRequest_Deregister{},
375-
},
376-
}
377-
if err := doSend(req); err != nil {
347+
req := s.newDeregisterRegionRequest(subID)
348+
if err = doSend(req); err != nil {
378349
return err
379350
}
380351
for _, state := range s.takeRegionStates(subID) {
381352
state.markStopped(&requestCancelledErr{})
382-
regionEvent := regionEvent{
353+
event := regionEvent{
383354
state: state,
384355
worker: s,
385356
}
386-
s.client.pushRegionEventToDS(subID, regionEvent)
357+
s.client.pushRegionEventToDS(subID, event)
387358
}
388359
} else if region.subscribedSpan.stopped.Load() {
389360
// It can be skipped directly because there must be no pending states from
@@ -395,19 +366,30 @@ func (s *regionRequestWorker) processRegionSendTask(
395366
state.start()
396367
s.addRegionState(subID, region.verID.GetID(), state)
397368

398-
if err := doSend(s.createRegionRequest(region)); err != nil {
369+
if err = doSend(s.newRegisterRegionRequest(region)); err != nil {
399370
return err
400371
}
401372
}
402373

403-
var err error
404-
if region, err = fetchMoreReq(); err != nil {
405-
return err
374+
select {
375+
case <-ctx.Done():
376+
return errors.Trace(ctx.Err())
377+
case region = <-s.requestsCh:
406378
}
407379
}
408380
}
409381

410-
func (s *regionRequestWorker) createRegionRequest(region regionInfo) cdcpb.ChangeDataRequest {
382+
func (s *regionRequestWorker) newDeregisterRegionRequest(subscriptionID SubscriptionID) cdcpb.ChangeDataRequest {
383+
return cdcpb.ChangeDataRequest{
384+
Header: s.requestHeader,
385+
RequestId: uint64(subscriptionID),
386+
Request: &cdcpb.ChangeDataRequest_Deregister_{
387+
Deregister: &cdcpb.ChangeDataRequest_Deregister{},
388+
},
389+
}
390+
}
391+
392+
func (s *regionRequestWorker) newRegisterRegionRequest(region regionInfo) cdcpb.ChangeDataRequest {
411393
return cdcpb.ChangeDataRequest{
412394
Header: s.requestHeader,
413395
RegionId: region.verID.GetID(),
@@ -471,16 +453,20 @@ func (s *regionRequestWorker) clearRegionStates() map[SubscriptionID]regionFeedS
471453
return subscriptions
472454
}
473455

474-
func (s *regionRequestWorker) clearPendingRegions() []regionInfo {
475-
regions := make([]regionInfo, 0, len(s.requestsCh))
456+
func (s *regionRequestWorker) rescheduleRegions(regionErr error) {
476457
if s.preFetchForConnecting != nil {
477-
region := *s.preFetchForConnecting
478-
s.preFetchForConnecting = nil
479-
regions = append(regions, region)
458+
if !s.preFetchForConnecting.isStopped() {
459+
region := *s.preFetchForConnecting
460+
s.preFetchForConnecting = nil
461+
s.client.onRegionFail(newRegionErrorInfo(region, regionErr))
462+
}
480463
}
481-
// TODO: do we need to start with i := 0(i := len(regions)) if s.preFetchForConnecting is nil?
482-
for i := 1; i < cap(regions); i++ {
483-
regions = append(regions, <-s.requestsCh)
464+
465+
for region := range s.requestsCh {
466+
if region.isStopped() {
467+
// It means it's a special task for stopping the table.
468+
continue
469+
}
470+
s.client.onRegionFail(newRegionErrorInfo(region, regionErr))
484471
}
485-
return regions
486472
}

logservice/logpuller/subscription_client.go

+11-10
Original file line numberDiff line numberDiff line change
@@ -472,7 +472,7 @@ type requestedStore struct {
472472
storeID uint64
473473
storeAddr string
474474
// Use to select a worker to send request.
475-
nextWorker atomic.Uint32
475+
nextWorker int
476476
requestWorkers []*regionRequestWorker
477477
}
478478

@@ -483,9 +483,15 @@ func (rs *requestedStore) broadcastRegionRequest(region regionInfo) {
483483
}
484484

485485
func (rs *requestedStore) sendRegionRequest(region regionInfo) {
486-
index := rs.nextWorker.Add(1) % uint32(len(rs.requestWorkers))
487-
worker := rs.requestWorkers[index]
488-
worker.sendRegionRequest(region)
486+
index := rs.nextWorker % len(rs.requestWorkers)
487+
rs.requestWorkers[index].sendRegionRequest(region)
488+
rs.nextWorker++
489+
}
490+
491+
func (rs *requestedStore) stopWorkers() {
492+
for _, w := range rs.requestWorkers {
493+
close(w.requestsCh)
494+
}
489495
}
490496

491497
// handleRegions receives regionInfo from regionCh and attaches rpcCtx to them,
@@ -508,12 +514,7 @@ func (s *SubscriptionClient) handleRegions(ctx context.Context, eg *errgroup.Gro
508514

509515
defer func() {
510516
for _, rs := range stores {
511-
for _, w := range rs.requestWorkers {
512-
close(w.requestsCh)
513-
for range w.requestsCh {
514-
// TODO: do we need handle it?
515-
}
516-
}
517+
rs.stopWorkers()
517518
}
518519
}()
519520

0 commit comments

Comments
 (0)