Add verb support to gc and namespace controllers

This commit is contained in:
Dr. Stefan Schimanski
2016-11-17 14:21:12 +01:00
committed by Dr. Stefan Schimanski
parent 458d2b2fe4
commit 24e24fc7bb
8 changed files with 128 additions and 80 deletions

View File

@@ -20,6 +20,7 @@ import (
"time"
"k8s.io/kubernetes/pkg/api/v1"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
"k8s.io/kubernetes/pkg/client/typed/dynamic"
@@ -28,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/runtime/schema"
"k8s.io/kubernetes/pkg/util/metrics"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
@@ -57,8 +59,8 @@ type NamespaceController struct {
controller *cache.Controller
// namespaces that have been queued up for processing by workers
queue workqueue.RateLimitingInterface
// function to list of preferred group versions and their corresponding resource set for namespace deletion
groupVersionResourcesFn func() (map[schema.GroupVersionResource]struct{}, error)
// function to list of preferred resources for namespace deletion
discoverResourcesFn func() ([]*metav1.APIResourceList, error)
// opCache is a cache to remember if a particular operation is not supported to aid dynamic client.
opCache *operationNotSupportedCache
// finalizerToken is the finalizer token managed by this controller
@@ -69,36 +71,55 @@ type NamespaceController struct {
func NewNamespaceController(
kubeClient clientset.Interface,
clientPool dynamic.ClientPool,
groupVersionResourcesFn func() (map[schema.GroupVersionResource]struct{}, error),
discoverResourcesFn func() ([]*metav1.APIResourceList, error),
resyncPeriod time.Duration,
finalizerToken v1.FinalizerName) *NamespaceController {
// the namespace deletion code looks at the discovery document to enumerate the set of resources on the server.
// it then finds all namespaced resources, and in response to namespace deletion, will call delete on all of them.
// unfortunately, the discovery information does not include the list of supported verbs/methods. if the namespace
// controller calls LIST/DELETECOLLECTION for a resource, it will get a 405 error from the server and cache that that was the case.
// we found in practice though that some auth engines when encountering paths they don't know about may return a 50x.
// until we have verbs, we pre-populate resources that do not support list or delete for well-known apis rather than
// probing the server once in order to be told no.
opCache := &operationNotSupportedCache{
m: make(map[operationKey]bool),
}
ignoredGroupVersionResources := []schema.GroupVersionResource{
{Group: "", Version: "v1", Resource: "bindings"},
// pre-fill opCache with the discovery info
//
// TODO(sttts): get rid of opCache and http 405 logic around it and trust discovery info
resources, err := discoverResourcesFn()
if err != nil {
glog.Fatalf("Failed to get supported resources: %v", err)
}
for _, ignoredGroupVersionResource := range ignoredGroupVersionResources {
opCache.setNotSupported(operationKey{op: operationDeleteCollection, gvr: ignoredGroupVersionResource})
opCache.setNotSupported(operationKey{op: operationList, gvr: ignoredGroupVersionResource})
deletableGroupVersionResources := []schema.GroupVersionResource{}
for _, rl := range resources {
gv, err := schema.ParseGroupVersion(rl.GroupVersion)
if err != nil {
glog.Errorf("Failed to parse GroupVersion %q, skipping: %v", rl.GroupVersion, err)
continue
}
for _, r := range rl.APIResources {
gvr := schema.GroupVersionResource{Group: gv.Group, Version: gv.Version, Resource: r.Name}
verbs := sets.NewString([]string(r.Verbs)...)
if !verbs.Has("delete") {
glog.V(6).Infof("Skipping resource %v because it cannot be deleted.", gvr)
}
for _, op := range []operation{operationList, operationDeleteCollection} {
if !verbs.Has(string(op)) {
opCache.setNotSupported(operationKey{op: op, gvr: gvr})
}
}
deletableGroupVersionResources = append(deletableGroupVersionResources, gvr)
}
}
// create the controller so we can inject the enqueue function
namespaceController := &NamespaceController{
kubeClient: kubeClient,
clientPool: clientPool,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "namespace"),
groupVersionResourcesFn: groupVersionResourcesFn,
opCache: opCache,
finalizerToken: finalizerToken,
kubeClient: kubeClient,
clientPool: clientPool,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "namespace"),
discoverResourcesFn: discoverResourcesFn,
opCache: opCache,
finalizerToken: finalizerToken,
}
if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
@@ -203,7 +224,7 @@ func (nm *NamespaceController) syncNamespaceFromKey(key string) (err error) {
return err
}
namespace := obj.(*v1.Namespace)
return syncNamespace(nm.kubeClient, nm.clientPool, nm.opCache, nm.groupVersionResourcesFn, namespace, nm.finalizerToken)
return syncNamespace(nm.kubeClient, nm.clientPool, nm.opCache, nm.discoverResourcesFn, namespace, nm.finalizerToken)
}
// Run starts observing the system with the specified number of workers.

View File

@@ -34,6 +34,7 @@ import (
"k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/fake"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/client/typed/discovery"
"k8s.io/kubernetes/pkg/client/typed/dynamic"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/schema"
@@ -113,8 +114,9 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *metav1.APIVersio
// when doing a delete all of content, we will do a GET of a collection, and DELETE of a collection by default
dynamicClientActionSet := sets.NewString()
groupVersionResources := testGroupVersionResources()
for _, groupVersionResource := range groupVersionResources {
resources := testResources()
groupVersionResources, _ := discovery.GroupVersionResources(resources)
for groupVersionResource := range groupVersionResources {
urlPath := path.Join([]string{
dynamic.LegacyAPIPathResolverFunc(schema.GroupVersionKind{Group: groupVersionResource.Group, Version: groupVersionResource.Version}),
groupVersionResource.Group,
@@ -170,8 +172,8 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *metav1.APIVersio
mockClient := fake.NewSimpleClientset(testInput.testNamespace)
clientPool := dynamic.NewClientPool(clientConfig, registered.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
fn := func() ([]schema.GroupVersionResource, error) {
return groupVersionResources, nil
fn := func() ([]*metav1.APIResourceList, error) {
return resources, nil
}
err := syncNamespace(mockClient, clientPool, &operationNotSupportedCache{m: make(map[operationKey]bool)}, fn, testInput.testNamespace, v1.FinalizerKubernetes)
@@ -243,8 +245,8 @@ func TestSyncNamespaceThatIsActive(t *testing.T) {
Phase: v1.NamespaceActive,
},
}
fn := func() ([]schema.GroupVersionResource, error) {
return testGroupVersionResources(), nil
fn := func() ([]*metav1.APIResourceList, error) {
return testResources(), nil
}
err := syncNamespace(mockClient, nil, &operationNotSupportedCache{m: make(map[operationKey]bool)}, fn, testNamespace, v1.FinalizerKubernetes)
if err != nil {
@@ -295,11 +297,37 @@ func (f *fakeActionHandler) ServeHTTP(response http.ResponseWriter, request *htt
response.Write([]byte("{\"kind\": \"List\",\"items\":null}"))
}
// testGroupVersionResources returns a mocked up set of resources across different api groups for testing namespace controller.
func testGroupVersionResources() []schema.GroupVersionResource {
results := []schema.GroupVersionResource{}
results = append(results, schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"})
results = append(results, schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"})
results = append(results, schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "deployments"})
// testResources returns a mocked up set of resources across different api groups for testing namespace controller.
func testResources() []*metav1.APIResourceList {
results := []*metav1.APIResourceList{
{
GroupVersion: "v1",
APIResources: []metav1.APIResource{
{
Name: "pods",
Namespaced: true,
Kind: "Pod",
Verbs: []string{"get", "list", "delete", "deletecollection", "create", "update"},
},
{
Name: "services",
Namespaced: true,
Kind: "Service",
Verbs: []string{"get", "list", "delete", "deletecollection", "create", "update"},
},
},
},
{
GroupVersion: "extensions/v1beta1",
APIResources: []metav1.APIResource{
{
Name: "deployments",
Namespaced: true,
Kind: "Deployment",
Verbs: []string{"get", "list", "delete", "deletecollection", "create", "update"},
},
},
},
}
return results
}

View File

@@ -25,6 +25,7 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
"k8s.io/kubernetes/pkg/client/typed/discovery"
"k8s.io/kubernetes/pkg/client/typed/dynamic"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/schema"
@@ -367,7 +368,7 @@ func syncNamespace(
kubeClient clientset.Interface,
clientPool dynamic.ClientPool,
opCache *operationNotSupportedCache,
groupVersionResourcesFn func() (map[schema.GroupVersionResource]struct{}, error),
discoverResourcesFn func() ([]*metav1.APIResourceList, error),
namespace *v1.Namespace,
finalizerToken v1.FinalizerName,
) error {
@@ -418,7 +419,13 @@ func syncNamespace(
}
// there may still be content for us to remove
groupVersionResources, err := groupVersionResourcesFn()
resources, err := discoverResourcesFn()
if err != nil {
return err
}
// TODO(sttts): get rid of opCache and pass the verbs (especially "deletecollection") down into the deleter
deletableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete"}}, resources)
groupVersionResources, err := discovery.GroupVersionResources(deletableResources)
if err != nil {
return err
}