Support scale subresource for PDBs (#76294)

* Support scale subresource for PDBs

* Check group in finder functions

* Small fixes and more tests
This commit is contained in:
Morten Torkildsen 2019-05-23 22:24:18 -07:00 committed by Kubernetes Prow Robot
parent cdff17a96b
commit f1883c9e8c
16 changed files with 808 additions and 50 deletions

View File

@ -22,6 +22,8 @@ package app
import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/scale"
"k8s.io/kubernetes/pkg/controller/disruption"
"net/http"
@ -40,6 +42,15 @@ func startDisruptionController(ctx ControllerContext) (http.Handler, bool, error
resource, group+"/"+version)
return nil, false, nil
}
client := ctx.ClientBuilder.ClientOrDie("disruption-controller")
config := ctx.ClientBuilder.ConfigOrDie("disruption-controller")
scaleKindResolver := scale.NewDiscoveryScaleKindResolver(client.Discovery())
scaleClient, err := scale.NewForConfig(config, ctx.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
if err != nil {
return nil, false, err
}
go disruption.NewDisruptionController(
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
@ -47,7 +58,9 @@ func startDisruptionController(ctx ControllerContext) (http.Handler, bool, error
ctx.InformerFactory.Apps().V1().ReplicaSets(),
ctx.InformerFactory.Apps().V1().Deployments(),
ctx.InformerFactory.Apps().V1().StatefulSets(),
ctx.ClientBuilder.ClientOrDie("disruption-controller"),
client,
ctx.RESTMapper,
scaleClient,
).Run(ctx.Stop)
return nil, true, nil
}

View File

@ -19,7 +19,9 @@ go_library(
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
@ -34,6 +36,7 @@ go_library(
"//staging/src/k8s.io/client-go/listers/apps/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/scale:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
@ -49,13 +52,20 @@ go_test(
"//pkg/apis/core/install:go_default_library",
"//pkg/controller:go_default_library",
"//staging/src/k8s.io/api/apps/v1:go_default_library",
"//staging/src/k8s.io/api/autoscaling/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta/testrestmapper:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/scale/fake:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/github.com/Azure/go-autorest/autorest/to:go_default_library",

View File

@ -26,7 +26,9 @@ import (
policy "k8s.io/api/policy/v1beta1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@ -41,6 +43,7 @@ import (
appsv1listers "k8s.io/client-go/listers/apps/v1"
corelisters "k8s.io/client-go/listers/core/v1"
policylisters "k8s.io/client-go/listers/policy/v1beta1"
scaleclient "k8s.io/client-go/scale"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
@ -67,6 +70,9 @@ type updater func(*policy.PodDisruptionBudget) error
type DisruptionController struct {
kubeClient clientset.Interface
mapper apimeta.RESTMapper
scaleNamespacer scaleclient.ScalesGetter
pdbLister policylisters.PodDisruptionBudgetLister
pdbListerSynced cache.InformerSynced
@ -105,7 +111,7 @@ type controllerAndScale struct {
// podControllerFinder is a function type that maps a pod to a list of
// controllers and their scale.
type podControllerFinder func(*v1.Pod) (*controllerAndScale, error)
type podControllerFinder func(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error)
func NewDisruptionController(
podInformer coreinformers.PodInformer,
@ -115,6 +121,8 @@ func NewDisruptionController(
dInformer appsv1informers.DeploymentInformer,
ssInformer appsv1informers.StatefulSetInformer,
kubeClient clientset.Interface,
restMapper apimeta.RESTMapper,
scaleNamespacer scaleclient.ScalesGetter,
) *DisruptionController {
dc := &DisruptionController{
kubeClient: kubeClient,
@ -157,19 +165,19 @@ func NewDisruptionController(
dc.ssLister = ssInformer.Lister()
dc.ssListerSynced = ssInformer.Informer().HasSynced
dc.mapper = restMapper
dc.scaleNamespacer = scaleNamespacer
return dc
}
// TODO(mml): When controllerRef is implemented (#2210), we *could* simply
// return controllers without their scales, and access scale type-generically
// via the scale subresource. That may not be as much of a win as it sounds,
// however. We are accessing everything through the pkg/client/cache API that
// we have to set up and tune to the types we know we'll be accessing anyway,
// and we may well need further tweaks just to be able to access scale
// subresources.
// The workload resources do implement the scale subresource, so it would
// be possible to only check the scale subresource here. But since there is no
// way to take advantage of listers with scale subresources, we use the workload
// resources directly and only fall back to the scale subresource when needed.
func (dc *DisruptionController) finders() []podControllerFinder {
return []podControllerFinder{dc.getPodReplicationController, dc.getPodDeployment, dc.getPodReplicaSet,
dc.getPodStatefulSet}
dc.getPodStatefulSet, dc.getScaleController}
}
var (
@ -180,15 +188,12 @@ var (
)
// getPodReplicaSet finds a replicaset which has no matching deployments.
func (dc *DisruptionController) getPodReplicaSet(pod *v1.Pod) (*controllerAndScale, error) {
controllerRef := metav1.GetControllerOf(pod)
if controllerRef == nil {
return nil, nil
func (dc *DisruptionController) getPodReplicaSet(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
ok, err := verifyGroupKind(controllerRef, controllerKindRS.Kind, []string{"apps", "extensions"})
if !ok || err != nil {
return nil, err
}
if controllerRef.Kind != controllerKindRS.Kind {
return nil, nil
}
rs, err := dc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name)
rs, err := dc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name)
if err != nil {
// The only possible error is NotFound, which is ok here.
return nil, nil
@ -204,16 +209,13 @@ func (dc *DisruptionController) getPodReplicaSet(pod *v1.Pod) (*controllerAndSca
return &controllerAndScale{rs.UID, *(rs.Spec.Replicas)}, nil
}
// getPodStatefulSet returns the statefulset managing the given pod.
func (dc *DisruptionController) getPodStatefulSet(pod *v1.Pod) (*controllerAndScale, error) {
controllerRef := metav1.GetControllerOf(pod)
if controllerRef == nil {
return nil, nil
// getPodStatefulSet returns the statefulset referenced by the provided controllerRef.
func (dc *DisruptionController) getPodStatefulSet(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
ok, err := verifyGroupKind(controllerRef, controllerKindSS.Kind, []string{"apps"})
if !ok || err != nil {
return nil, err
}
if controllerRef.Kind != controllerKindSS.Kind {
return nil, nil
}
ss, err := dc.ssLister.StatefulSets(pod.Namespace).Get(controllerRef.Name)
ss, err := dc.ssLister.StatefulSets(namespace).Get(controllerRef.Name)
if err != nil {
// The only possible error is NotFound, which is ok here.
return nil, nil
@ -226,15 +228,12 @@ func (dc *DisruptionController) getPodStatefulSet(pod *v1.Pod) (*controllerAndSc
}
// getPodDeployments finds deployments for any replicasets which are being managed by deployments.
func (dc *DisruptionController) getPodDeployment(pod *v1.Pod) (*controllerAndScale, error) {
controllerRef := metav1.GetControllerOf(pod)
if controllerRef == nil {
return nil, nil
func (dc *DisruptionController) getPodDeployment(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
ok, err := verifyGroupKind(controllerRef, controllerKindRS.Kind, []string{"apps", "extensions"})
if !ok || err != nil {
return nil, err
}
if controllerRef.Kind != controllerKindRS.Kind {
return nil, nil
}
rs, err := dc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name)
rs, err := dc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name)
if err != nil {
// The only possible error is NotFound, which is ok here.
return nil, nil
@ -246,8 +245,10 @@ func (dc *DisruptionController) getPodDeployment(pod *v1.Pod) (*controllerAndSca
if controllerRef == nil {
return nil, nil
}
if controllerRef.Kind != controllerKindDep.Kind {
return nil, nil
ok, err = verifyGroupKind(controllerRef, controllerKindDep.Kind, []string{"apps", "extensions"})
if !ok || err != nil {
return nil, err
}
deployment, err := dc.dLister.Deployments(rs.Namespace).Get(controllerRef.Name)
if err != nil {
@ -260,15 +261,12 @@ func (dc *DisruptionController) getPodDeployment(pod *v1.Pod) (*controllerAndSca
return &controllerAndScale{deployment.UID, *(deployment.Spec.Replicas)}, nil
}
func (dc *DisruptionController) getPodReplicationController(pod *v1.Pod) (*controllerAndScale, error) {
controllerRef := metav1.GetControllerOf(pod)
if controllerRef == nil {
return nil, nil
func (dc *DisruptionController) getPodReplicationController(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
ok, err := verifyGroupKind(controllerRef, controllerKindRC.Kind, []string{""})
if !ok || err != nil {
return nil, err
}
if controllerRef.Kind != controllerKindRC.Kind {
return nil, nil
}
rc, err := dc.rcLister.ReplicationControllers(pod.Namespace).Get(controllerRef.Name)
rc, err := dc.rcLister.ReplicationControllers(namespace).Get(controllerRef.Name)
if err != nil {
// The only possible error is NotFound, which is ok here.
return nil, nil
@ -279,6 +277,55 @@ func (dc *DisruptionController) getPodReplicationController(pod *v1.Pod) (*contr
return &controllerAndScale{rc.UID, *(rc.Spec.Replicas)}, nil
}
func (dc *DisruptionController) getScaleController(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
gv, err := schema.ParseGroupVersion(controllerRef.APIVersion)
if err != nil {
return nil, err
}
gk := schema.GroupKind{
Group: gv.Group,
Kind: controllerRef.Kind,
}
mapping, err := dc.mapper.RESTMapping(gk, gv.Version)
if err != nil {
return nil, err
}
gr := mapping.Resource.GroupResource()
scale, err := dc.scaleNamespacer.Scales(namespace).Get(gr, controllerRef.Name)
if err != nil {
if errors.IsNotFound(err) {
return nil, nil
}
return nil, err
}
if scale.UID != controllerRef.UID {
return nil, nil
}
return &controllerAndScale{scale.UID, scale.Spec.Replicas}, nil
}
func verifyGroupKind(controllerRef *metav1.OwnerReference, expectedKind string, expectedGroups []string) (bool, error) {
gv, err := schema.ParseGroupVersion(controllerRef.APIVersion)
if err != nil {
return false, err
}
if controllerRef.Kind != expectedKind {
return false, nil
}
for _, group := range expectedGroups {
if group == gv.Group {
return true, nil
}
}
return false, nil
}
func (dc *DisruptionController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer dc.queue.ShutDown()
@ -583,10 +630,23 @@ func (dc *DisruptionController) getExpectedScale(pdb *policy.PodDisruptionBudget
// 1. Find the controller for each pod. If any pod has 0 controllers,
// that's an error. With ControllerRef, a pod can only have 1 controller.
for _, pod := range pods {
controllerRef := metav1.GetControllerOf(pod)
if controllerRef == nil {
err = fmt.Errorf("found no controller ref for pod %q", pod.Name)
dc.recorder.Event(pdb, v1.EventTypeWarning, "NoControllerRef", err.Error())
return
}
// If we already know the scale of the controller there is no need to do anything.
if _, found := controllerScale[controllerRef.UID]; found {
continue
}
// Check all the supported controllers to find the desired scale.
foundController := false
for _, finder := range dc.finders() {
var controllerNScale *controllerAndScale
controllerNScale, err = finder(pod)
controllerNScale, err = finder(controllerRef, pod.Namespace)
if err != nil {
return
}

View File

@ -23,13 +23,20 @@ import (
"time"
apps "k8s.io/api/apps/v1"
autoscalingapi "k8s.io/api/autoscaling/v1"
"k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/meta/testrestmapper"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/informers"
scalefake "k8s.io/client-go/scale/fake"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
_ "k8s.io/kubernetes/pkg/apis/core/install"
@ -90,6 +97,14 @@ type disruptionController struct {
rsStore cache.Store
dStore cache.Store
ssStore cache.Store
scaleClient *scalefake.FakeScaleClient
}
var customGVK = schema.GroupVersionKind{
Group: "custom.k8s.io",
Version: "v1",
Kind: "customresource",
}
func newFakeDisruptionController() (*disruptionController, *pdbStates) {
@ -97,6 +112,10 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) {
informerFactory := informers.NewSharedInformerFactory(nil, controller.NoResyncPeriodFunc())
scheme := runtime.NewScheme()
scheme.AddKnownTypeWithName(customGVK, &v1.Service{})
fakeScaleClient := &scalefake.FakeScaleClient{}
dc := NewDisruptionController(
informerFactory.Core().V1().Pods(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
@ -105,6 +124,8 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) {
informerFactory.Apps().V1().Deployments(),
informerFactory.Apps().V1().StatefulSets(),
nil,
testrestmapper.TestOnlyStaticRESTMapper(scheme),
fakeScaleClient,
)
dc.getUpdater = func() updater { return ps.Set }
dc.podListerSynced = alwaysReady
@ -122,6 +143,7 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) {
informerFactory.Apps().V1().ReplicaSets().Informer().GetStore(),
informerFactory.Apps().V1().Deployments().Informer().GetStore(),
informerFactory.Apps().V1().StatefulSets().Informer().GetStore(),
fakeScaleClient,
}, ps
}
@ -490,6 +512,52 @@ func TestReplicaSet(t *testing.T) {
ps.VerifyPdbStatus(t, pdbName, 0, 1, 2, 10, map[string]metav1.Time{})
}
func TestScaleResource(t *testing.T) {
customResourceUID := uuid.NewUUID()
replicas := int32(10)
pods := int32(4)
maxUnavailable := int32(5)
dc, ps := newFakeDisruptionController()
dc.scaleClient.AddReactor("get", "customresources", func(action core.Action) (handled bool, ret runtime.Object, err error) {
obj := &autoscalingapi.Scale{
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceDefault,
UID: customResourceUID,
},
Spec: autoscalingapi.ScaleSpec{
Replicas: replicas,
},
}
return true, obj, nil
})
pdb, pdbName := newMaxUnavailablePodDisruptionBudget(t, intstr.FromInt(int(maxUnavailable)))
add(t, dc.pdbStore, pdb)
trueVal := true
for i := 0; i < int(pods); i++ {
pod, _ := newPod(t, fmt.Sprintf("pod-%d", i))
pod.SetOwnerReferences([]metav1.OwnerReference{
{
Kind: customGVK.Kind,
APIVersion: customGVK.GroupVersion().String(),
Controller: &trueVal,
UID: customResourceUID,
},
})
add(t, dc.podStore, pod)
}
dc.sync(pdbName)
disruptionsAllowed := int32(0)
if replicas-pods < maxUnavailable {
disruptionsAllowed = maxUnavailable - (replicas - pods)
}
ps.VerifyPdbStatus(t, pdbName, disruptionsAllowed, pods, replicas-maxUnavailable, replicas, map[string]metav1.Time{})
}
// Verify that multiple controllers doesn't allow the PDB to be set true.
func TestMultipleControllers(t *testing.T) {
const podCount = 2
@ -759,3 +827,202 @@ func TestUpdateDisruptedPods(t *testing.T) {
ps.VerifyPdbStatus(t, pdbName, 0, 1, 1, 3, map[string]metav1.Time{"p3": {Time: currentTime}})
}
func TestBasicFinderFunctions(t *testing.T) {
dc, _ := newFakeDisruptionController()
rs, _ := newReplicaSet(t, 10)
add(t, dc.rsStore, rs)
rc, _ := newReplicationController(t, 12)
add(t, dc.rcStore, rc)
ss, _ := newStatefulSet(t, 14)
add(t, dc.ssStore, ss)
testCases := map[string]struct {
finderFunc podControllerFinder
apiVersion string
kind string
name string
uid types.UID
findsScale bool
expectedScale int32
}{
"replicaset controller with apps group": {
finderFunc: dc.getPodReplicaSet,
apiVersion: "apps/v1",
kind: controllerKindRS.Kind,
name: rs.Name,
uid: rs.UID,
findsScale: true,
expectedScale: 10,
},
"replicaset controller with invalid group": {
finderFunc: dc.getPodReplicaSet,
apiVersion: "invalid/v1",
kind: controllerKindRS.Kind,
name: rs.Name,
uid: rs.UID,
findsScale: false,
},
"replicationcontroller with empty group": {
finderFunc: dc.getPodReplicationController,
apiVersion: "/v1",
kind: controllerKindRC.Kind,
name: rc.Name,
uid: rc.UID,
findsScale: true,
expectedScale: 12,
},
"replicationcontroller with invalid group": {
finderFunc: dc.getPodReplicationController,
apiVersion: "apps/v1",
kind: controllerKindRC.Kind,
name: rc.Name,
uid: rc.UID,
findsScale: false,
},
"statefulset controller with extensions group": {
finderFunc: dc.getPodStatefulSet,
apiVersion: "apps/v1",
kind: controllerKindSS.Kind,
name: ss.Name,
uid: ss.UID,
findsScale: true,
expectedScale: 14,
},
"statefulset controller with invalid kind": {
finderFunc: dc.getPodStatefulSet,
apiVersion: "apps/v1",
kind: controllerKindRS.Kind,
name: ss.Name,
uid: ss.UID,
findsScale: false,
},
}
for tn, tc := range testCases {
t.Run(tn, func(t *testing.T) {
controllerRef := &metav1.OwnerReference{
APIVersion: tc.apiVersion,
Kind: tc.kind,
Name: tc.name,
UID: tc.uid,
}
controllerAndScale, _ := tc.finderFunc(controllerRef, metav1.NamespaceDefault)
if controllerAndScale == nil {
if tc.findsScale {
t.Error("Expected scale, but got nil")
}
return
}
if got, want := controllerAndScale.scale, tc.expectedScale; got != want {
t.Errorf("Expected scale %d, but got %d", want, got)
}
if got, want := controllerAndScale.UID, tc.uid; got != want {
t.Errorf("Expected uid %s, but got %s", want, got)
}
})
}
}
func TestDeploymentFinderFunction(t *testing.T) {
labels := map[string]string{
"foo": "bar",
}
testCases := map[string]struct {
rsApiVersion string
rsKind string
depApiVersion string
depKind string
findsScale bool
expectedScale int32
}{
"happy path": {
rsApiVersion: "apps/v1",
rsKind: controllerKindRS.Kind,
depApiVersion: "extensions/v1",
depKind: controllerKindDep.Kind,
findsScale: true,
expectedScale: 10,
},
"invalid rs apiVersion": {
rsApiVersion: "invalid/v1",
rsKind: controllerKindRS.Kind,
depApiVersion: "apps/v1",
depKind: controllerKindDep.Kind,
findsScale: false,
},
"invalid rs kind": {
rsApiVersion: "apps/v1",
rsKind: "InvalidKind",
depApiVersion: "apps/v1",
depKind: controllerKindDep.Kind,
findsScale: false,
},
"invalid deployment apiVersion": {
rsApiVersion: "extensions/v1",
rsKind: controllerKindRS.Kind,
depApiVersion: "deployment/v1",
depKind: controllerKindDep.Kind,
findsScale: false,
},
"invalid deployment kind": {
rsApiVersion: "apps/v1",
rsKind: controllerKindRS.Kind,
depApiVersion: "extensions/v1",
depKind: "InvalidKind",
findsScale: false,
},
}
for tn, tc := range testCases {
t.Run(tn, func(t *testing.T) {
dc, _ := newFakeDisruptionController()
dep, _ := newDeployment(t, 10)
dep.Spec.Selector = newSel(labels)
add(t, dc.dStore, dep)
rs, _ := newReplicaSet(t, 5)
rs.Labels = labels
trueVal := true
rs.OwnerReferences = append(rs.OwnerReferences, metav1.OwnerReference{
APIVersion: tc.depApiVersion,
Kind: tc.depKind,
Name: dep.Name,
UID: dep.UID,
Controller: &trueVal,
})
add(t, dc.rsStore, rs)
controllerRef := &metav1.OwnerReference{
APIVersion: tc.rsApiVersion,
Kind: tc.rsKind,
Name: rs.Name,
UID: rs.UID,
}
controllerAndScale, _ := dc.getPodDeployment(controllerRef, metav1.NamespaceDefault)
if controllerAndScale == nil {
if tc.findsScale {
t.Error("Expected scale, but got nil")
}
return
}
if got, want := controllerAndScale.scale, tc.expectedScale; got != want {
t.Errorf("Expected scale %d, but got %d", want, got)
}
if got, want := controllerAndScale.UID, dep.UID; got != want {
t.Errorf("Expected uid %s, but got %s", want, got)
}
})
}
}

View File

@ -138,6 +138,7 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
rbacv1helpers.NewRule("get", "list", "watch").Groups(policyGroup).Resources("poddisruptionbudgets").RuleOrDie(),
rbacv1helpers.NewRule("get", "list", "watch").Groups(appsGroup).Resources("statefulsets").RuleOrDie(),
rbacv1helpers.NewRule("update").Groups(policyGroup).Resources("poddisruptionbudgets/status").RuleOrDie(),
rbacv1helpers.NewRule("get").Groups("*").Resources("*/scale").RuleOrDie(),
eventsRule(),
},
})

View File

@ -33,6 +33,7 @@ var rolesWithAllowStar = sets.NewString(
saRolePrefix+"resourcequota-controller",
saRolePrefix+"horizontal-pod-autoscaler",
saRolePrefix+"clusterrole-aggregation-controller",
saRolePrefix+"disruption-controller",
)
// TestNoStarsForControllers confirms that no controller role has star verbs, groups,

View File

@ -382,6 +382,12 @@ items:
- poddisruptionbudgets/status
verbs:
- update
- apiGroups:
- '*'
resources:
- '*/scale'
verbs:
- get
- apiGroups:
- ""
resources:

View File

@ -45,6 +45,7 @@ filegroup(
"//test/integration/daemonset:all-srcs",
"//test/integration/defaulttolerationseconds:all-srcs",
"//test/integration/deployment:all-srcs",
"//test/integration/disruption:all-srcs",
"//test/integration/dryrun:all-srcs",
"//test/integration/etcd:all-srcs",
"//test/integration/evictions:all-srcs",

View File

@ -0,0 +1,47 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")
go_test(
name = "go_default_test",
srcs = [
"disruption_test.go",
"main_test.go",
],
tags = ["integration"],
deps = [
"//cmd/kube-apiserver/app/testing:go_default_library",
"//pkg/controller/disruption:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1:go_default_library",
"//staging/src/k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/discovery/cached/memory:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/restmapper:go_default_library",
"//staging/src/k8s.io/client-go/scale:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//test/integration/etcd:go_default_library",
"//test/integration/framework:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,284 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package disruption
import (
"fmt"
"testing"
"time"
"k8s.io/api/core/v1"
"k8s.io/api/policy/v1beta1"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/scale"
"k8s.io/client-go/tools/cache"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/controller/disruption"
"k8s.io/kubernetes/test/integration/etcd"
"k8s.io/kubernetes/test/integration/framework"
)
func setup(t *testing.T) (*kubeapiservertesting.TestServer, *disruption.DisruptionController, informers.SharedInformerFactory, clientset.Interface, *apiextensionsclientset.Clientset, dynamic.Interface) {
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins", "ServiceAccount"}, framework.SharedEtcd())
clientSet, err := clientset.NewForConfig(server.ClientConfig)
if err != nil {
t.Fatalf("Error creating clientset: %v", err)
}
resyncPeriod := 12 * time.Hour
informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(server.ClientConfig, "pdb-informers")), resyncPeriod)
client := clientset.NewForConfigOrDie(restclient.AddUserAgent(server.ClientConfig, "disruption-controller"))
discoveryClient := cacheddiscovery.NewMemCacheClient(clientSet.Discovery())
mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
scaleKindResolver := scale.NewDiscoveryScaleKindResolver(client.Discovery())
scaleClient, err := scale.NewForConfig(server.ClientConfig, mapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
if err != nil {
t.Fatalf("Error creating scaleClient: %v", err)
}
apiExtensionClient, err := apiextensionsclientset.NewForConfig(server.ClientConfig)
if err != nil {
t.Fatalf("Error creating extension clientset: %v", err)
}
dynamicClient, err := dynamic.NewForConfig(server.ClientConfig)
if err != nil {
t.Fatalf("Error creating dynamicClient: %v", err)
}
pdbc := disruption.NewDisruptionController(
informers.Core().V1().Pods(),
informers.Policy().V1beta1().PodDisruptionBudgets(),
informers.Core().V1().ReplicationControllers(),
informers.Apps().V1().ReplicaSets(),
informers.Apps().V1().Deployments(),
informers.Apps().V1().StatefulSets(),
client,
mapper,
scaleClient,
)
return server, pdbc, informers, clientSet, apiExtensionClient, dynamicClient
}
func TestPDBWithScaleSubresource(t *testing.T) {
s, pdbc, informers, clientSet, apiExtensionClient, dynamicClient := setup(t)
defer s.TearDownFn()
nsName := "pdb-scale-subresource"
createNs(t, nsName, clientSet)
stopCh := make(chan struct{})
informers.Start(stopCh)
go pdbc.Run(stopCh)
defer close(stopCh)
crdDefinition := newCustomResourceDefinition()
etcd.CreateTestCRDs(t, apiExtensionClient, true, crdDefinition)
gvr := schema.GroupVersionResource{Group: crdDefinition.Spec.Group, Version: crdDefinition.Spec.Version, Resource: crdDefinition.Spec.Names.Plural}
resourceClient := dynamicClient.Resource(gvr).Namespace(nsName)
replicas := 4
maxUnavailable := int32(2)
podLabelValue := "test-crd"
resource := &unstructured.Unstructured{
Object: map[string]interface{}{
"kind": crdDefinition.Spec.Names.Kind,
"apiVersion": crdDefinition.Spec.Group + "/" + crdDefinition.Spec.Version,
"metadata": map[string]interface{}{
"name": "resource",
"namespace": nsName,
},
"spec": map[string]interface{}{
"replicas": replicas,
},
},
}
createdResource, err := resourceClient.Create(resource, metav1.CreateOptions{})
if err != nil {
t.Error(err)
}
trueValue := true
ownerRef := metav1.OwnerReference{
Name: resource.GetName(),
Kind: crdDefinition.Spec.Names.Kind,
APIVersion: crdDefinition.Spec.Group + "/" + crdDefinition.Spec.Version,
UID: createdResource.GetUID(),
Controller: &trueValue,
}
for i := 0; i < replicas; i++ {
createPod(t, fmt.Sprintf("pod-%d", i), nsName, podLabelValue, clientSet, ownerRef)
}
waitToObservePods(t, informers.Core().V1().Pods().Informer(), 4, v1.PodRunning)
pdb := &v1beta1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pdb",
},
Spec: v1beta1.PodDisruptionBudgetSpec{
MaxUnavailable: &intstr.IntOrString{
Type: intstr.Int,
IntVal: maxUnavailable,
},
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"app": podLabelValue},
},
},
}
if _, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(nsName).Create(pdb); err != nil {
t.Errorf("Error creating PodDisruptionBudget: %v", err)
}
waitPDBStable(t, clientSet, 4, nsName, pdb.Name)
newPdb, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(nsName).Get(pdb.Name, metav1.GetOptions{})
if expected, found := int32(replicas), newPdb.Status.ExpectedPods; expected != found {
t.Errorf("Expected %d, but found %d", expected, found)
}
if expected, found := int32(replicas)-maxUnavailable, newPdb.Status.DesiredHealthy; expected != found {
t.Errorf("Expected %d, but found %d", expected, found)
}
if expected, found := maxUnavailable, newPdb.Status.PodDisruptionsAllowed; expected != found {
t.Errorf("Expected %d, but found %d", expected, found)
}
}
func createPod(t *testing.T, name, namespace, labelValue string, clientSet clientset.Interface, ownerRef metav1.OwnerReference) {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: map[string]string{"app": labelValue},
OwnerReferences: []metav1.OwnerReference{
ownerRef,
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "fake-name",
Image: "fakeimage",
},
},
},
}
_, err := clientSet.CoreV1().Pods(namespace).Create(pod)
if err != nil {
t.Error(err)
}
addPodConditionReady(pod)
if _, err := clientSet.CoreV1().Pods(namespace).UpdateStatus(pod); err != nil {
t.Error(err)
}
}
func createNs(t *testing.T, name string, clientSet clientset.Interface) {
_, err := clientSet.CoreV1().Namespaces().Create(&v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
})
if err != nil {
t.Errorf("Error creating namespace: %v", err)
}
}
func addPodConditionReady(pod *v1.Pod) {
pod.Status = v1.PodStatus{
Phase: v1.PodRunning,
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
},
},
}
}
func newCustomResourceDefinition() *apiextensionsv1beta1.CustomResourceDefinition {
return &apiextensionsv1beta1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{Name: "crds.mygroup.example.com"},
Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{
Group: "mygroup.example.com",
Version: "v1beta1",
Names: apiextensionsv1beta1.CustomResourceDefinitionNames{
Plural: "crds",
Singular: "crd",
Kind: "Crd",
ListKind: "CrdList",
},
Scope: apiextensionsv1beta1.NamespaceScoped,
Subresources: &apiextensionsv1beta1.CustomResourceSubresources{
Scale: &apiextensionsv1beta1.CustomResourceSubresourceScale{
SpecReplicasPath: ".spec.replicas",
StatusReplicasPath: ".status.replicas",
},
},
},
}
}
func waitPDBStable(t *testing.T, clientSet clientset.Interface, podNum int32, ns, pdbName string) {
if err := wait.PollImmediate(2*time.Second, 60*time.Second, func() (bool, error) {
pdb, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(ns).Get(pdbName, metav1.GetOptions{})
if err != nil {
return false, err
}
if pdb.Status.CurrentHealthy != podNum {
return false, nil
}
return true, nil
}); err != nil {
t.Fatal(err)
}
}
func waitToObservePods(t *testing.T, podInformer cache.SharedIndexInformer, podNum int, phase v1.PodPhase) {
if err := wait.PollImmediate(2*time.Second, 60*time.Second, func() (bool, error) {
objects := podInformer.GetIndexer().List()
if len(objects) != podNum {
return false, nil
}
for _, obj := range objects {
pod := obj.(*v1.Pod)
if pod.Status.Phase != phase {
return false, nil
}
}
return true, nil
}); err != nil {
t.Fatal(err)
}
}

View File

@ -0,0 +1,26 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package disruption
import (
"k8s.io/kubernetes/test/integration/framework"
"testing"
)
func TestMain(m *testing.M) {
framework.EtcdMain(m.Run)
}

View File

@ -22,9 +22,13 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/discovery/cached/memory:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/restmapper:go_default_library",
"//staging/src/k8s.io/client-go/scale:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//test/integration/framework:go_default_library",
],

View File

@ -18,6 +18,7 @@ package evictions
import (
"fmt"
"net/http/httptest"
"reflect"
"sync"
@ -32,9 +33,13 @@ import (
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/scale"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/controller/disruption"
"k8s.io/kubernetes/test/integration/framework"
@ -329,6 +334,17 @@ func rmSetup(t *testing.T) (*httptest.Server, framework.CloseFunc, *disruption.D
resyncPeriod := 12 * time.Hour
informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pdb-informers")), resyncPeriod)
client := clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "disruption-controller"))
discoveryClient := cacheddiscovery.NewMemCacheClient(clientSet.Discovery())
mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
scaleKindResolver := scale.NewDiscoveryScaleKindResolver(client.Discovery())
scaleClient, err := scale.NewForConfig(&config, mapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
if err != nil {
t.Fatalf("Error in create scaleClient: %v", err)
}
rm := disruption.NewDisruptionController(
informers.Core().V1().Pods(),
informers.Policy().V1beta1().PodDisruptionBudgets(),
@ -336,7 +352,9 @@ func rmSetup(t *testing.T) (*httptest.Server, framework.CloseFunc, *disruption.D
informers.Apps().V1().ReplicaSets(),
informers.Apps().V1().Deployments(),
informers.Apps().V1().StatefulSets(),
clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "disruption-controller")),
client,
mapper,
scaleClient,
)
return s, closeFn, rm, informers, clientSet
}

View File

@ -109,12 +109,16 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission:go_default_library",
"//staging/src/k8s.io/client-go/discovery/cached/memory:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/restmapper:go_default_library",
"//staging/src/k8s.io/client-go/scale:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//test/integration/framework:go_default_library",
"//test/utils/image:go_default_library",

View File

@ -736,7 +736,7 @@ func TestPDBInPreemption(t *testing.T) {
defer cleanupTest(t, context)
cs := context.clientSet
initDisruptionController(context)
initDisruptionController(t, context)
defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI),

View File

@ -33,12 +33,16 @@ import (
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/admission"
cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
clientv1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/scale"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api/legacyscheme"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
@ -238,9 +242,19 @@ func initTestSchedulerWithOptions(
// initDisruptionController initializes and runs a Disruption Controller to properly
// update PodDisuptionBudget objects.
func initDisruptionController(context *testContext) *disruption.DisruptionController {
func initDisruptionController(t *testing.T, context *testContext) *disruption.DisruptionController {
informers := informers.NewSharedInformerFactory(context.clientSet, 12*time.Hour)
discoveryClient := cacheddiscovery.NewMemCacheClient(context.clientSet.Discovery())
mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
config := restclient.Config{Host: context.httpServer.URL}
scaleKindResolver := scale.NewDiscoveryScaleKindResolver(context.clientSet.Discovery())
scaleClient, err := scale.NewForConfig(&config, mapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
if err != nil {
t.Fatalf("Error in create scaleClient: %v", err)
}
dc := disruption.NewDisruptionController(
informers.Core().V1().Pods(),
informers.Policy().V1beta1().PodDisruptionBudgets(),
@ -248,7 +262,9 @@ func initDisruptionController(context *testContext) *disruption.DisruptionContro
informers.Apps().V1().ReplicaSets(),
informers.Apps().V1().Deployments(),
informers.Apps().V1().StatefulSets(),
context.clientSet)
context.clientSet,
mapper,
scaleClient)
informers.Start(context.schedulerConfig.StopEverything)
informers.WaitForCacheSync(context.schedulerConfig.StopEverything)