Switch the namespace controller to use the metadata client

The metadata client uses protobuf and returns only a subset of object
data (the metadata) which allows operations that act only on objects
generically to work much faster. Use the metadata client in the
namespace controller to reduce the amount of work the namespace controller
has to do in large namespaces.
This commit is contained in:
Clayton Coleman 2019-06-05 15:19:55 -04:00
parent bc89c37f32
commit 50fd47258d
No known key found for this signature in database
GPG Key ID: 3D16906B4F1C5CB3
10 changed files with 49 additions and 46 deletions

View File

@ -124,6 +124,7 @@ go_library(
"//staging/src/k8s.io/client-go/dynamic/dynamicinformer:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/metadata:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/restmapper:go_default_library",
"//staging/src/k8s.io/client-go/scale:go_default_library",

View File

@ -29,12 +29,13 @@ import (
"k8s.io/klog"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
utilfeature "k8s.io/apiserver/pkg/util/feature"
cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/metadata"
restclient "k8s.io/client-go/rest"
"k8s.io/kubernetes/pkg/controller"
cloudcontroller "k8s.io/kubernetes/pkg/controller/cloud"
@ -371,7 +372,7 @@ func startNamespaceController(ctx ControllerContext) (http.Handler, bool, error)
func startModifiedNamespaceController(ctx ControllerContext, namespaceKubeClient clientset.Interface, nsKubeconfig *restclient.Config) (http.Handler, bool, error) {
dynamicClient, err := dynamic.NewForConfig(nsKubeconfig)
metadataClient, err := metadata.NewForConfig(nsKubeconfig)
if err != nil {
return nil, true, err
}
@ -380,7 +381,7 @@ func startModifiedNamespaceController(ctx ControllerContext, namespaceKubeClient
namespaceController := namespacecontroller.NewNamespaceController(
namespaceKubeClient,
dynamicClient,
metadataClient,
discoverResourcesFn,
ctx.InformerFactory.Core().V1().Namespaces(),
ctx.ComponentConfig.NamespaceController.NamespaceSyncPeriod.Duration,

View File

@ -149,6 +149,7 @@
"k8s.io/client-go/listers/policy/v1beta1",
"k8s.io/client-go/listers/rbac/v1",
"k8s.io/client-go/listers/storage/v1",
"k8s.io/client-go/metadata",
"k8s.io/client-go/pkg/version",
"k8s.io/client-go/rest",
"k8s.io/client-go/scale",

View File

@ -21,10 +21,10 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/metadata:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/k8s.io/klog:go_default_library",

View File

@ -14,14 +14,13 @@ go_library(
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/client-go/discovery:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/metadata:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
@ -41,6 +40,7 @@ go_test(
"//staging/src/k8s.io/client-go/discovery:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/metadata:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
],

View File

@ -24,17 +24,16 @@ import (
"k8s.io/klog"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
v1clientset "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/metadata"
)
// NamespacedResourcesDeleterInterface is the interface to delete a namespace with all resources in it.
@ -44,13 +43,13 @@ type NamespacedResourcesDeleterInterface interface {
// NewNamespacedResourcesDeleter returns a new NamespacedResourcesDeleter.
func NewNamespacedResourcesDeleter(nsClient v1clientset.NamespaceInterface,
dynamicClient dynamic.Interface, podsGetter v1clientset.PodsGetter,
metadataClient metadata.Interface, podsGetter v1clientset.PodsGetter,
discoverResourcesFn func() ([]*metav1.APIResourceList, error),
finalizerToken v1.FinalizerName, deleteNamespaceWhenDone bool) NamespacedResourcesDeleterInterface {
d := &namespacedResourcesDeleter{
nsClient: nsClient,
dynamicClient: dynamicClient,
podsGetter: podsGetter,
nsClient: nsClient,
metadataClient: metadataClient,
podsGetter: podsGetter,
opCache: &operationNotSupportedCache{
m: make(map[operationKey]bool),
},
@ -69,7 +68,7 @@ type namespacedResourcesDeleter struct {
// Client to manipulate the namespace.
nsClient v1clientset.NamespaceInterface
// Dynamic client to list and delete all namespaced resources.
dynamicClient dynamic.Interface
metadataClient metadata.Interface
// Interface to get PodInterface.
podsGetter v1clientset.PodsGetter
// Cache of what operations are not supported on each group version resource.
@ -345,7 +344,7 @@ func (d *namespacedResourcesDeleter) deleteCollection(gvr schema.GroupVersionRes
// namespace itself.
background := metav1.DeletePropagationBackground
opts := &metav1.DeleteOptions{PropagationPolicy: &background}
err := d.dynamicClient.Resource(gvr).Namespace(namespace).DeleteCollection(opts, metav1.ListOptions{})
err := d.metadataClient.Resource(gvr).Namespace(namespace).DeleteCollection(opts, metav1.ListOptions{})
if err == nil {
return true, nil
@ -372,7 +371,7 @@ func (d *namespacedResourcesDeleter) deleteCollection(gvr schema.GroupVersionRes
// the list of items in the collection (if found)
// a boolean if the operation is supported
// an error if the operation is supported but could not be completed.
func (d *namespacedResourcesDeleter) listCollection(gvr schema.GroupVersionResource, namespace string) (*unstructured.UnstructuredList, bool, error) {
func (d *namespacedResourcesDeleter) listCollection(gvr schema.GroupVersionResource, namespace string) (*metav1.PartialObjectMetadataList, bool, error) {
klog.V(5).Infof("namespace controller - listCollection - namespace: %s, gvr: %v", namespace, gvr)
key := operationKey{operation: operationList, gvr: gvr}
@ -381,9 +380,9 @@ func (d *namespacedResourcesDeleter) listCollection(gvr schema.GroupVersionResou
return nil, false, nil
}
unstructuredList, err := d.dynamicClient.Resource(gvr).Namespace(namespace).List(metav1.ListOptions{})
partialList, err := d.metadataClient.Resource(gvr).Namespace(namespace).List(metav1.ListOptions{})
if err == nil {
return unstructuredList, true, nil
return partialList, true, nil
}
// this is strange, but we need to special case for both MethodNotSupported and NotFound errors
@ -415,7 +414,7 @@ func (d *namespacedResourcesDeleter) deleteEachItem(gvr schema.GroupVersionResou
for _, item := range unstructuredList.Items {
background := metav1.DeletePropagationBackground
opts := &metav1.DeleteOptions{PropagationPolicy: &background}
if err = d.dynamicClient.Resource(gvr).Namespace(namespace).Delete(item.GetName(), opts); err != nil && !errors.IsNotFound(err) && !errors.IsMethodNotSupported(err) {
if err = d.metadataClient.Resource(gvr).Namespace(namespace).Delete(item.GetName(), opts); err != nil && !errors.IsNotFound(err) && !errors.IsMethodNotSupported(err) {
return err
}
}

View File

@ -25,7 +25,7 @@ import (
"sync"
"testing"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -34,6 +34,7 @@ import (
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/metadata"
restclient "k8s.io/client-go/rest"
core "k8s.io/client-go/testing"
api "k8s.io/kubernetes/pkg/apis/core"
@ -115,7 +116,7 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *metav1.APIVersio
}
// when doing a delete all of content, we will do a GET of a collection, and DELETE of a collection by default
dynamicClientActionSet := sets.NewString()
metadataClientActionSet := sets.NewString()
resources := testResources()
groupVersionResources, _ := discovery.GroupVersionResources(resources)
for groupVersionResource := range groupVersionResources {
@ -127,15 +128,15 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *metav1.APIVersio
namespaceName,
groupVersionResource.Resource,
}...)
dynamicClientActionSet.Insert((&fakeAction{method: "GET", path: urlPath}).String())
dynamicClientActionSet.Insert((&fakeAction{method: "DELETE", path: urlPath}).String())
metadataClientActionSet.Insert((&fakeAction{method: "GET", path: urlPath}).String())
metadataClientActionSet.Insert((&fakeAction{method: "DELETE", path: urlPath}).String())
}
scenarios := map[string]struct {
testNamespace *v1.Namespace
kubeClientActionSet sets.String
dynamicClientActionSet sets.String
gvrError error
testNamespace *v1.Namespace
kubeClientActionSet sets.String
metadataClientActionSet sets.String
gvrError error
}{
"pending-finalize": {
testNamespace: testNamespacePendingFinalize,
@ -145,7 +146,7 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *metav1.APIVersio
strings.Join([]string{"list", "pods", ""}, "-"),
strings.Join([]string{"delete", "namespaces", ""}, "-"),
),
dynamicClientActionSet: dynamicClientActionSet,
metadataClientActionSet: metadataClientActionSet,
},
"complete-finalize": {
testNamespace: testNamespaceFinalizeComplete,
@ -153,7 +154,7 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *metav1.APIVersio
strings.Join([]string{"get", "namespaces", ""}, "-"),
strings.Join([]string{"delete", "namespaces", ""}, "-"),
),
dynamicClientActionSet: sets.NewString(),
metadataClientActionSet: sets.NewString(),
},
"groupVersionResourceErr": {
testNamespace: testNamespaceFinalizeComplete,
@ -161,8 +162,8 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *metav1.APIVersio
strings.Join([]string{"get", "namespaces", ""}, "-"),
strings.Join([]string{"delete", "namespaces", ""}, "-"),
),
dynamicClientActionSet: sets.NewString(),
gvrError: fmt.Errorf("test error"),
metadataClientActionSet: sets.NewString(),
gvrError: fmt.Errorf("test error"),
},
}
@ -172,7 +173,7 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *metav1.APIVersio
defer srv.Close()
mockClient := fake.NewSimpleClientset(testInput.testNamespace)
dynamicClient, err := dynamic.NewForConfig(clientConfig)
metadataClient, err := metadata.NewForConfig(clientConfig)
if err != nil {
t.Fatal(err)
}
@ -180,7 +181,7 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *metav1.APIVersio
fn := func() ([]*metav1.APIResourceList, error) {
return resources, nil
}
d := NewNamespacedResourcesDeleter(mockClient.CoreV1().Namespaces(), dynamicClient, mockClient.CoreV1(), fn, v1.FinalizerKubernetes, true)
d := NewNamespacedResourcesDeleter(mockClient.CoreV1().Namespaces(), metadataClient, mockClient.CoreV1(), fn, v1.FinalizerKubernetes, true)
if err := d.Delete(testInput.testNamespace.Name); err != nil {
t.Errorf("scenario %s - Unexpected error when synching namespace %v", scenario, err)
}
@ -195,14 +196,14 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *metav1.APIVersio
testInput.kubeClientActionSet, actionSet, testInput.kubeClientActionSet.Difference(actionSet))
}
// validate traffic from dynamic client
// validate traffic from metadata client
actionSet = sets.NewString()
for _, action := range testHandler.actions {
actionSet.Insert(action.String())
}
if !actionSet.Equal(testInput.dynamicClientActionSet) {
t.Errorf("scenario %s - dynamic client expected actions:\n%v\n but got:\n%v\nDifference:\n%v", scenario,
testInput.dynamicClientActionSet, actionSet, testInput.dynamicClientActionSet.Difference(actionSet))
if !actionSet.Equal(testInput.metadataClientActionSet) {
t.Errorf("scenario %s - metadata client expected actions:\n%v\n but got:\n%v\nDifference:\n%v", scenario,
testInput.metadataClientActionSet, actionSet, testInput.metadataClientActionSet.Difference(actionSet))
}
}
}
@ -307,7 +308,7 @@ func (f *fakeActionHandler) ServeHTTP(response http.ResponseWriter, request *htt
f.actions = append(f.actions, fakeAction{method: request.Method, path: request.URL.Path})
response.Header().Set("Content-Type", runtime.ContentTypeJSON)
response.WriteHeader(f.statusCode)
response.Write([]byte("{\"kind\": \"List\",\"items\":null}"))
response.Write([]byte("{\"apiVersion\": \"v1\", \"kind\": \"List\",\"items\":null}"))
}
// testResources returns a mocked up set of resources across different api groups for testing namespace controller.

View File

@ -20,15 +20,15 @@ import (
"fmt"
"time"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/metadata"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/controller"
@ -63,7 +63,7 @@ type NamespaceController struct {
// NewNamespaceController creates a new NamespaceController
func NewNamespaceController(
kubeClient clientset.Interface,
dynamicClient dynamic.Interface,
metadataClient metadata.Interface,
discoverResourcesFn func() ([]*metav1.APIResourceList, error),
namespaceInformer coreinformers.NamespaceInformer,
resyncPeriod time.Duration,
@ -72,7 +72,7 @@ func NewNamespaceController(
// create the controller so we can inject the enqueue function
namespaceController := &NamespaceController{
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "namespace"),
namespacedResourcesDeleter: deletion.NewNamespacedResourcesDeleter(kubeClient.CoreV1().Namespaces(), dynamicClient, kubeClient.CoreV1(), discoverResourcesFn, finalizerToken, true),
namespacedResourcesDeleter: deletion.NewNamespacedResourcesDeleter(kubeClient.CoreV1().Namespaces(), metadataClient, kubeClient.CoreV1(), discoverResourcesFn, finalizerToken, true),
}
if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {

View File

@ -31,9 +31,9 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3/testing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/metadata:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/component-base/cli/flag:go_default_library",
"//staging/src/k8s.io/kubelet/config/v1beta1:go_default_library",

View File

@ -20,9 +20,9 @@ import (
"time"
"k8s.io/api/core/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/metadata"
restclient "k8s.io/client-go/rest"
namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace"
)
@ -59,7 +59,7 @@ func (n *NamespaceController) Start() error {
if err != nil {
return err
}
dynamicClient, err := dynamic.NewForConfig(config)
metadataClient, err := metadata.NewForConfig(config)
if err != nil {
return err
}
@ -67,7 +67,7 @@ func (n *NamespaceController) Start() error {
informerFactory := informers.NewSharedInformerFactory(client, ncResyncPeriod)
nc := namespacecontroller.NewNamespaceController(
client,
dynamicClient,
metadataClient,
discoverResourcesFn,
informerFactory.Core().V1().Namespaces(),
ncResyncPeriod, v1.FinalizerKubernetes,