upstream: support ticdc topology info in tidb dashboard #1209

Open · wants to merge 2 commits into base: master · showing changes from 1 commit
13 changes: 13 additions & 0 deletions pkg/etcd/client.go
@@ -108,6 +108,19 @@ func Wrap(cli *clientV3.Client, metrics map[string]prometheus.Counter) *ClientImpl
return &ClientImpl{cli: cli, metrics: metrics, clock: clock.New()}
}

// NewWrappedClient creates a new ClientImpl with the given raw clientV3.Client.
func NewWrappedClient(cli *clientV3.Client) *ClientImpl {
metrics := map[string]prometheus.Counter{
EtcdPut: etcdRequestCounter.WithLabelValues(EtcdPut),
EtcdGet: etcdRequestCounter.WithLabelValues(EtcdGet),
EtcdDel: etcdRequestCounter.WithLabelValues(EtcdDel),
EtcdTxn: etcdRequestCounter.WithLabelValues(EtcdTxn),
EtcdGrant: etcdRequestCounter.WithLabelValues(EtcdGrant),
EtcdRevoke: etcdRequestCounter.WithLabelValues(EtcdRevoke),
}
return Wrap(cli, metrics)
}

// Close closes the clientV3.Client
func (c *ClientImpl) Close() error {
return c.cli.Close()
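Note: a minimal usage sketch of NewWrappedClient, assuming a plain clientV3 connection; the endpoint and timeout below are hypothetical, not the ones cdc actually uses:

// Build a raw etcd client and wrap it so every Put/Get/Del/Txn/Grant/Revoke
// call is counted by the existing etcdRequestCounter metrics.
rawCli, err := clientV3.New(clientV3.Config{
	Endpoints:   []string{"127.0.0.1:2379"}, // hypothetical endpoint
	DialTimeout: 5 * time.Second,
})
if err != nil {
	return err
}
wrapped := etcd.NewWrappedClient(rawCli)
defer wrapped.Close()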
24 changes: 18 additions & 6 deletions pkg/upstream/manager.go
@@ -21,12 +21,22 @@ import (
"github.com/benbjohnson/clock"
"github.com/pingcap/log"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/etcd"
"github.com/pingcap/ticdc/pkg/node"
"github.com/pingcap/tiflow/pkg/security"
pd "github.com/tikv/pd/client"
clientV3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

// NodeTopologyCfg stores the topology information of the capture node.
type NodeTopologyCfg struct {
*node.Info

// GCServiceID identifies the gc service id of the cdc cluster.
GCServiceID string
// SessionTTL is the TTL of the capture session in seconds.
SessionTTL int64
}

// Manager manages all upstream.
type Manager struct {
// upstreamID map to *Upstream.
@@ -40,18 +50,20 @@ type Manager struct {

defaultUpstream *Upstream

initUpstreamFunc func(context.Context, *Upstream) error
initUpstreamFunc func(context.Context, *Upstream, *NodeTopologyCfg) error
nodeCfg NodeTopologyCfg
}

// NewManager creates a new Manager.
// ctx will be used to initialize upstream spawned by this Manager.
func NewManager(ctx context.Context) *Manager {
func NewManager(ctx context.Context, cfg NodeTopologyCfg) *Manager {
ctx, cancel := context.WithCancel(ctx)
return &Manager{
ups: new(sync.Map),
ctx: ctx,
cancel: cancel,
initUpstreamFunc: initUpstream,
nodeCfg: cfg,
}
}

@@ -60,7 +72,7 @@ func (m *Manager) AddDefaultUpstream(
pdEndpoints []string,
conf *security.Credential,
pdClient pd.Client,
etcdClient *clientV3.Client,
etcdClient etcd.Client,
) (*Upstream, error) {
// use the pdClient and etcdClient passed from the cdc server as the default upstream
// to reduce the number of pdClient creations and make the cdc server more stable
@@ -74,7 +86,7 @@
wg: new(sync.WaitGroup),
clock: clock.New(),
}
if err := m.initUpstreamFunc(m.ctx, up); err != nil {
if err := m.initUpstreamFunc(m.ctx, up, &m.nodeCfg); err != nil {
return nil, cerror.Trace(err)
}
m.defaultUpstream = up
@@ -114,7 +126,7 @@ func (m *Manager) add(upstreamID uint64,
up := newUpstream(pdEndpoints, securityConf)
m.ups.Store(upstreamID, up)
go func() {
err := m.initUpstreamFunc(m.ctx, up)
err := m.initUpstreamFunc(m.ctx, up, &m.nodeCfg)
up.err.Store(err)
}()
up.resetIdleTime()
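Note: any stub of initUpstreamFunc (e.g. in manager tests) now needs the third *NodeTopologyCfg argument. A minimal in-package sketch, with hypothetical values, of how the new signature is satisfied:

m := NewManager(context.Background(), NodeTopologyCfg{
	GCServiceID: "test-gc-service", // hypothetical value
	SessionTTL:  10,
})
m.initUpstreamFunc = func(_ context.Context, _ *Upstream, cfg *NodeTopologyCfg) error {
	// a test stub can assert here that cfg carries the values given to NewManager
	return nil
}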
120 changes: 120 additions & 0 deletions pkg/upstream/topo.go
@@ -0,0 +1,120 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package upstream

import (
"context"
"strconv"
"strings"
"time"

"github.com/pingcap/log"
"github.com/pingcap/tidb-dashboard/util/distro"
"github.com/pingcap/tidb-dashboard/util/netutil"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tiflow/pkg/errors"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

const (
// topologyTiCDC is /topology/ticdc/{clusterID}/{ip:port}.
topologyTiCDC = "/topology/ticdc/%s/%s"
// topologyTiDB is /topology/tidb/{ip:port}.
// Refer to https://github.com/pingcap/tidb/blob/release-7.5/pkg/domain/infosync/info.go#L78-L79.
topologyTiDB = infosync.TopologyInformationPath
topologyTiDBTTL = infosync.TopologySessionTTL
// defaultTimeout is the default timeout for etcd operations.
defaultTimeout = time.Second * 2
)

type tidbInstance struct {
IP string
Port uint
}

// fetchTiDBTopology fetches the TiDB topology from etcd and parses it.
func fetchTiDBTopology(ctx context.Context, etcdClient *clientv3.Client) ([]tidbInstance, error) {
ctx2, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()

resp, err := etcdClient.Get(ctx2, topologyTiDB, clientv3.WithPrefix())
if err != nil {
return nil, errors.ErrPDEtcdAPIError.Wrap(err)
}

nodesAlive := make(map[string]struct{}, len(resp.Kvs))
nodesInfo := make(map[string]*tidbInstance, len(resp.Kvs))

for _, kv := range resp.Kvs {
key := string(kv.Key)
if !strings.HasPrefix(key, topologyTiDB) {
continue
}
// remainingKey looks like `ip:port/info` or `ip:port/ttl`.
remainingKey := strings.TrimPrefix(key[len(topologyTiDB):], "/")
keyParts := strings.Split(remainingKey, "/")
if len(keyParts) != 2 {
log.Warn("Ignored invalid topology key", zap.String("component", distro.R().TiDB), zap.String("key", key))
continue
}

switch keyParts[1] {
case "info":
address := keyParts[0]
hostname, port, err := netutil.ParseHostAndPortFromAddress(address)
if err != nil {
log.Warn("Ignored invalid tidb topology info entry",
zap.String("key", key),
zap.String("value", string(kv.Value)),
zap.Error(err))
continue
}
nodesInfo[keyParts[0]] = &tidbInstance{
IP: hostname,
Port: port,
}
case "ttl":
alive, err := parseTiDBAliveness(kv.Value)
if !alive || err != nil {
log.Warn("Ignored invalid tidb topology TTL entry",
zap.String("key", key),
zap.String("value", string(kv.Value)),
zap.Error(err))
continue
}
nodesAlive[keyParts[0]] = struct{}{}
}
}

nodes := make([]tidbInstance, 0)
for addr, info := range nodesInfo {
if _, ok := nodesAlive[addr]; ok {
nodes = append(nodes, *info)
}
}
return nodes, nil
}

func parseTiDBAliveness(value []byte) (bool, error) {
unixTimestampNano, err := strconv.ParseUint(string(value), 10, 64)
if err != nil {
return false, errors.ErrUnmarshalFailed.Wrap(err)
}
t := time.Unix(0, int64(unixTimestampNano))
if time.Since(t) > topologyTiDBTTL*time.Second {
return false, nil
}
return true, nil
}
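Note: a small sketch (in package upstream) of how parseTiDBAliveness treats the /topology/tidb/{ip:port}/ttl value, which TiDB writes as a unix timestamp in nanoseconds; the two inputs below are constructed locally for illustration:

fresh := []byte(strconv.FormatUint(uint64(time.Now().UnixNano()), 10))
stale := []byte(strconv.FormatUint(uint64(time.Now().Add(-time.Hour).UnixNano()), 10))

alive, err := parseTiDBAliveness(fresh) // alive == true, err == nil
alive, err = parseTiDBAliveness(stale)  // alive == false: older than topologyTiDBTTL seconds
_, _ = alive, err                       // silence unused-variable errors in this sketch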
35 changes: 32 additions & 3 deletions pkg/upstream/upstream.go
@@ -63,7 +63,7 @@ type Upstream struct {
PdEndpoints []string
SecurityConfig *security.Credential
PDClient pd.Client
etcdCli *clientV3.Client
etcdCli etcd.Client
session *concurrency.Session

KVStorage tidbkv.Storage
@@ -122,7 +122,7 @@ func CreateTiStore(urls string, credential *security.Credential) (tidbkv.Storage
}

// initUpstream initializes the upstream
func initUpstream(ctx context.Context, up *Upstream) error {
func initUpstream(ctx context.Context, up *Upstream, cfg *NodeTopologyCfg) error {
ctx, up.cancel = context.WithCancel(ctx)
grpcTLSOption, err := up.SecurityConfig.ToGRPCDialOption()
if err != nil {
@@ -159,7 +159,7 @@ func initUpstream(ctx context.Context, up *Upstream) error {
if err != nil {
return errors.Trace(err)
}
up.etcdCli = etcdCli
up.etcdCli = etcd.NewWrappedClient(etcdCli)
}
clusterID := up.PDClient.GetClusterID(ctx)
if up.ID != 0 && up.ID != clusterID {
@@ -200,6 +200,11 @@ func initUpstream(ctx context.Context, up *Upstream) error {
zap.Strings("upstreamEndpoints", up.PdEndpoints))
}

err = up.registerTopologyInfo(ctx, cfg)
if err != nil {
return errors.Trace(err)
}

up.wg.Add(1)
go func() {
defer up.wg.Done()
@@ -329,3 +334,27 @@ func (up *Upstream) shouldClose() bool {

return false
}

// registerTopologyInfo puts the ticdc topology information into etcd under the
// key "/topology/ticdc/{clusterID}/{ip:port}".
// tidb-dashboard uses this information to show the topology of the
// ticdc cluster.
func (up *Upstream) registerTopologyInfo(ctx context.Context, cfg *NodeTopologyCfg) error {
lease, err := up.etcdCli.Grant(ctx, cfg.SessionTTL)
if err != nil {
return errors.Trace(err)
}

up.session, err = up.etcdCli.NewSession(concurrency.WithLease(lease.ID))
if err != nil {
return errors.Trace(err)
}
// register the capture info to the etcd embedded in the upstream PD
key := fmt.Sprintf(topologyTiCDC, cfg.GCServiceID, cfg.AdvertiseAddr)
value, err := cfg.Info.Marshal()
if err != nil {
return errors.Trace(err)
}
_, err = up.etcdCli.Put(ctx, key, string(value), clientV3.WithLease(up.session.Lease()))
return errors.WrapError(errors.ErrPDEtcdAPIError, err)
}
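Note: a hedged sketch of what registerTopologyInfo leaves in etcd, assuming a GC service ID of "default" and an advertise address of "10.0.1.9:8300" (both hypothetical), and assuming the wrapped client exposes Get with the usual clientV3 signature:

// key:   /topology/ticdc/default/10.0.1.9:8300
// value: the marshaled node.Info, bound to the session lease so the entry
//        disappears automatically once the capture stops renewing SessionTTL.
resp, err := up.etcdCli.Get(ctx, fmt.Sprintf(topologyTiCDC, "default", "10.0.1.9:8300"))
if err == nil && len(resp.Kvs) == 1 {
	log.Info("ticdc topology key present", zap.ByteString("info", resp.Kvs[0].Value))
}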
22 changes: 17 additions & 5 deletions server/server.go
@@ -39,6 +39,7 @@ import (
"github.com/pingcap/ticdc/pkg/node"
"github.com/pingcap/ticdc/pkg/pdutil"
tiserver "github.com/pingcap/ticdc/pkg/server"
"github.com/pingcap/ticdc/pkg/upstream"
"github.com/pingcap/ticdc/server/watcher"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tiflow/cdc/model"
@@ -64,11 +65,12 @@ type server struct {

liveness model.Liveness

pdClient pd.Client
pdAPIClient pdutil.PDAPIClient
pdEndpoints []string
coordinatorMu sync.Mutex
coordinator tiserver.Coordinator
pdClient pd.Client
pdAPIClient pdutil.PDAPIClient
pdEndpoints []string
coordinatorMu sync.Mutex
coordinator tiserver.Coordinator
upstreamManager *upstream.Manager

// session keeps alive between the server and etcd
session *concurrency.Session
@@ -149,6 +151,16 @@ func (c *server) initialize(ctx context.Context) error {
schemaStore := schemastore.New(ctx, conf.DataDir, subscriptionClient, c.pdClient, c.PDClock, c.KVStorage)
eventStore := eventstore.New(ctx, conf.DataDir, subscriptionClient, c.PDClock)
eventService := eventservice.New(eventStore, schemaStore)
c.upstreamManager = upstream.NewManager(ctx, upstream.NodeTopologyCfg{
Info: c.info,
GCServiceID: c.EtcdClient.GetGCServiceID(),
SessionTTL: int64(conf.CaptureSessionTTL),
})
_, err := c.upstreamManager.AddDefaultUpstream(c.pdEndpoints, conf.Security, c.pdClient, c.EtcdClient.GetEtcdClient())
if err != nil {
return errors.Trace(err)
}

c.subModules = []common.SubModule{
nodeManager,
subscriptionClient,
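Note: with the values wired up here, the key that becomes visible to tidb-dashboard is derived as below; the example addresses are hypothetical and only illustrate the shape of the key:

// NodeTopologyCfg.Info        = c.info                        (carries AdvertiseAddr, e.g. "10.0.1.9:8300")
// NodeTopologyCfg.GCServiceID = c.EtcdClient.GetGCServiceID() (e.g. "default")
// NodeTopologyCfg.SessionTTL  = conf.CaptureSessionTTL        (seconds)
key := fmt.Sprintf("/topology/ticdc/%s/%s", c.EtcdClient.GetGCServiceID(), c.info.AdvertiseAddr)
// e.g. /topology/ticdc/default/10.0.1.9:8300 -> marshaled c.info, kept alive by the upstream session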