From 96ea28aa772be21ddc277972add0b45ea7102b53 Mon Sep 17 00:00:00 2001 From: Morten Torkildsen Date: Sun, 24 Jan 2021 16:26:47 -0800 Subject: [PATCH] Check if resources implement scale in disruption controller --- cmd/kube-controller-manager/app/core_test.go | 1 + cmd/kube-controller-manager/app/policy.go | 1 + pkg/controller/disruption/BUILD | 2 + pkg/controller/disruption/disruption.go | 31 +++++++ pkg/controller/disruption/disruption_test.go | 82 ++++++++++++++++++- .../integration/disruption/disruption_test.go | 1 + test/integration/evictions/evictions_test.go | 1 + test/integration/scheduler/util.go | 3 +- 8 files changed, 119 insertions(+), 3 deletions(-) diff --git a/cmd/kube-controller-manager/app/core_test.go b/cmd/kube-controller-manager/app/core_test.go index 9fdc70dcb5a..88bd8614d20 100644 --- a/cmd/kube-controller-manager/app/core_test.go +++ b/cmd/kube-controller-manager/app/core_test.go @@ -101,6 +101,7 @@ func TestController_DiscoveryError(t *testing.T) { "GarbageCollectorController": startGarbageCollectorController, "EndpointSliceController": startEndpointSliceController, "EndpointSliceMirroringController": startEndpointSliceMirroringController, + "PodDisruptionBudgetController": startDisruptionController, } tcs := map[string]struct { diff --git a/cmd/kube-controller-manager/app/policy.go b/cmd/kube-controller-manager/app/policy.go index a3aa4aae210..30acb48ddf0 100644 --- a/cmd/kube-controller-manager/app/policy.go +++ b/cmd/kube-controller-manager/app/policy.go @@ -67,6 +67,7 @@ func startDisruptionController(ctx ControllerContext) (http.Handler, bool, error client, ctx.RESTMapper, scaleClient, + client.Discovery(), ).Run(ctx.Stop) return nil, true, nil } diff --git a/pkg/controller/disruption/BUILD b/pkg/controller/disruption/BUILD index 26ebc31cb43..10c7beaa228 100644 --- a/pkg/controller/disruption/BUILD +++ b/pkg/controller/disruption/BUILD @@ -26,6 +26,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/client-go/discovery:go_default_library", "//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/informers/policy/v1beta1:go_default_library", @@ -64,6 +65,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/client-go/discovery/fake:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//staging/src/k8s.io/client-go/scale/fake:go_default_library", diff --git a/pkg/controller/disruption/disruption.go b/pkg/controller/disruption/disruption.go index 11642b422ec..cef8c4993b0 100644 --- a/pkg/controller/disruption/disruption.go +++ b/pkg/controller/disruption/disruption.go @@ -19,6 +19,7 @@ package disruption import ( "context" "fmt" + "strings" "time" apps "k8s.io/api/apps/v1beta1" @@ -34,6 +35,7 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/discovery" appsv1informers "k8s.io/client-go/informers/apps/v1" coreinformers "k8s.io/client-go/informers/core/v1" policyinformers "k8s.io/client-go/informers/policy/v1beta1" @@ -71,6 +73,7 @@ type DisruptionController struct { mapper apimeta.RESTMapper scaleNamespacer scaleclient.ScalesGetter + discoveryClient discovery.DiscoveryInterface pdbLister policylisters.PodDisruptionBudgetLister pdbListerSynced cache.InformerSynced @@ -121,6 +124,7 @@ func NewDisruptionController( kubeClient clientset.Interface, restMapper apimeta.RESTMapper, scaleNamespacer scaleclient.ScalesGetter, + discoveryClient discovery.DiscoveryInterface, ) *DisruptionController { dc := &DisruptionController{ kubeClient: kubeClient, @@ -164,6 +168,7 @@ func NewDisruptionController( dc.mapper = restMapper dc.scaleNamespacer = scaleNamespacer + dc.discoveryClient = discoveryClient return dc } @@ -294,6 +299,16 @@ func (dc *DisruptionController) getScaleController(controllerRef *metav1.OwnerRe scale, err := dc.scaleNamespacer.Scales(namespace).Get(context.TODO(), gr, controllerRef.Name, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { + // The IsNotFound error can mean either that the resource does not exist, + // or it exist but doesn't implement the scale subresource. We check which + // situation we are facing so we can give an appropriate error message. + isScale, err := dc.implementsScale(gv, controllerRef.Kind) + if err != nil { + return nil, err + } + if !isScale { + return nil, fmt.Errorf("%s does not implement the scale subresource", gr.String()) + } return nil, nil } return nil, err @@ -304,6 +319,22 @@ func (dc *DisruptionController) getScaleController(controllerRef *metav1.OwnerRe return &controllerAndScale{scale.UID, scale.Spec.Replicas}, nil } +func (dc *DisruptionController) implementsScale(gv schema.GroupVersion, kind string) (bool, error) { + resourceList, err := dc.discoveryClient.ServerResourcesForGroupVersion(gv.String()) + if err != nil { + return false, err + } + for _, resource := range resourceList.APIResources { + if resource.Kind != kind { + continue + } + if strings.HasSuffix(resource.Name, "/scale") { + return true, nil + } + } + return false, nil +} + func verifyGroupKind(controllerRef *metav1.OwnerReference, expectedKind string, expectedGroups []string) (bool, error) { gv, err := schema.ParseGroupVersion(controllerRef.APIVersion) if err != nil { diff --git a/pkg/controller/disruption/disruption_test.go b/pkg/controller/disruption/disruption_test.go index dbda1fdaece..f0fc0bf8f9f 100644 --- a/pkg/controller/disruption/disruption_test.go +++ b/pkg/controller/disruption/disruption_test.go @@ -40,6 +40,7 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" + discoveryfake "k8s.io/client-go/discovery/fake" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" scalefake "k8s.io/client-go/scale/fake" @@ -105,8 +106,9 @@ type disruptionController struct { dStore cache.Store ssStore cache.Store - coreClient *fake.Clientset - scaleClient *scalefake.FakeScaleClient + coreClient *fake.Clientset + scaleClient *scalefake.FakeScaleClient + discoveryClient *discoveryfake.FakeDiscovery } var customGVK = schema.GroupVersionKind{ @@ -124,6 +126,9 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) { scheme := runtime.NewScheme() scheme.AddKnownTypeWithName(customGVK, &v1.Service{}) fakeScaleClient := &scalefake.FakeScaleClient{} + fakeDiscovery := &discoveryfake.FakeDiscovery{ + Fake: &core.Fake{}, + } dc := NewDisruptionController( informerFactory.Core().V1().Pods(), @@ -135,6 +140,7 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) { coreClient, testrestmapper.TestOnlyStaticRESTMapper(scheme), fakeScaleClient, + fakeDiscovery, ) dc.getUpdater = func() updater { return ps.Set } dc.podListerSynced = alwaysReady @@ -157,6 +163,7 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) { informerFactory.Apps().V1().StatefulSets().Informer().GetStore(), coreClient, fakeScaleClient, + fakeDiscovery, }, ps } @@ -571,6 +578,77 @@ func TestScaleResource(t *testing.T) { ps.VerifyPdbStatus(t, pdbName, disruptionsAllowed, pods, replicas-maxUnavailable, replicas, map[string]metav1.Time{}) } +func TestScaleFinderNoResource(t *testing.T) { + resourceName := "customresources" + testCases := map[string]struct { + apiResources []metav1.APIResource + expectError bool + }{ + "resource implements scale": { + apiResources: []metav1.APIResource{ + { + Kind: customGVK.Kind, + Name: resourceName, + }, + { + Kind: customGVK.Kind, + Name: resourceName + "/scale", + }, + }, + expectError: false, + }, + "resource does not implement scale": { + apiResources: []metav1.APIResource{ + { + Kind: customGVK.Kind, + Name: resourceName, + }, + }, + expectError: true, + }, + } + + for tn, tc := range testCases { + t.Run(tn, func(t *testing.T) { + customResourceUID := uuid.NewUUID() + + dc, _ := newFakeDisruptionController() + + dc.scaleClient.AddReactor("get", resourceName, func(action core.Action) (handled bool, ret runtime.Object, err error) { + gr := schema.GroupResource{ + Group: customGVK.Group, + Resource: resourceName, + } + return true, nil, errors.NewNotFound(gr, "name") + }) + dc.discoveryClient.Resources = []*metav1.APIResourceList{ + { + GroupVersion: customGVK.GroupVersion().String(), + APIResources: tc.apiResources, + }, + } + + trueVal := true + ownerRef := &metav1.OwnerReference{ + Kind: customGVK.Kind, + APIVersion: customGVK.GroupVersion().String(), + Controller: &trueVal, + UID: customResourceUID, + } + + _, err := dc.getScaleController(ownerRef, "default") + + if tc.expectError && err == nil { + t.Error("expected error, but didn't get one") + } + + if !tc.expectError && err != nil { + t.Errorf("did not expect error, but got %v", err) + } + }) + } +} + // Verify that multiple controllers doesn't allow the PDB to be set true. func TestMultipleControllers(t *testing.T) { const podCount = 2 diff --git a/test/integration/disruption/disruption_test.go b/test/integration/disruption/disruption_test.go index 7e85b4da66d..05ab31b0af7 100644 --- a/test/integration/disruption/disruption_test.go +++ b/test/integration/disruption/disruption_test.go @@ -86,6 +86,7 @@ func setup(t *testing.T) (*kubeapiservertesting.TestServer, *disruption.Disrupti client, mapper, scaleClient, + client.Discovery(), ) return server, pdbc, informers, clientSet, apiExtensionClient, dynamicClient } diff --git a/test/integration/evictions/evictions_test.go b/test/integration/evictions/evictions_test.go index 841fbaacbb2..ae78b4676ff 100644 --- a/test/integration/evictions/evictions_test.go +++ b/test/integration/evictions/evictions_test.go @@ -356,6 +356,7 @@ func rmSetup(t *testing.T) (*httptest.Server, framework.CloseFunc, *disruption.D client, mapper, scaleClient, + client.Discovery(), ) return s, closeFn, rm, informers, clientSet } diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go index eff05baabfa..3ec65a98b12 100644 --- a/test/integration/scheduler/util.go +++ b/test/integration/scheduler/util.go @@ -69,7 +69,8 @@ func initDisruptionController(t *testing.T, testCtx *testutils.TestContext) *dis informers.Apps().V1().StatefulSets(), testCtx.ClientSet, mapper, - scaleClient) + scaleClient, + testCtx.ClientSet.Discovery()) informers.Start(testCtx.Scheduler.StopEverything) informers.WaitForCacheSync(testCtx.Scheduler.StopEverything)