Skip to content

Commit 92b6585

Browse files
authored
grpc broker (#115)
Signed-off-by: Wei Liu <[email protected]>
1 parent 062f078 commit 92b6585

File tree

4 files changed

+23
-23
lines changed

4 files changed

+23
-23
lines changed

pkg/cloudevents/server/grpc/options/server.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,15 @@ import (
55
"crypto/tls"
66
"crypto/x509"
77
"fmt"
8+
"os"
9+
810
"google.golang.org/grpc"
911
"google.golang.org/grpc/credentials"
1012
"google.golang.org/grpc/keepalive"
1113
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
1214
"open-cluster-management.io/sdk-go/pkg/cloudevents/server"
1315
grpcserver "open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc"
1416
"open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/authn"
15-
"os"
1617
)
1718

1819
// PreStartHook is an interface to start hook before grpc server is started.
@@ -99,7 +100,7 @@ func (s *Server) Run(ctx context.Context) error {
99100
grpc.ChainStreamInterceptor(newAuthStreamInterceptor(s.authenticators...)))
100101

101102
grpcServer := grpc.NewServer(grpcServerOptions...)
102-
grpcEventServer := grpcserver.NewGRPCBroker(grpcServer, ":"+s.options.ServerBindPort)
103+
grpcEventServer := grpcserver.NewGRPCBroker(grpcServer)
103104

104105
for t, service := range s.services {
105106
grpcEventServer.RegisterService(t, service)
@@ -110,7 +111,7 @@ func (s *Server) Run(ctx context.Context) error {
110111
hook.Run(ctx)
111112
}
112113

113-
go grpcEventServer.Start(ctx)
114+
go grpcEventServer.Start(ctx, ":"+s.options.ServerBindPort)
114115
<-ctx.Done()
115116
return nil
116117
}

pkg/cloudevents/server/grpc/server.go

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -45,20 +45,18 @@ type GRPCBroker struct {
4545
pbv1.UnimplementedCloudEventServiceServer
4646
grpcServer *grpc.Server
4747
services map[types.CloudEventsDataType]server.Service
48-
bindAddress string
4948
subscribers map[string]*subscriber // registered subscribers
5049
mu sync.RWMutex
5150
}
5251

53-
// NewGRPCBroker creates a new gRPC broker with the given configuration.
54-
func NewGRPCBroker(srv *grpc.Server, addr string) server.AgentEventServer {
55-
klog.Infof("Serving gRPC broker without TLS at %s", addr)
52+
// NewGRPCBroker creates a new gRPC broker with the given gRPC server.
53+
func NewGRPCBroker(srv *grpc.Server) server.AgentEventServer {
5654
broker := &GRPCBroker{
5755
grpcServer: srv,
58-
bindAddress: addr,
5956
subscribers: make(map[string]*subscriber),
6057
services: make(map[types.CloudEventsDataType]server.Service),
6158
}
59+
pbv1.RegisterCloudEventServiceServer(broker.grpcServer, broker)
6260
return broker
6361
}
6462

@@ -67,15 +65,14 @@ func (bkr *GRPCBroker) RegisterService(t types.CloudEventsDataType, service serv
6765
service.RegisterHandler(bkr)
6866
}
6967

70-
// Start starts the gRPC broker
71-
func (bkr *GRPCBroker) Start(ctx context.Context) {
68+
// Start starts the gRPC broker at the given address
69+
func (bkr *GRPCBroker) Start(ctx context.Context, addr string) {
7270
logger := klog.FromContext(ctx)
73-
logger.Info("Starting gRPC broker")
74-
lis, err := net.Listen("tcp", bkr.bindAddress)
71+
logger.Info("Starting gRPC broker at %s", addr)
72+
lis, err := net.Listen("tcp", addr)
7573
if err != nil {
7674
utilruntime.Must(fmt.Errorf("failed to listen: %v", err))
7775
}
78-
pbv1.RegisterCloudEventServiceServer(bkr.grpcServer, bkr)
7976
go func() {
8077
if err := bkr.grpcServer.Serve(lis); err != nil {
8178
utilruntime.Must(fmt.Errorf("failed to serve gRPC broker: %v", err))
@@ -87,7 +84,7 @@ func (bkr *GRPCBroker) Start(ctx context.Context) {
8784
klog.Infof("Shutting down gRPC broker")
8885
}
8986

90-
// Publish in stub implementation for maestro agent publish resource status back to maestro server.
87+
// Publish in stub implementation for agent publish resource status.
9188
func (bkr *GRPCBroker) Publish(ctx context.Context, pubReq *pbv1.PublishRequest) (*emptypb.Empty, error) {
9289
logger := klog.FromContext(ctx)
9390
// WARNING: don't use "evt, err := pb.FromProto(pubReq.Event)" to convert protobuf to cloudevent
@@ -101,7 +98,7 @@ func (bkr *GRPCBroker) Publish(ctx context.Context, pubReq *pbv1.PublishRequest)
10198
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("failed to parse cloud event type %s, %v", evt.Type(), err))
10299
}
103100

104-
logger.V(4).Info("receive the event with grpc broker", "event", evt)
101+
logger.Info("receive the event with grpc broker", "event", evt)
105102

106103
// handler resync request
107104
if eventType.Action == types.ResyncRequestAction {
@@ -145,7 +142,7 @@ func (bkr *GRPCBroker) register(
145142
errChan: errChan,
146143
}
147144

148-
klog.V(4).Infof("register a subscriber %s (cluster name = %s)", id, clusterName)
145+
klog.Infof("register a subscriber %s (cluster name = %s)", id, clusterName)
149146

150147
return id, errChan
151148
}
@@ -160,9 +157,9 @@ func (bkr *GRPCBroker) unregister(id string) {
160157
delete(bkr.subscribers, id)
161158
}
162159

163-
// Subscribe in stub implementation for maestro agent subscribe resource spec from maestro server.
160+
// Subscribe in stub implementation for agent subscribe resource spec.
164161
// Note: It's unnecessary to send a status resync request to agent subscribers.
165-
// The work agent will continuously attempt to send status updates to the gRPC broker.
162+
// The agent will continuously attempt to send status updates to the gRPC broker.
166163
// If the broker is down or disconnected, the agent will resend the status once the broker is back up or reconnected.
167164
func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv1.CloudEventService_SubscribeServer) error {
168165
if len(subReq.ClusterName) == 0 {
@@ -182,7 +179,7 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
182179
}
183180

184181
// send the cloudevent to the subscriber
185-
klog.V(4).Infof("sending the event to spec subscribers, %s", evt)
182+
klog.Infof("sending the event to spec subscribers, %s", evt)
186183
if err := subServer.Send(pbEvt); err != nil {
187184
klog.Errorf("failed to send grpc event, %v", err)
188185
// Return the error without wrapping, as it includes the gRPC error code and message for further handling.

pkg/cloudevents/server/grpc/server_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@ package grpc
33
import (
44
"context"
55
"errors"
6+
"testing"
7+
68
cloudevents "github.com/cloudevents/sdk-go/v2"
79
"google.golang.org/grpc"
810
grpccli "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
911
cetypes "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
1012
"open-cluster-management.io/sdk-go/pkg/cloudevents/server"
11-
"testing"
1213
)
1314

1415
var dataType = cetypes.CloudEventsDataType{
@@ -58,14 +59,14 @@ func (s *testService) create(evt *cloudevents.Event) error {
5859
func TestServer(t *testing.T) {
5960
grpcServerOptions := []grpc.ServerOption{}
6061
grpcServer := grpc.NewServer(grpcServerOptions...)
61-
grpcEventServer := NewGRPCBroker(grpcServer, ":8888")
62+
grpcEventServer := NewGRPCBroker(grpcServer)
6263

6364
svc := &testService{evts: make(map[string]*cloudevents.Event)}
6465
grpcEventServer.RegisterService(dataType, svc)
6566

6667
ctx, cancel := context.WithCancel(context.Background())
6768
defer cancel()
68-
go grpcEventServer.Start(ctx)
69+
go grpcEventServer.Start(ctx, ":8888")
6970

7071
grpcClientOptions := grpccli.NewGRPCOptions()
7172
grpcClientOptions.Dialer = &grpccli.GRPCDialer{URL: "localhost:8888"}

pkg/cloudevents/server/interface.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package server
22

33
import (
44
"context"
5+
56
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
67
)
78

@@ -15,7 +16,7 @@ type AgentEventServer interface {
1516
RegisterService(t types.CloudEventsDataType, service Service)
1617

1718
// Start initiates the EventServer to listen to agents.
18-
Start(ctx context.Context)
19+
Start(ctx context.Context, addr string)
1920
}
2021

2122
type EventHandler interface {

0 commit comments

Comments
 (0)