diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go
index 28552b999..1cad84faf 100644
--- a/pkg/etcd/client.go
+++ b/pkg/etcd/client.go
@@ -108,6 +108,19 @@ func Wrap(cli *clientV3.Client, metrics map[string]prometheus.Counter) *ClientIm
 	return &ClientImpl{cli: cli, metrics: metrics, clock: clock.New()}
 }
 
+// NewWrappedClient creates a new ClientImpl from the given raw clientV3.Client.
+func NewWrappedClient(cli *clientV3.Client) *ClientImpl {
+	metrics := map[string]prometheus.Counter{
+		EtcdPut:    etcdRequestCounter.WithLabelValues(EtcdPut),
+		EtcdGet:    etcdRequestCounter.WithLabelValues(EtcdGet),
+		EtcdDel:    etcdRequestCounter.WithLabelValues(EtcdDel),
+		EtcdTxn:    etcdRequestCounter.WithLabelValues(EtcdTxn),
+		EtcdGrant:  etcdRequestCounter.WithLabelValues(EtcdGrant),
+		EtcdRevoke: etcdRequestCounter.WithLabelValues(EtcdRevoke),
+	}
+	return Wrap(cli, metrics)
+}
+
 // Close closes the clientV3.Client
 func (c *ClientImpl) Close() error {
 	return c.cli.Close()
diff --git a/pkg/upstream/manager.go b/pkg/upstream/manager.go
index 0b03cae87..44b54067d 100644
--- a/pkg/upstream/manager.go
+++ b/pkg/upstream/manager.go
@@ -21,12 +21,22 @@ import (
 	"github.com/benbjohnson/clock"
 	"github.com/pingcap/log"
 	cerror "github.com/pingcap/ticdc/pkg/errors"
+	"github.com/pingcap/ticdc/pkg/etcd"
+	"github.com/pingcap/ticdc/pkg/node"
 	"github.com/pingcap/tiflow/pkg/security"
 	pd "github.com/tikv/pd/client"
-	clientV3 "go.etcd.io/etcd/client/v3"
 	"go.uber.org/zap"
 )
 
+// NodeTopologyCfg stores the information used to register the capture topology.
+type NodeTopologyCfg struct {
+	*node.Info
+
+	// GCServiceID identifies the gc service of the cdc cluster.
+	GCServiceID string
+	SessionTTL  int64
+}
+
 // Manager manages all upstream.
 type Manager struct {
 	// upstreamID map to *Upstream.
@@ -40,18 +50,20 @@ type Manager struct {
 
 	defaultUpstream *Upstream
 
-	initUpstreamFunc func(context.Context, *Upstream) error
+	initUpstreamFunc func(context.Context, *Upstream, *NodeTopologyCfg) error
+	nodeCfg          NodeTopologyCfg
 }
 
 // NewManager creates a new Manager.
 // ctx will be used to initialize upstream spawned by this Manager.
-func NewManager(ctx context.Context) *Manager {
+func NewManager(ctx context.Context, cfg NodeTopologyCfg) *Manager {
 	ctx, cancel := context.WithCancel(ctx)
 	return &Manager{
 		ups:              new(sync.Map),
 		ctx:              ctx,
 		cancel:           cancel,
 		initUpstreamFunc: initUpstream,
+		nodeCfg:          cfg,
 	}
 }
 
@@ -60,7 +72,7 @@ func (m *Manager) AddDefaultUpstream(
 	pdEndpoints []string,
 	conf *security.Credential,
 	pdClient pd.Client,
-	etcdClient *clientV3.Client,
+	etcdClient etcd.Client,
 ) (*Upstream, error) {
 	// use the pdClient and etcdClient pass from cdc server as the default upstream
 	// to reduce the creation times of pdClient to make cdc server more stable
@@ -74,7 +86,7 @@ func (m *Manager) AddDefaultUpstream(
 		wg:             new(sync.WaitGroup),
 		clock:          clock.New(),
 	}
-	if err := m.initUpstreamFunc(m.ctx, up); err != nil {
+	if err := m.initUpstreamFunc(m.ctx, up, &m.nodeCfg); err != nil {
 		return nil, cerror.Trace(err)
 	}
 	m.defaultUpstream = up
@@ -114,7 +126,7 @@ func (m *Manager) add(upstreamID uint64,
 	up := newUpstream(pdEndpoints, securityConf)
 	m.ups.Store(upstreamID, up)
 	go func() {
-		err := m.initUpstreamFunc(m.ctx, up)
+		err := m.initUpstreamFunc(m.ctx, up, &m.nodeCfg)
 		up.err.Store(err)
 	}()
 	up.resetIdleTime()
diff --git a/pkg/upstream/topo.go b/pkg/upstream/topo.go
new file mode 100644
index 000000000..36cabab31
--- /dev/null
+++ b/pkg/upstream/topo.go
@@ -0,0 +1,125 @@
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package upstream
+
+import (
+	"context"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/pingcap/log"
+	"github.com/pingcap/tidb-dashboard/util/distro"
+	"github.com/pingcap/tidb-dashboard/util/netutil"
+	"github.com/pingcap/tidb/pkg/domain/infosync"
+	"github.com/pingcap/tiflow/pkg/errors"
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.uber.org/zap"
+)
+
+const (
+	// topologyTiCDC is /topology/ticdc/{clusterID}/{ip:port}.
+	topologyTiCDC = "/topology/ticdc/%s/%s"
+	// topologyTiDB is /topology/tidb/{ip:port}.
+	// Refer to https://github.com/pingcap/tidb/blob/release-7.5/pkg/domain/infosync/info.go#L78-L79.
+	topologyTiDB    = infosync.TopologyInformationPath
+	topologyTiDBTTL = infosync.TopologySessionTTL
+	// defaultTimeout is the default timeout for etcd operations.
+	defaultTimeout = time.Second * 2
+)
+
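+// tidbInstance records the address of a TiDB instance found in the topology.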
+type tidbInstance struct {
+	IP   string
+	Port uint
+}
+
+// fetchTiDBTopology fetches and parses the TiDB topology from etcd.
+func fetchTiDBTopology(ctx context.Context, etcdClient *clientv3.Client) ([]tidbInstance, error) {
+	ctx2, cancel := context.WithTimeout(ctx, defaultTimeout)
+	defer cancel()
+
+	resp, err := etcdClient.Get(ctx2, topologyTiDB, clientv3.WithPrefix())
+	if err != nil {
+		return nil, errors.ErrPDEtcdAPIError.Wrap(err)
+	}
+
+	nodesAlive := make(map[string]struct{}, len(resp.Kvs))
+	nodesInfo := make(map[string]*tidbInstance, len(resp.Kvs))
+
+	for _, kv := range resp.Kvs {
+		key := string(kv.Key)
+		if !strings.HasPrefix(key, topologyTiDB) {
+			continue
+		}
+		// remainingKey looks like `ip:port/info` or `ip:port/ttl`.
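+		// For example, the key "/topology/tidb/127.0.0.1:4000/info" yields
+		// the remainingKey "127.0.0.1:4000/info".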
+		remainingKey := strings.TrimPrefix(key[len(topologyTiDB):], "/")
+		keyParts := strings.Split(remainingKey, "/")
+		if len(keyParts) != 2 {
+			log.Warn("Ignored invalid topology key", zap.String("component", distro.R().TiDB), zap.String("key", key))
+			continue
+		}
+
+		switch keyParts[1] {
+		case "info":
+			address := keyParts[0]
+			hostname, port, err := netutil.ParseHostAndPortFromAddress(address)
+			if err != nil {
+				log.Warn("Ignored invalid tidb topology info entry",
+					zap.String("key", key),
+					zap.String("value", string(kv.Value)),
+					zap.Error(err))
+				continue
+			}
+			nodesInfo[address] = &tidbInstance{
+				IP:   hostname,
+				Port: port,
+			}
+		case "ttl":
+			alive, err := parseTiDBAliveness(kv.Value)
+			if !alive || err != nil {
+				log.Warn("Ignored invalid or expired tidb topology TTL entry",
+					zap.String("key", key),
+					zap.String("value", string(kv.Value)),
+					zap.Error(err))
+				continue
+			}
+			nodesAlive[keyParts[0]] = struct{}{}
+		}
+	}
+
+	nodes := make([]tidbInstance, 0)
+	for addr, info := range nodesInfo {
+		if _, ok := nodesAlive[addr]; ok {
+			nodes = append(nodes, *info)
+		}
+	}
+	return nodes, nil
+}
+
+// parseTiDBAliveness reports whether a TiDB topology TTL value, a unix nano
+// timestamp that TiDB refreshes periodically, is still within the session TTL.
+func parseTiDBAliveness(value []byte) (bool, error) {
+	unixTimestampNano, err := strconv.ParseUint(string(value), 10, 64)
+	if err != nil {
+		return false, errors.ErrUnmarshalFailed.Wrap(err)
+	}
+	t := time.Unix(0, int64(unixTimestampNano))
+	if time.Since(t) > topologyTiDBTTL*time.Second {
+		return false, nil
+	}
+	return true, nil
+}
diff --git a/pkg/upstream/upstream.go b/pkg/upstream/upstream.go
index cf9d4d32b..a235165a2 100644
--- a/pkg/upstream/upstream.go
+++ b/pkg/upstream/upstream.go
@@ -63,7 +63,7 @@ type Upstream struct {
 	PdEndpoints    []string
 	SecurityConfig *security.Credential
 	PDClient       pd.Client
-	etcdCli        *clientV3.Client
+	etcdCli        etcd.Client
 	session        *concurrency.Session
 
 	KVStorage tidbkv.Storage
@@ -122,7 +122,7 @@ func CreateTiStore(urls string, credential *security.Credential) (tidbkv.Storage
 }
 
 // init initializes the upstream
-func initUpstream(ctx context.Context, up *Upstream) error {
+func initUpstream(ctx context.Context, up *Upstream, cfg *NodeTopologyCfg) error {
 	ctx, up.cancel = context.WithCancel(ctx)
 	grpcTLSOption, err := up.SecurityConfig.ToGRPCDialOption()
 	if err != nil {
@@ -159,7 +159,7 @@ func initUpstream(ctx context.Context, up *Upstream) error {
 		if err != nil {
 			return errors.Trace(err)
 		}
-		up.etcdCli = etcdCli
+		up.etcdCli = etcd.NewWrappedClient(etcdCli)
 	}
 	clusterID := up.PDClient.GetClusterID(ctx)
 	if up.ID != 0 && up.ID != clusterID {
@@ -200,6 +200,11 @@ func initUpstream(ctx context.Context, up *Upstream) error {
 			zap.Strings("upstreamEndpoints", up.PdEndpoints))
 	}
 
+	err = up.registerTopologyInfo(ctx, cfg)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
 	up.wg.Add(1)
 	go func() {
 		defer up.wg.Done()
@@ -329,3 +334,29 @@ func (up *Upstream) shouldClose() bool {
 	}
 	return false
 }
+
+// registerTopologyInfo writes the TiCDC topology information into etcd under
+// the key "/topology/ticdc/{clusterID}/{ip:port}".
+// tidb-dashboard uses this information to display the topology of the
+// ticdc cluster.
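+// The key is attached to the session lease, so it expires automatically
+// once the capture goes offline.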
+func (up *Upstream) registerTopologyInfo(ctx context.Context, cfg *NodeTopologyCfg) error {
+	lease, err := up.etcdCli.Grant(ctx, cfg.SessionTTL)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	up.session, err = up.etcdCli.NewSession(concurrency.WithLease(lease.ID))
+	if err != nil {
+		return errors.Trace(err)
+	}
+	// register the capture info to the upstream PD
+	key := fmt.Sprintf(topologyTiCDC, cfg.GCServiceID, cfg.AdvertiseAddr)
+	value, err := cfg.Info.Marshal()
+	if err != nil {
+		return errors.Trace(err)
+	}
+	_, err = up.etcdCli.Put(ctx, key, string(value), clientV3.WithLease(up.session.Lease()))
+	return errors.WrapError(errors.ErrPDEtcdAPIError, err)
+}
diff --git a/server/server.go b/server/server.go
index d489a9034..5c575dbad 100644
--- a/server/server.go
+++ b/server/server.go
@@ -39,6 +39,7 @@ import (
 	"github.com/pingcap/ticdc/pkg/node"
 	"github.com/pingcap/ticdc/pkg/pdutil"
 	tiserver "github.com/pingcap/ticdc/pkg/server"
+	"github.com/pingcap/ticdc/pkg/upstream"
 	"github.com/pingcap/ticdc/server/watcher"
 	"github.com/pingcap/tidb/pkg/kv"
 	"github.com/pingcap/tiflow/cdc/model"
@@ -64,11 +65,12 @@ type server struct {
 
 	liveness model.Liveness
 
-	pdClient      pd.Client
-	pdAPIClient   pdutil.PDAPIClient
-	pdEndpoints   []string
-	coordinatorMu sync.Mutex
-	coordinator   tiserver.Coordinator
+	pdClient        pd.Client
+	pdAPIClient     pdutil.PDAPIClient
+	pdEndpoints     []string
+	coordinatorMu   sync.Mutex
+	coordinator     tiserver.Coordinator
+	upstreamManager *upstream.Manager
 
 	// session keeps alive between the server and etcd
 	session *concurrency.Session
@@ -149,6 +151,16 @@ func (c *server) initialize(ctx context.Context) error {
 	schemaStore := schemastore.New(ctx, conf.DataDir, subscriptionClient, c.pdClient, c.PDClock, c.KVStorage)
 	eventStore := eventstore.New(ctx, conf.DataDir, subscriptionClient, c.PDClock)
 	eventService := eventservice.New(eventStore, schemaStore)
+	c.upstreamManager = upstream.NewManager(ctx, upstream.NodeTopologyCfg{
+		Info:        c.info,
+		GCServiceID: c.EtcdClient.GetGCServiceID(),
+		SessionTTL:  int64(conf.CaptureSessionTTL),
+	})
+	_, err := c.upstreamManager.AddDefaultUpstream(c.pdEndpoints, conf.Security, c.pdClient, c.EtcdClient.GetEtcdClient())
+	if err != nil {
+		return errors.Trace(err)
+	}
+
 	c.subModules = []common.SubModule{
 		nodeManager,
 		subscriptionClient,