Skip to content

Commit 24e24fc

Browse files
committed
Add verb support to gc and namespace controllers
1 parent 458d2b2 commit 24e24fc

File tree

8 files changed

+128
-80
lines changed

8 files changed

+128
-80
lines changed

cmd/kube-controller-manager/app/controllermanager.go

Lines changed: 19 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ import (
3636
"k8s.io/kubernetes/pkg/api/v1"
3737
"k8s.io/kubernetes/pkg/apimachinery/registered"
3838
"k8s.io/kubernetes/pkg/apis/batch"
39+
"k8s.io/kubernetes/pkg/apis/extensions"
40+
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
3941
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
4042
v1core "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/typed/core/v1"
4143
"k8s.io/kubernetes/pkg/client/leaderelection"
@@ -393,44 +395,26 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont
393395
namespaceKubeClient := clientBuilder.ClientOrDie("namespace-controller")
394396
namespaceClientPool := dynamic.NewClientPool(rootClientBuilder.ConfigOrDie("namespace-controller"), restMapper, dynamic.LegacyAPIPathResolverFunc)
395397
// TODO: consider using a list-watch + cache here rather than polling
396-
gvrFn := func() (map[schema.GroupVersionResource]struct{}, error) {
397-
resources, err := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources()
398-
if err != nil {
399-
// best effort extraction
400-
gvrs, _ := discovery.GroupVersionResources(resources)
401-
return gvrs, fmt.Errorf("failed to get supported namespaced resources: %v", err)
402-
}
403-
gvrs, err := discovery.GroupVersionResources(resources)
404-
if err != nil {
405-
return gvrs, fmt.Errorf("failed to parse supported namespaced resources: %v", err)
406-
}
407-
return gvrs, nil
398+
resources, err := namespaceKubeClient.Discovery().ServerResources()
399+
if err != nil {
400+
return fmt.Errorf("failed to get preferred server resources: %v", err)
408401
}
409-
rsrcs, err := namespaceKubeClient.Discovery().ServerResources()
402+
gvrs, err := discovery.GroupVersionResources(resources)
410403
if err != nil {
411-
return fmt.Errorf("failed to get group version resources: %v", err)
412-
}
413-
tprFound := false
414-
searchThirdPartyResource:
415-
for _, rsrcList := range rsrcs {
416-
for ix := range rsrcList.APIResources {
417-
rsrc := &rsrcList.APIResources[ix]
418-
if rsrc.Kind == "ThirdPartyResource" {
419-
tprFound = true
420-
break searchThirdPartyResource
421-
}
422-
}
404+
return fmt.Errorf("failed to parse preferred server resources: %v", err)
423405
}
424-
if !tprFound {
425-
gvr, err := gvrFn()
406+
discoverResourcesFn := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources
407+
if _, found := gvrs[extensions.SchemeGroupVersion.WithResource("thirdpartyresource")]; found {
408+
// make discovery static
409+
snapshot, err := discoverResourcesFn()
426410
if err != nil {
427-
return fmt.Errorf("failed to get resources: %v", err)
411+
return fmt.Errorf("failed to get server resources: %v", err)
428412
}
429-
gvrFn = func() (map[schema.GroupVersionResource]struct{}, error) {
430-
return gvr, nil
413+
discoverResourcesFn = func() ([]*metav1.APIResourceList, error) {
414+
return snapshot, nil
431415
}
432416
}
433-
namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, gvrFn, s.NamespaceSyncPeriod.Duration, v1.FinalizerKubernetes)
417+
namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, discoverResourcesFn, s.NamespaceSyncPeriod.Duration, v1.FinalizerKubernetes)
434418
go namespaceController.Run(int(s.ConcurrentNamespaceSyncs), stop)
435419
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
436420

@@ -567,17 +551,18 @@ searchThirdPartyResource:
567551
if err != nil {
568552
return fmt.Errorf("failed to get supported resources from server: %v", err)
569553
}
570-
groupVersionResources, err := discovery.GroupVersionResources(preferredResources)
554+
deletableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete"}}, preferredResources)
555+
deletableGroupVersionResources, err := discovery.GroupVersionResources(deletableResources)
571556
if err != nil {
572-
glog.Fatalf("Failed to parse supported resources from server: %v", err)
557+
glog.Errorf("Failed to parse resources from server: %v", err)
573558
}
574559

575560
config := rootClientBuilder.ConfigOrDie("generic-garbage-collector")
576561
config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()}
577562
metaOnlyClientPool := dynamic.NewClientPool(config, restMapper, dynamic.LegacyAPIPathResolverFunc)
578563
config.ContentConfig = dynamic.ContentConfig()
579564
clientPool := dynamic.NewClientPool(config, restMapper, dynamic.LegacyAPIPathResolverFunc)
580-
garbageCollector, err := garbagecollector.NewGarbageCollector(metaOnlyClientPool, clientPool, restMapper, groupVersionResources)
565+
garbageCollector, err := garbagecollector.NewGarbageCollector(metaOnlyClientPool, clientPool, restMapper, deletableGroupVersionResources)
581566
if err != nil {
582567
glog.Errorf("Failed to start the generic garbage collector: %v", err)
583568
} else {

pkg/controller/garbagecollector/garbagecollector.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -537,16 +537,16 @@ var ignoredResources = map[schema.GroupVersionResource]struct{}{
537537
schema.GroupVersionResource{Group: "authorization.k8s.io", Version: "v1beta1", Resource: "localsubjectaccessreviews"}: {},
538538
}
539539

540-
func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynamic.ClientPool, mapper meta.RESTMapper, resources map[schema.GroupVersionResource]struct{}) (*GarbageCollector, error) {
540+
func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynamic.ClientPool, mapper meta.RESTMapper, deletableResources map[schema.GroupVersionResource]struct{}) (*GarbageCollector, error) {
541541
gc := &GarbageCollector{
542542
metaOnlyClientPool: metaOnlyClientPool,
543543
clientPool: clientPool,
544544
restMapper: mapper,
545545
clock: clock.RealClock{},
546546
dirtyQueue: workqueue.NewTimedWorkQueue(),
547547
orphanQueue: workqueue.NewTimedWorkQueue(),
548-
registeredRateLimiter: NewRegisteredRateLimiter(resources),
549-
registeredRateLimiterForMonitors: NewRegisteredRateLimiter(resources),
548+
registeredRateLimiter: NewRegisteredRateLimiter(deletableResources),
549+
registeredRateLimiterForMonitors: NewRegisteredRateLimiter(deletableResources),
550550
absentOwnerCache: NewUIDCache(500),
551551
}
552552
gc.propagator = &Propagator{
@@ -557,7 +557,7 @@ func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynam
557557
},
558558
gc: gc,
559559
}
560-
for resource := range resources {
560+
for resource := range deletableResources {
561561
if _, ok := ignoredResources[resource]; ok {
562562
glog.V(6).Infof("ignore resource %#v", resource)
563563
continue

pkg/controller/garbagecollector/garbagecollector_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func TestNewGarbageCollector(t *testing.T) {
4949
metaOnlyClientPool := dynamic.NewClientPool(config, registered.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
5050
config.ContentConfig.NegotiatedSerializer = nil
5151
clientPool := dynamic.NewClientPool(config, registered.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
52-
podResource := []schema.GroupVersionResource{{Version: "v1", Resource: "pods"}}
52+
podResource := map[schema.GroupVersionResource]struct{}{schema.GroupVersionResource{Version: "v1", Resource: "pods"}: {}}
5353
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, registered.RESTMapper(), podResource)
5454
if err != nil {
5555
t.Fatal(err)
@@ -113,7 +113,7 @@ func setupGC(t *testing.T, config *restclient.Config) *GarbageCollector {
113113
metaOnlyClientPool := dynamic.NewClientPool(config, registered.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
114114
config.ContentConfig.NegotiatedSerializer = nil
115115
clientPool := dynamic.NewClientPool(config, registered.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
116-
podResource := []schema.GroupVersionResource{{Version: "v1", Resource: "pods"}}
116+
podResource := map[schema.GroupVersionResource]struct{}{schema.GroupVersionResource{Version: "v1", Resource: "pods"}: {}}
117117
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, registered.RESTMapper(), podResource)
118118
if err != nil {
119119
t.Fatal(err)

pkg/controller/namespace/namespace_controller.go

Lines changed: 43 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"time"
2121

2222
"k8s.io/kubernetes/pkg/api/v1"
23+
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
2324
"k8s.io/kubernetes/pkg/client/cache"
2425
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
2526
"k8s.io/kubernetes/pkg/client/typed/dynamic"
@@ -28,6 +29,7 @@ import (
2829
"k8s.io/kubernetes/pkg/runtime/schema"
2930
"k8s.io/kubernetes/pkg/util/metrics"
3031
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
32+
"k8s.io/kubernetes/pkg/util/sets"
3133
"k8s.io/kubernetes/pkg/util/wait"
3234
"k8s.io/kubernetes/pkg/util/workqueue"
3335
"k8s.io/kubernetes/pkg/watch"
@@ -57,8 +59,8 @@ type NamespaceController struct {
5759
controller *cache.Controller
5860
// namespaces that have been queued up for processing by workers
5961
queue workqueue.RateLimitingInterface
60-
// function to list of preferred group versions and their corresponding resource set for namespace deletion
61-
groupVersionResourcesFn func() (map[schema.GroupVersionResource]struct{}, error)
62+
// function to list of preferred resources for namespace deletion
63+
discoverResourcesFn func() ([]*metav1.APIResourceList, error)
6264
// opCache is a cache to remember if a particular operation is not supported to aid dynamic client.
6365
opCache *operationNotSupportedCache
6466
// finalizerToken is the finalizer token managed by this controller
@@ -69,36 +71,55 @@ type NamespaceController struct {
6971
func NewNamespaceController(
7072
kubeClient clientset.Interface,
7173
clientPool dynamic.ClientPool,
72-
groupVersionResourcesFn func() (map[schema.GroupVersionResource]struct{}, error),
74+
discoverResourcesFn func() ([]*metav1.APIResourceList, error),
7375
resyncPeriod time.Duration,
7476
finalizerToken v1.FinalizerName) *NamespaceController {
7577

76-
// the namespace deletion code looks at the discovery document to enumerate the set of resources on the server.
77-
// it then finds all namespaced resources, and in response to namespace deletion, will call delete on all of them.
78-
// unfortunately, the discovery information does not include the list of supported verbs/methods. if the namespace
79-
// controller calls LIST/DELETECOLLECTION for a resource, it will get a 405 error from the server and cache that that was the case.
80-
// we found in practice though that some auth engines when encountering paths they don't know about may return a 50x.
81-
// until we have verbs, we pre-populate resources that do not support list or delete for well-known apis rather than
82-
// probing the server once in order to be told no.
8378
opCache := &operationNotSupportedCache{
8479
m: make(map[operationKey]bool),
8580
}
86-
ignoredGroupVersionResources := []schema.GroupVersionResource{
87-
{Group: "", Version: "v1", Resource: "bindings"},
81+
82+
// pre-fill opCache with the discovery info
83+
//
84+
// TODO(sttts): get rid of opCache and http 405 logic around it and trust discovery info
85+
resources, err := discoverResourcesFn()
86+
if err != nil {
87+
glog.Fatalf("Failed to get supported resources: %v", err)
8888
}
89-
for _, ignoredGroupVersionResource := range ignoredGroupVersionResources {
90-
opCache.setNotSupported(operationKey{op: operationDeleteCollection, gvr: ignoredGroupVersionResource})
91-
opCache.setNotSupported(operationKey{op: operationList, gvr: ignoredGroupVersionResource})
89+
deletableGroupVersionResources := []schema.GroupVersionResource{}
90+
for _, rl := range resources {
91+
gv, err := schema.ParseGroupVersion(rl.GroupVersion)
92+
if err != nil {
93+
glog.Errorf("Failed to parse GroupVersion %q, skipping: %v", rl.GroupVersion, err)
94+
continue
95+
}
96+
97+
for _, r := range rl.APIResources {
98+
gvr := schema.GroupVersionResource{Group: gv.Group, Version: gv.Version, Resource: r.Name}
99+
verbs := sets.NewString([]string(r.Verbs)...)
100+
101+
if !verbs.Has("delete") {
102+
glog.V(6).Infof("Skipping resource %v because it cannot be deleted.", gvr)
103+
}
104+
105+
for _, op := range []operation{operationList, operationDeleteCollection} {
106+
if !verbs.Has(string(op)) {
107+
opCache.setNotSupported(operationKey{op: op, gvr: gvr})
108+
}
109+
}
110+
111+
deletableGroupVersionResources = append(deletableGroupVersionResources, gvr)
112+
}
92113
}
93114

94115
// create the controller so we can inject the enqueue function
95116
namespaceController := &NamespaceController{
96-
kubeClient: kubeClient,
97-
clientPool: clientPool,
98-
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "namespace"),
99-
groupVersionResourcesFn: groupVersionResourcesFn,
100-
opCache: opCache,
101-
finalizerToken: finalizerToken,
117+
kubeClient: kubeClient,
118+
clientPool: clientPool,
119+
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "namespace"),
120+
discoverResourcesFn: discoverResourcesFn,
121+
opCache: opCache,
122+
finalizerToken: finalizerToken,
102123
}
103124

104125
if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
@@ -203,7 +224,7 @@ func (nm *NamespaceController) syncNamespaceFromKey(key string) (err error) {
203224
return err
204225
}
205226
namespace := obj.(*v1.Namespace)
206-
return syncNamespace(nm.kubeClient, nm.clientPool, nm.opCache, nm.groupVersionResourcesFn, namespace, nm.finalizerToken)
227+
return syncNamespace(nm.kubeClient, nm.clientPool, nm.opCache, nm.discoverResourcesFn, namespace, nm.finalizerToken)
207228
}
208229

209230
// Run starts observing the system with the specified number of workers.

pkg/controller/namespace/namespace_controller_test.go

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/fake"
3535
"k8s.io/kubernetes/pkg/client/restclient"
3636
"k8s.io/kubernetes/pkg/client/testing/core"
37+
"k8s.io/kubernetes/pkg/client/typed/discovery"
3738
"k8s.io/kubernetes/pkg/client/typed/dynamic"
3839
"k8s.io/kubernetes/pkg/runtime"
3940
"k8s.io/kubernetes/pkg/runtime/schema"
@@ -113,8 +114,9 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *metav1.APIVersio
113114

114115
// when doing a delete all of content, we will do a GET of a collection, and DELETE of a collection by default
115116
dynamicClientActionSet := sets.NewString()
116-
groupVersionResources := testGroupVersionResources()
117-
for _, groupVersionResource := range groupVersionResources {
117+
resources := testResources()
118+
groupVersionResources, _ := discovery.GroupVersionResources(resources)
119+
for groupVersionResource := range groupVersionResources {
118120
urlPath := path.Join([]string{
119121
dynamic.LegacyAPIPathResolverFunc(schema.GroupVersionKind{Group: groupVersionResource.Group, Version: groupVersionResource.Version}),
120122
groupVersionResource.Group,
@@ -170,8 +172,8 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *metav1.APIVersio
170172
mockClient := fake.NewSimpleClientset(testInput.testNamespace)
171173
clientPool := dynamic.NewClientPool(clientConfig, registered.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
172174

173-
fn := func() ([]schema.GroupVersionResource, error) {
174-
return groupVersionResources, nil
175+
fn := func() ([]*metav1.APIResourceList, error) {
176+
return resources, nil
175177
}
176178

177179
err := syncNamespace(mockClient, clientPool, &operationNotSupportedCache{m: make(map[operationKey]bool)}, fn, testInput.testNamespace, v1.FinalizerKubernetes)
@@ -243,8 +245,8 @@ func TestSyncNamespaceThatIsActive(t *testing.T) {
243245
Phase: v1.NamespaceActive,
244246
},
245247
}
246-
fn := func() ([]schema.GroupVersionResource, error) {
247-
return testGroupVersionResources(), nil
248+
fn := func() ([]*metav1.APIResourceList, error) {
249+
return testResources(), nil
248250
}
249251
err := syncNamespace(mockClient, nil, &operationNotSupportedCache{m: make(map[operationKey]bool)}, fn, testNamespace, v1.FinalizerKubernetes)
250252
if err != nil {
@@ -295,11 +297,37 @@ func (f *fakeActionHandler) ServeHTTP(response http.ResponseWriter, request *htt
295297
response.Write([]byte("{\"kind\": \"List\",\"items\":null}"))
296298
}
297299

298-
// testGroupVersionResources returns a mocked up set of resources across different api groups for testing namespace controller.
299-
func testGroupVersionResources() []schema.GroupVersionResource {
300-
results := []schema.GroupVersionResource{}
301-
results = append(results, schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"})
302-
results = append(results, schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"})
303-
results = append(results, schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "deployments"})
300+
// testResources returns a mocked up set of resources across different api groups for testing namespace controller.
301+
func testResources() []*metav1.APIResourceList {
302+
results := []*metav1.APIResourceList{
303+
{
304+
GroupVersion: "v1",
305+
APIResources: []metav1.APIResource{
306+
{
307+
Name: "pods",
308+
Namespaced: true,
309+
Kind: "Pod",
310+
Verbs: []string{"get", "list", "delete", "deletecollection", "create", "update"},
311+
},
312+
{
313+
Name: "services",
314+
Namespaced: true,
315+
Kind: "Service",
316+
Verbs: []string{"get", "list", "delete", "deletecollection", "create", "update"},
317+
},
318+
},
319+
},
320+
{
321+
GroupVersion: "extensions/v1beta1",
322+
APIResources: []metav1.APIResource{
323+
{
324+
Name: "deployments",
325+
Namespaced: true,
326+
Kind: "Deployment",
327+
Verbs: []string{"get", "list", "delete", "deletecollection", "create", "update"},
328+
},
329+
},
330+
},
331+
}
304332
return results
305333
}

pkg/controller/namespace/namespace_controller_utils.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"k8s.io/kubernetes/pkg/api/v1"
2626
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
2727
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
28+
"k8s.io/kubernetes/pkg/client/typed/discovery"
2829
"k8s.io/kubernetes/pkg/client/typed/dynamic"
2930
"k8s.io/kubernetes/pkg/runtime"
3031
"k8s.io/kubernetes/pkg/runtime/schema"
@@ -367,7 +368,7 @@ func syncNamespace(
367368
kubeClient clientset.Interface,
368369
clientPool dynamic.ClientPool,
369370
opCache *operationNotSupportedCache,
370-
groupVersionResourcesFn func() (map[schema.GroupVersionResource]struct{}, error),
371+
discoverResourcesFn func() ([]*metav1.APIResourceList, error),
371372
namespace *v1.Namespace,
372373
finalizerToken v1.FinalizerName,
373374
) error {
@@ -418,7 +419,13 @@ func syncNamespace(
418419
}
419420

420421
// there may still be content for us to remove
421-
groupVersionResources, err := groupVersionResourcesFn()
422+
resources, err := discoverResourcesFn()
423+
if err != nil {
424+
return err
425+
}
426+
// TODO(sttts): get rid of opCache and pass the verbs (especially "deletecollection") down into the deleter
427+
deletableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete"}}, resources)
428+
groupVersionResources, err := discovery.GroupVersionResources(deletableResources)
422429
if err != nil {
423430
return err
424431
}

test/e2e_node/services/namespace_controller.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ func (n *NamespaceController) Start() error {
5656
return err
5757
}
5858
clientPool := dynamic.NewClientPool(config, registered.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
59-
gvrFn := client.Discovery().ServerPreferredNamespacedResources
60-
nc := namespacecontroller.NewNamespaceController(client, clientPool, gvrFn, ncResyncPeriod, v1.FinalizerKubernetes)
59+
discoverResourcesFn := client.Discovery().ServerPreferredNamespacedResources
60+
nc := namespacecontroller.NewNamespaceController(client, clientPool, discoverResourcesFn, ncResyncPeriod, v1.FinalizerKubernetes)
6161
go nc.Run(ncConcurrency, n.stopCh)
6262
return nil
6363
}

0 commit comments

Comments
 (0)