Skip to content

Commit cb9e6c4

Browse files
authored
Fix several bugs in cloudevent library (#107)
1. set resource version in codec with string type 2. Do not check source in lister 3. allow setting token for grpc connection Signed-off-by: Jian Qiu <[email protected]>
1 parent ab71141 commit cb9e6c4

File tree

10 files changed

+53
-42
lines changed

10 files changed

+53
-42
lines changed

pkg/cloudevents/clients/cluster/codec.go

+1-7
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ package cluster
22

33
import (
44
"fmt"
5-
"strconv"
6-
75
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
86

97
clusterv1 "open-cluster-management.io/api/cluster/v1"
@@ -42,11 +40,7 @@ func (c *ManagedClusterCodec) Encode(source string, eventType types.CloudEventsT
4240
NewEvent()
4341

4442
if cluster.ResourceVersion != "" {
45-
resourceVersion, err := strconv.ParseInt(cluster.ResourceVersion, 10, 64)
46-
if err != nil {
47-
return nil, fmt.Errorf("failed to parse the resourceversion for managedcluster %s, %v", cluster.Name, err)
48-
}
49-
evt.SetExtension(types.ExtensionResourceVersion, resourceVersion)
43+
evt.SetExtension(types.ExtensionResourceVersion, cluster.ResourceVersion)
5044
}
5145

5246
newCluster := cluster.DeepCopy()

pkg/cloudevents/clients/csr/client.go

+6
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
certificatev1 "k8s.io/api/certificates/v1"
1111
"k8s.io/apimachinery/pkg/api/errors"
1212
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13+
utilrand "k8s.io/apimachinery/pkg/util/rand"
1314
"k8s.io/apimachinery/pkg/watch"
1415
"k8s.io/klog/v2"
1516

@@ -43,6 +44,11 @@ func NewCSRClient(
4344
}
4445

4546
func (c *CSRClient) Create(ctx context.Context, csr *certificatev1.CertificateSigningRequest, opts metav1.CreateOptions) (*certificatev1.CertificateSigningRequest, error) {
47+
// generate csr name if name is not set
48+
if csr.Name == "" && csr.GenerateName != "" {
49+
csr.Name = csr.GenerateName + utilrand.String(5)
50+
csr.GenerateName = ""
51+
}
4652
klog.V(4).Infof("creating CSR %s", csr.Name)
4753
_, exists, err := c.watcherStore.Get("", csr.Name)
4854
if err != nil {

pkg/cloudevents/clients/csr/client_test.go

+7
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package csr
22

33
import (
44
"context"
5+
v1 "open-cluster-management.io/api/cluster/v1"
56
"testing"
67
"time"
78

@@ -32,6 +33,9 @@ func TestCreate(t *testing.T) {
3233
csr: &certificatev1.CertificateSigningRequest{
3334
ObjectMeta: metav1.ObjectMeta{
3435
Name: "cluster1",
36+
Labels: map[string]string{
37+
v1.ClusterNameLabelKey: "cluster1",
38+
},
3539
},
3640
},
3741
expectedErr: "",
@@ -42,6 +46,9 @@ func TestCreate(t *testing.T) {
4246
csr: &certificatev1.CertificateSigningRequest{
4347
ObjectMeta: metav1.ObjectMeta{
4448
Name: "cluster1",
49+
Labels: map[string]string{
50+
v1.ClusterNameLabelKey: "cluster1",
51+
},
4552
},
4653
},
4754
expectedErr: "certificatesigningrequests.certificates.k8s.io \"cluster1\" already exists",

pkg/cloudevents/clients/csr/codec.go

+11-8
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,9 @@ package csr
22

33
import (
44
"fmt"
5-
"strconv"
6-
75
cloudevents "github.com/cloudevents/sdk-go/v2"
86
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
7+
v1 "open-cluster-management.io/api/cluster/v1"
98

109
certificatev1 "k8s.io/api/certificates/v1"
1110
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
@@ -35,17 +34,21 @@ func (c *CSRCodec) Encode(source string, eventType types.CloudEventsType, csr *c
3534
return nil, fmt.Errorf("unsupported cloudevents data type %s", eventType.CloudEventsDataType)
3635
}
3736

37+
if len(csr.Labels) == 0 {
38+
return nil, fmt.Errorf("no cluster label found for CSR")
39+
}
40+
cluster, ok := csr.Labels[v1.ClusterNameLabelKey]
41+
if !ok {
42+
return nil, fmt.Errorf("no cluster name found for CSR")
43+
}
44+
3845
evt := types.NewEventBuilder(source, eventType).
3946
WithResourceID(csr.Name).
40-
WithClusterName(csr.Name).
47+
WithClusterName(cluster).
4148
NewEvent()
4249

4350
if csr.ResourceVersion != "" {
44-
resourceVersion, err := strconv.ParseInt(csr.ResourceVersion, 10, 64)
45-
if err != nil {
46-
return nil, fmt.Errorf("failed to parse the resourceversion for csr %s, %v", csr.Name, err)
47-
}
48-
evt.SetExtension(types.ExtensionResourceVersion, resourceVersion)
51+
evt.SetExtension(types.ExtensionResourceVersion, csr.ResourceVersion)
4952
}
5053

5154
newCSR := csr.DeepCopy()

pkg/cloudevents/clients/store/lister.go

+1-6
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
package store
22

33
import (
4-
"fmt"
5-
64
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
75

8-
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/common"
96
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
107
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
118
)
@@ -25,9 +22,7 @@ func NewAgentWatcherStoreLister[T generic.ResourceObject](store ClientWatcherSto
2522
func (l *AgentWatcherStoreLister[T]) List(options types.ListOptions) ([]T, error) {
2623
opts := metav1.ListOptions{}
2724

28-
if options.Source != types.SourceAll {
29-
opts.LabelSelector = fmt.Sprintf("%s=%s", common.CloudEventsOriginalSourceLabelKey, options.Source)
30-
}
25+
// TODO we might want to specify source when list
3126

3227
list, err := l.store.List("", opts)
3328
if err != nil {

pkg/cloudevents/clients/store/lister_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func TestAgentLister(t *testing.T) {
3232
}
3333
if err := clusterStore.Add(&clusterv1.ManagedCluster{ObjectMeta: metav1.ObjectMeta{
3434
Name: "test2",
35-
Labels: map[string]string{common.CloudEventsOriginalSourceLabelKey: "source2"},
35+
Labels: map[string]string{common.CloudEventsOriginalSourceLabelKey: "source1"},
3636
}}); err != nil {
3737
t.Error(err)
3838
}
@@ -42,7 +42,7 @@ func TestAgentLister(t *testing.T) {
4242
if err != nil {
4343
t.Error(err)
4444
}
45-
if len(clusters) != 1 {
45+
if len(clusters) != 2 {
4646
t.Error("unexpected clusters")
4747
}
4848
}

pkg/cloudevents/generic/options/grpc/options.go

+16-11
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type GRPCDialer struct {
3030
URL string
3131
KeepAliveOptions KeepAliveOptions
3232
TLSConfig *tls.Config
33-
TokenFile string
33+
Token string
3434
mu sync.Mutex // Mutex to protect the connection.
3535
conn *grpc.ClientConn // Cached gRPC client connection.
3636
}
@@ -66,15 +66,10 @@ func (d *GRPCDialer) Dial() (*grpc.ClientConn, error) {
6666
if d.TLSConfig != nil {
6767
// Enable TLS
6868
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(d.TLSConfig)))
69-
if len(d.TokenFile) != 0 {
70-
// Use token-based authentication if token file is provided.
71-
token, err := os.ReadFile(d.TokenFile)
72-
if err != nil {
73-
return nil, fmt.Errorf("failed to read token file %s, %v", d.TokenFile, err)
74-
}
69+
if len(d.Token) != 0 {
7570
perRPCCred := oauth.TokenSource{
7671
TokenSource: oauth2.StaticTokenSource(&oauth2.Token{
77-
AccessToken: string(token),
72+
AccessToken: d.Token,
7873
})}
7974
// Add per-RPC credentials to the dial options.
8075
dialOpts = append(dialOpts, grpc.WithPerRPCCredentials(perRPCCred))
@@ -130,6 +125,8 @@ type GRPCConfig struct {
130125
ClientKeyFile string `json:"clientKeyFile,omitempty" yaml:"clientKeyFile,omitempty"`
131126
// TokenFile is the file path to a token file for authentication.
132127
TokenFile string `json:"tokenFile,omitempty" yaml:"tokenFile,omitempty"`
128+
// Token is the token for authentication
129+
Token string `json:"token" yaml:"token"`
133130
// keepalive options
134131
KeepAliveConfig KeepAliveConfig `json:"keepAliveConfig,omitempty" yaml:"keepAliveConfig,omitempty"`
135132
}
@@ -175,14 +172,22 @@ func BuildGRPCOptionsFromFlags(configPath string) (*GRPCOptions, error) {
175172
if config.ClientCertFile != "" && config.ClientKeyFile != "" && config.CAFile == "" {
176173
return nil, fmt.Errorf("setting clientCertFile and clientKeyFile requires caFile")
177174
}
178-
if config.TokenFile != "" && config.CAFile == "" {
175+
token := config.Token
176+
if config.Token == "" && config.TokenFile != "" {
177+
tokenBytes, err := os.ReadFile(config.TokenFile)
178+
if err != nil {
179+
return nil, fmt.Errorf("failed to read token file %s, %v", config.TokenFile, err)
180+
}
181+
token = string(tokenBytes)
182+
}
183+
if token != "" && config.CAFile == "" {
179184
return nil, fmt.Errorf("setting tokenFile requires caFile")
180185
}
181186

182187
options := &GRPCOptions{
183188
Dialer: &GRPCDialer{
184-
URL: config.URL,
185-
TokenFile: config.TokenFile,
189+
URL: config.URL,
190+
Token: token,
186191
},
187192
}
188193

pkg/cloudevents/generic/options/grpc/options_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func TestBuildGRPCOptionsFromFlags(t *testing.T) {
3434
},
3535
{
3636
name: "token config without caFile",
37-
config: "{\"url\":\"test\",\"tokenFile\":\"test\"}",
37+
config: "{\"url\":\"test\",\"token\":\"test\"}",
3838
expectedErrorMsg: "setting tokenFile requires caFile",
3939
},
4040
{

pkg/cloudevents/server/grpc/server.go

+2-6
Original file line numberDiff line numberDiff line change
@@ -245,10 +245,6 @@ func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataTy
245245
}
246246

247247
for _, obj := range objs {
248-
// only respond with the resource of the resync type
249-
if obj.Type() != eventDataType.String() {
250-
continue
251-
}
252248
// respond with the deleting resource regardless of the resource version
253249
if _, ok := obj.Extensions()[types.ExtensionDeletionTimestamp]; ok {
254250
err = bkr.handleRes(ctx, obj, eventDataType, "delete_request")
@@ -259,8 +255,8 @@ func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataTy
259255
}
260256

261257
lastResourceVersion := findResourceVersion(obj.ID(), resourceVersions.Versions)
262-
rv := obj.Extensions()[types.ExtensionResourceVersion].(string)
263-
currentResourceVersion, err := strconv.ParseInt(rv, 10, 64)
258+
currentResourceVersion, err := strconv.ParseInt(
259+
obj.Extensions()[types.ExtensionResourceVersion].(string), 10, 64)
264260
if err != nil {
265261
log.V(4).Info("ignore the obj %v since it has a invalid resourceVersion, %v", obj, err)
266262
continue

test/integration/cloudevents/util/grpc.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package util
33
import (
44
"crypto/tls"
55
"crypto/x509"
6+
"os"
67
"time"
78

89
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
@@ -36,7 +37,11 @@ func newGRPCOptions(certPool *x509.CertPool, brokerURL, tokenFile string) *grpc.
3637
}
3738

3839
if tokenFile != "" {
39-
grpcOptions.Dialer.TokenFile = tokenFile
40+
token, err := os.ReadFile(tokenFile)
41+
if err != nil {
42+
panic(err)
43+
}
44+
grpcOptions.Dialer.Token = string(token)
4045
}
4146

4247
return grpcOptions

0 commit comments

Comments
 (0)