Check if resources implement scale in disruption controller

This commit is contained in:
Morten Torkildsen 2021-01-24 16:26:47 -08:00
parent 3433f099fb
commit 96ea28aa77
8 changed files with 119 additions and 3 deletions

View File

@ -101,6 +101,7 @@ func TestController_DiscoveryError(t *testing.T) {
"GarbageCollectorController": startGarbageCollectorController,
"EndpointSliceController": startEndpointSliceController,
"EndpointSliceMirroringController": startEndpointSliceMirroringController,
"PodDisruptionBudgetController": startDisruptionController,
}
tcs := map[string]struct {

View File

@ -67,6 +67,7 @@ func startDisruptionController(ctx ControllerContext) (http.Handler, bool, error
client,
ctx.RESTMapper,
scaleClient,
client.Discovery(),
).Run(ctx.Stop)
return nil, true, nil
}

View File

@ -26,6 +26,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/discovery:go_default_library",
"//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/policy/v1beta1:go_default_library",
@ -64,6 +65,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/discovery/fake:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/scale/fake:go_default_library",

View File

@ -19,6 +19,7 @@ package disruption
import (
"context"
"fmt"
"strings"
"time"
apps "k8s.io/api/apps/v1beta1"
@ -34,6 +35,7 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
appsv1informers "k8s.io/client-go/informers/apps/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
policyinformers "k8s.io/client-go/informers/policy/v1beta1"
@ -71,6 +73,7 @@ type DisruptionController struct {
mapper apimeta.RESTMapper
scaleNamespacer scaleclient.ScalesGetter
discoveryClient discovery.DiscoveryInterface
pdbLister policylisters.PodDisruptionBudgetLister
pdbListerSynced cache.InformerSynced
@ -121,6 +124,7 @@ func NewDisruptionController(
kubeClient clientset.Interface,
restMapper apimeta.RESTMapper,
scaleNamespacer scaleclient.ScalesGetter,
discoveryClient discovery.DiscoveryInterface,
) *DisruptionController {
dc := &DisruptionController{
kubeClient: kubeClient,
@ -164,6 +168,7 @@ func NewDisruptionController(
dc.mapper = restMapper
dc.scaleNamespacer = scaleNamespacer
dc.discoveryClient = discoveryClient
return dc
}
@ -294,6 +299,16 @@ func (dc *DisruptionController) getScaleController(controllerRef *metav1.OwnerRe
scale, err := dc.scaleNamespacer.Scales(namespace).Get(context.TODO(), gr, controllerRef.Name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
// The IsNotFound error can mean either that the resource does not exist,
// or it exist but doesn't implement the scale subresource. We check which
// situation we are facing so we can give an appropriate error message.
isScale, err := dc.implementsScale(gv, controllerRef.Kind)
if err != nil {
return nil, err
}
if !isScale {
return nil, fmt.Errorf("%s does not implement the scale subresource", gr.String())
}
return nil, nil
}
return nil, err
@ -304,6 +319,22 @@ func (dc *DisruptionController) getScaleController(controllerRef *metav1.OwnerRe
return &controllerAndScale{scale.UID, scale.Spec.Replicas}, nil
}
func (dc *DisruptionController) implementsScale(gv schema.GroupVersion, kind string) (bool, error) {
resourceList, err := dc.discoveryClient.ServerResourcesForGroupVersion(gv.String())
if err != nil {
return false, err
}
for _, resource := range resourceList.APIResources {
if resource.Kind != kind {
continue
}
if strings.HasSuffix(resource.Name, "/scale") {
return true, nil
}
}
return false, nil
}
func verifyGroupKind(controllerRef *metav1.OwnerReference, expectedKind string, expectedGroups []string) (bool, error) {
gv, err := schema.ParseGroupVersion(controllerRef.APIVersion)
if err != nil {

View File

@ -40,6 +40,7 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
discoveryfake "k8s.io/client-go/discovery/fake"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
scalefake "k8s.io/client-go/scale/fake"
@ -107,6 +108,7 @@ type disruptionController struct {
coreClient *fake.Clientset
scaleClient *scalefake.FakeScaleClient
discoveryClient *discoveryfake.FakeDiscovery
}
var customGVK = schema.GroupVersionKind{
@ -124,6 +126,9 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) {
scheme := runtime.NewScheme()
scheme.AddKnownTypeWithName(customGVK, &v1.Service{})
fakeScaleClient := &scalefake.FakeScaleClient{}
fakeDiscovery := &discoveryfake.FakeDiscovery{
Fake: &core.Fake{},
}
dc := NewDisruptionController(
informerFactory.Core().V1().Pods(),
@ -135,6 +140,7 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) {
coreClient,
testrestmapper.TestOnlyStaticRESTMapper(scheme),
fakeScaleClient,
fakeDiscovery,
)
dc.getUpdater = func() updater { return ps.Set }
dc.podListerSynced = alwaysReady
@ -157,6 +163,7 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) {
informerFactory.Apps().V1().StatefulSets().Informer().GetStore(),
coreClient,
fakeScaleClient,
fakeDiscovery,
}, ps
}
@ -571,6 +578,77 @@ func TestScaleResource(t *testing.T) {
ps.VerifyPdbStatus(t, pdbName, disruptionsAllowed, pods, replicas-maxUnavailable, replicas, map[string]metav1.Time{})
}
func TestScaleFinderNoResource(t *testing.T) {
resourceName := "customresources"
testCases := map[string]struct {
apiResources []metav1.APIResource
expectError bool
}{
"resource implements scale": {
apiResources: []metav1.APIResource{
{
Kind: customGVK.Kind,
Name: resourceName,
},
{
Kind: customGVK.Kind,
Name: resourceName + "/scale",
},
},
expectError: false,
},
"resource does not implement scale": {
apiResources: []metav1.APIResource{
{
Kind: customGVK.Kind,
Name: resourceName,
},
},
expectError: true,
},
}
for tn, tc := range testCases {
t.Run(tn, func(t *testing.T) {
customResourceUID := uuid.NewUUID()
dc, _ := newFakeDisruptionController()
dc.scaleClient.AddReactor("get", resourceName, func(action core.Action) (handled bool, ret runtime.Object, err error) {
gr := schema.GroupResource{
Group: customGVK.Group,
Resource: resourceName,
}
return true, nil, errors.NewNotFound(gr, "name")
})
dc.discoveryClient.Resources = []*metav1.APIResourceList{
{
GroupVersion: customGVK.GroupVersion().String(),
APIResources: tc.apiResources,
},
}
trueVal := true
ownerRef := &metav1.OwnerReference{
Kind: customGVK.Kind,
APIVersion: customGVK.GroupVersion().String(),
Controller: &trueVal,
UID: customResourceUID,
}
_, err := dc.getScaleController(ownerRef, "default")
if tc.expectError && err == nil {
t.Error("expected error, but didn't get one")
}
if !tc.expectError && err != nil {
t.Errorf("did not expect error, but got %v", err)
}
})
}
}
// Verify that multiple controllers doesn't allow the PDB to be set true.
func TestMultipleControllers(t *testing.T) {
const podCount = 2

View File

@ -86,6 +86,7 @@ func setup(t *testing.T) (*kubeapiservertesting.TestServer, *disruption.Disrupti
client,
mapper,
scaleClient,
client.Discovery(),
)
return server, pdbc, informers, clientSet, apiExtensionClient, dynamicClient
}

View File

@ -356,6 +356,7 @@ func rmSetup(t *testing.T) (*httptest.Server, framework.CloseFunc, *disruption.D
client,
mapper,
scaleClient,
client.Discovery(),
)
return s, closeFn, rm, informers, clientSet
}

View File

@ -69,7 +69,8 @@ func initDisruptionController(t *testing.T, testCtx *testutils.TestContext) *dis
informers.Apps().V1().StatefulSets(),
testCtx.ClientSet,
mapper,
scaleClient)
scaleClient,
testCtx.ClientSet.Discovery())
informers.Start(testCtx.Scheduler.StopEverything)
informers.WaitForCacheSync(testCtx.Scheduler.StopEverything)