Skip to content

Commit 5257c63

Browse files
authored
Merge pull request #74 from ecordell/lockfix
Fix `manager` not releasing read lock
2 parents f26fe5d + cdd2972 commit 5257c63

File tree

3 files changed

+140
-9
lines changed

3 files changed

+140
-9
lines changed

manager/controller_test.go

+10-3
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ func ExampleNewOwnedResourceController() {
4343
mgr := NewManager(ctrlmanageropts.RecommendedDebuggingOptions().DebuggingConfiguration, ":", broadcaster, eventSink)
4444
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Millisecond)
4545
defer cancel()
46-
_ = mgr.Start(ctx, controller)
46+
readyc := make(chan struct{})
47+
_ = mgr.Start(ctx, readyc, controller)
48+
<-readyc
4749
// Output:
4850
}
4951

@@ -65,9 +67,12 @@ func TestControllerQueueDone(t *testing.T) {
6567
mgr := NewManager(ctrlmanageropts.RecommendedDebuggingOptions().DebuggingConfiguration, ":", broadcaster, eventSink)
6668
ctx, cancel := context.WithCancel(context.Background())
6769
defer cancel()
70+
71+
readyc := make(chan struct{})
6872
go func() {
69-
_ = mgr.Start(ctx, controller)
73+
_ = mgr.Start(ctx, readyc, controller)
7074
}()
75+
<-readyc
7176

7277
// add many keys
7378
for i := 0; i < 10; i++ {
@@ -105,9 +110,11 @@ func TestControllerEventsBroadcast(t *testing.T) {
105110
mgr := NewManager(ctrlmanageropts.RecommendedDebuggingOptions().DebuggingConfiguration, ":8888", broadcaster, eventSink)
106111
ctx, cancel := context.WithCancel(context.Background())
107112
defer cancel()
113+
readyc := make(chan struct{})
108114
go func() {
109-
_ = mgr.Start(ctx, controller)
115+
_ = mgr.Start(ctx, readyc, controller)
110116
}()
117+
<-readyc
111118
require.Eventually(t, healthCheckPassing(), 1*time.Second, 50*time.Millisecond)
112119

113120
recorder.Event(&v1.ObjectReference{Namespace: "test", Name: "a"}, v1.EventTypeNormal, "test", "test")

manager/manager.go

+37-6
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package manager
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"net/http"
78
"runtime"
@@ -45,6 +46,9 @@ type Manager struct {
4546
// information for its managed set of controllers.
4647
func NewManager(debugConfig *componentconfig.DebuggingConfiguration, address string, broadcaster record.EventBroadcaster, sink record.EventSink) *Manager {
4748
handler := healthz.NewMutableHealthzHandler()
49+
if broadcaster == nil {
50+
broadcaster = record.NewBroadcaster()
51+
}
4852
return &Manager{
4953
healthzHandler: handler,
5054
srv: &http.Server{
@@ -61,13 +65,15 @@ func NewManager(debugConfig *componentconfig.DebuggingConfiguration, address str
6165
// Start starts a set of controllers in an errgroup and serves
6266
// health / debug endpoints for them. It stops when the context is cancelled.
6367
// It will only have an effect the first time it is called.
64-
func (m *Manager) Start(ctx context.Context, controllers ...Controller) error {
68+
func (m *Manager) Start(ctx context.Context, readyc chan<- struct{}, controllers ...Controller) error {
6569
m.RLock()
6670
if m.errG != nil {
71+
m.RUnlock()
6772
return fmt.Errorf("manager already started")
6873
}
6974
m.RUnlock()
7075

76+
var startErr error
7177
m.once.Do(func() {
7278
m.Lock()
7379
m.errG, ctx = errgroup.WithContext(ctx)
@@ -76,40 +82,65 @@ func (m *Manager) Start(ctx context.Context, controllers ...Controller) error {
7682

7783
// start controllers
7884
if err := m.Go(controllers...); err != nil {
85+
startErr = err
7986
return
8087
}
8188

8289
// start health / debug server
8390
m.errG.Go(func() error {
84-
return m.srv.ListenAndServe()
91+
if err := m.srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
92+
return err
93+
}
94+
return nil
8595
})
8696

8797
// start broadcaster
8898
m.errG.Go(func() error {
8999
m.broadcaster.StartStructuredLogging(2)
90-
m.broadcaster.StartRecordingToSink(m.sink)
100+
if m.sink != nil {
101+
m.broadcaster.StartRecordingToSink(m.sink)
102+
}
91103
return nil
92104
})
93105

94-
// stop health / debug server when context is cancelled
106+
// stop health / debug server and all controllers when context is
107+
// cancelled
95108
m.errG.Go(func() error {
96109
<-ctx.Done()
97110
m.broadcaster.Shutdown()
98-
return m.srv.Shutdown(ctx)
111+
112+
m.Lock()
113+
for ctrl, cancel := range m.cancelFuncs {
114+
cancel()
115+
delete(m.cancelFuncs, ctrl)
116+
}
117+
m.Unlock()
118+
119+
// no context passed to shutdown; the errg will block
120+
// until the server is closed
121+
return m.srv.Shutdown(context.Background())
99122
})
100123
})
124+
125+
close(readyc)
126+
127+
if startErr != nil {
128+
return startErr
129+
}
130+
101131
if err := m.errG.Wait(); err != nil {
102132
return err
103133
}
104134

105-
return ctx.Err()
135+
return nil
106136
}
107137

108138
// Go adds controllers into the existing manager's errgroup
109139
func (m *Manager) Go(controllers ...Controller) error {
110140
m.RLock()
111141
errG := m.errG
112142
if errG == nil {
143+
m.RUnlock()
113144
return fmt.Errorf("cannot add controllers to an unstarted manager")
114145
}
115146
ctx := m.errGCtx

manager/manager_test.go

+93
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package manager
2+
3+
import (
4+
"context"
5+
"net"
6+
"strconv"
7+
"testing"
8+
"time"
9+
10+
"github.com/stretchr/testify/require"
11+
"k8s.io/apimachinery/pkg/runtime/schema"
12+
"k8s.io/client-go/tools/record"
13+
"k8s.io/component-base/config"
14+
"k8s.io/klog/v2/klogr"
15+
16+
"github.com/authzed/controller-idioms/queue"
17+
"github.com/authzed/controller-idioms/typed"
18+
)
19+
20+
func TestManager(t *testing.T) {
21+
t.Parallel()
22+
ctx, cancel := context.WithCancel(context.Background())
23+
24+
m := NewManager(&config.DebuggingConfiguration{
25+
EnableProfiling: false,
26+
EnableContentionProfiling: false,
27+
}, ":"+getFreePort(t), nil, nil)
28+
29+
ready := make(chan struct{})
30+
go func() {
31+
require.NoError(t, m.Start(ctx, ready, testController(t, "a")))
32+
}()
33+
<-ready
34+
35+
requireCancelFnCount(t, m, 1)
36+
37+
// Ensure that the manager can't be started twice.
38+
require.Error(t, m.Start(ctx, ready), "manager already started")
39+
40+
// Add some controllers after start
41+
require.NoError(t, m.Go(testController(t, "b"), testController(t, "c")))
42+
requireCancelFnCount(t, m, 3)
43+
44+
specificCtrl := testController(t, "d")
45+
require.NoError(t, m.Go(specificCtrl))
46+
requireCancelFnCount(t, m, 4)
47+
48+
// stop a specific controller
49+
m.Cancel(specificCtrl)
50+
requireCancelFnCount(t, m, 3)
51+
52+
// cancel the manager's context, which will clean up all controllers
53+
cancel()
54+
55+
requireCancelFnCount(t, m, 0)
56+
}
57+
58+
func getFreePort(t testing.TB) string {
59+
t.Helper()
60+
61+
a, err := net.ResolveTCPAddr("tcp", "localhost:0")
62+
require.NoError(t, err)
63+
l, err := net.ListenTCP("tcp", a)
64+
require.NoError(t, err)
65+
defer func() {
66+
require.NoError(t, l.Close())
67+
}()
68+
return strconv.Itoa(l.Addr().(*net.TCPAddr).Port)
69+
}
70+
71+
func testController(t *testing.T, name string) Controller {
72+
gvr := schema.GroupVersionResource{
73+
Group: "example.com",
74+
Version: "v1",
75+
Resource: "mytypes",
76+
}
77+
CtxQueue := queue.NewQueueOperationsCtx()
78+
registry := typed.NewRegistry()
79+
broadcaster := record.NewBroadcaster()
80+
81+
return NewOwnedResourceController(klogr.New(), name, gvr, CtxQueue, registry, broadcaster, func(_ context.Context, gvr schema.GroupVersionResource, namespace, name string) {
82+
t.Log("processing", gvr, namespace, name)
83+
})
84+
}
85+
86+
func requireCancelFnCount(t *testing.T, m *Manager, count int) {
87+
t.Helper()
88+
require.Eventually(t, func() bool {
89+
m.RLock()
90+
defer m.RUnlock()
91+
return len(m.cancelFuncs) == count
92+
}, 100*time.Second, 10*time.Millisecond)
93+
}

0 commit comments

Comments
 (0)