@@ -81,17 +81,20 @@ type ObjectStorageController struct {
81
81
RenewDeadline time.Duration
82
82
RetryPeriod time.Duration
83
83
84
+ eventBroadcaster record.EventBroadcaster
85
+ eventRecorder record.EventRecorder
86
+
84
87
// Controller
85
88
ResyncPeriod time.Duration
86
89
queue workqueue.RateLimitingInterface
87
90
threadiness int
88
91
89
92
// Listeners
90
- BucketListener BucketListener
91
- BucketClaimListener BucketClaimListener
92
- BucketAccessListener BucketAccessListener
93
- BucketClassListener BucketClassListener
94
- BucketAccessClassListener BucketAccessClassListener
93
+ BucketListener BucketListener
94
+ BucketClaimListener BucketClaimListener
95
+ BucketAccessListener BucketAccessListener
96
+ BucketClassListener BucketClassListener
97
+ BucketAccessClassListener BucketAccessClassListener
95
98
96
99
// leader election
97
100
leaderLock string
@@ -148,7 +151,13 @@ func NewObjectStorageControllerWithClientset(identity string, leaderLockName str
148
151
}
149
152
}
150
153
154
+ rb := record .NewBroadcaster ()
155
+ leader := sanitize (fmt .Sprintf ("%s/%s" , leaderLockName , identity ))
156
+
151
157
return & ObjectStorageController {
158
+ eventBroadcaster : rb ,
159
+ eventRecorder : rb .NewRecorder (scheme .Scheme , v1.EventSource {Component : leader }),
160
+
152
161
identity : id ,
153
162
kubeClient : kubeClient ,
154
163
bucketClient : bucketClient ,
@@ -186,31 +195,20 @@ func (c *ObjectStorageController) Run(ctx context.Context) error {
186
195
return "default"
187
196
}()
188
197
189
- sanitize := func (n string ) string {
190
- re := regexp .MustCompile ("[^a-zA-Z0-9-]" )
191
- name := strings .ToLower (re .ReplaceAllString (n , "-" ))
192
- if name [len (name )- 1 ] == '-' {
193
- // name must not end with '-'
194
- name = name + "X"
195
- }
196
- return name
197
- }
198
-
199
- leader := sanitize (fmt .Sprintf ("%s/%s" , c .leaderLock , c .identity ))
200
198
id , err := os .Hostname ()
201
199
if err != nil {
202
200
return fmt .Errorf ("error getting the default leader identity: %v" , err )
203
201
}
204
202
205
- recorder := record .NewBroadcaster ()
206
- recorder .StartRecordingToSink (& corev1.EventSinkImpl {Interface : c .kubeClient .CoreV1 ().Events (ns )})
207
- eRecorder := recorder .NewRecorder (scheme .Scheme , v1.EventSource {Component : leader })
203
+ c .eventBroadcaster .StartRecordingToSink (& corev1.EventSinkImpl {Interface : c .kubeClient .CoreV1 ().Events (ns )})
204
+ defer c .eventBroadcaster .Shutdown ()
208
205
209
206
rlConfig := resourcelock.ResourceLockConfig {
210
207
Identity : sanitize (id ),
211
- EventRecorder : eRecorder ,
208
+ EventRecorder : c . eventRecorder ,
212
209
}
213
210
211
+ leader := sanitize (fmt .Sprintf ("%s/%s" , c .leaderLock , c .identity ))
214
212
l , err := resourcelock .New (resourcelock .LeasesResourceLock , ns , leader , c .kubeClient .CoreV1 (), c .kubeClient .CoordinationV1 (), rlConfig )
215
213
if err != nil {
216
214
return err
@@ -423,6 +421,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
423
421
if c .BucketListener != nil {
424
422
c .BucketListener .InitializeKubeClient (c .kubeClient )
425
423
c .BucketListener .InitializeBucketClient (c .bucketClient )
424
+ c .BucketAccessListener .InitializeEventRecorder (c .eventRecorder )
426
425
addFunc := func (ctx context.Context , obj interface {}) error {
427
426
return c .BucketListener .Add (ctx , obj .(* v1alpha1.Bucket ))
428
427
}
@@ -437,6 +436,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
437
436
if c .BucketClaimListener != nil {
438
437
c .BucketClaimListener .InitializeKubeClient (c .kubeClient )
439
438
c .BucketClaimListener .InitializeBucketClient (c .bucketClient )
439
+ c .BucketAccessListener .InitializeEventRecorder (c .eventRecorder )
440
440
addFunc := func (ctx context.Context , obj interface {}) error {
441
441
return c .BucketClaimListener .Add (ctx , obj .(* v1alpha1.BucketClaim ))
442
442
}
@@ -451,6 +451,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
451
451
if c .BucketAccessListener != nil {
452
452
c .BucketAccessListener .InitializeKubeClient (c .kubeClient )
453
453
c .BucketAccessListener .InitializeBucketClient (c .bucketClient )
454
+ c .BucketAccessListener .InitializeEventRecorder (c .eventRecorder )
454
455
addFunc := func (ctx context.Context , obj interface {}) error {
455
456
return c .BucketAccessListener .Add (ctx , obj .(* v1alpha1.BucketAccess ))
456
457
}
@@ -465,6 +466,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
465
466
if c .BucketClassListener != nil {
466
467
c .BucketClassListener .InitializeKubeClient (c .kubeClient )
467
468
c .BucketClassListener .InitializeBucketClient (c .bucketClient )
469
+ c .BucketAccessListener .InitializeEventRecorder (c .eventRecorder )
468
470
addFunc := func (ctx context.Context , obj interface {}) error {
469
471
return c .BucketClassListener .Add (ctx , obj .(* v1alpha1.BucketClass ))
470
472
}
@@ -479,6 +481,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
479
481
if c .BucketAccessClassListener != nil {
480
482
c .BucketAccessClassListener .InitializeKubeClient (c .kubeClient )
481
483
c .BucketAccessClassListener .InitializeBucketClient (c .bucketClient )
484
+ c .BucketAccessListener .InitializeEventRecorder (c .eventRecorder )
482
485
addFunc := func (ctx context.Context , obj interface {}) error {
483
486
return c .BucketAccessClassListener .Add (ctx , obj .(* v1alpha1.BucketAccessClass ))
484
487
}
@@ -493,3 +496,13 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
493
496
494
497
<- ctx .Done ()
495
498
}
499
+
500
+ func sanitize (n string ) string {
501
+ re := regexp .MustCompile ("[^a-zA-Z0-9-]" )
502
+ name := strings .ToLower (re .ReplaceAllString (n , "-" ))
503
+ if name [len (name )- 1 ] == '-' {
504
+ // name must not end with '-'
505
+ name = name + "X"
506
+ }
507
+ return name
508
+ }
0 commit comments