From 1508ecfe3730709aa7e9e6ade46aea0c413ad237 Mon Sep 17 00:00:00 2001
From: Anirudh
Date: Wed, 4 Jan 2017 16:30:32 -0800
Subject: [PATCH] Make PDBs represent percentage in StatefulSet

---
 pkg/controller/disruption/disruption.go      | 50 +++++++++++++++-
 pkg/controller/disruption/disruption_test.go | 61 ++++++++++++++++++++
 2 files changed, 108 insertions(+), 3 deletions(-)

diff --git a/pkg/controller/disruption/disruption.go b/pkg/controller/disruption/disruption.go
index 85c5d76dd4c..3593ff42686 100644
--- a/pkg/controller/disruption/disruption.go
+++ b/pkg/controller/disruption/disruption.go
@@ -23,6 +23,7 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/v1"
+	apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
 	extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
 	metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
 	policy "k8s.io/kubernetes/pkg/apis/policy/v1beta1"
@@ -51,7 +52,7 @@ const statusUpdateRetries = 2
 // all and the corresponding entry can be removed from pdb.Status.DisruptedPods. It is assumed that
 // pod/pdb apiserver to controller latency is relatively small (like 1-2sec) so the below value should
 // be more than enough.
-// If the cotroller is running on a different node it is important that the two nodes have synced
+// If the controller is running on a different node it is important that the two nodes have synced
 // clock (via ntp for example). Otherwise PodDisruptionBudget controller may not provide enough
 // protection against unwanted pod disruptions.
 const DeletionTimeout = 2 * 60 * time.Second
@@ -80,6 +81,10 @@ type DisruptionController struct {
 	dController *cache.Controller
 	dLister     cache.StoreToDeploymentLister
 
+	ssStore      cache.Store
+	ssController *cache.Controller
+	ssLister     cache.StoreToStatefulSetLister
+
 	// PodDisruptionBudget keys that need to be synced.
 	queue        workqueue.RateLimitingInterface
 	recheckQueue workqueue.DelayingInterface
@@ -187,9 +192,23 @@ func NewDisruptionController(podInformer cache.SharedIndexInformer, kubeClient c
 		cache.ResourceEventHandlerFuncs{},
 		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
 	)
-	dc.dLister.Indexer = dc.dIndexer
 
+	dc.ssStore, dc.ssController = cache.NewInformer(
+		&cache.ListWatch{
+			ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
+				return dc.kubeClient.Apps().StatefulSets(v1.NamespaceAll).List(options)
+			},
+			WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
+				return dc.kubeClient.Apps().StatefulSets(v1.NamespaceAll).Watch(options)
+			},
+		},
+		&apps.StatefulSet{},
+		30*time.Second,
+		cache.ResourceEventHandlerFuncs{},
+	)
+	dc.ssLister.Store = dc.ssStore
+
 	return dc
 }
@@ -201,7 +220,8 @@
 // and we may well need further tweaks just to be able to access scale
 // subresources.
 func (dc *DisruptionController) finders() []podControllerFinder {
-	return []podControllerFinder{dc.getPodReplicationControllers, dc.getPodDeployments, dc.getPodReplicaSets}
+	return []podControllerFinder{dc.getPodReplicationControllers, dc.getPodDeployments, dc.getPodReplicaSets,
+		dc.getPodStatefulSets}
 }
 
 // getPodReplicaSets finds replicasets which have no matching deployments.
@@ -231,6 +251,29 @@
 	return cas, nil
 }
 
+// getPodStatefulSets returns the statefulsets managing the given pod.
+func (dc *DisruptionController) getPodStatefulSets(pod *v1.Pod) ([]controllerAndScale, error) {
+	cas := []controllerAndScale{}
+	ss, err := dc.ssLister.GetPodStatefulSets(pod)
+
+	// GetPodStatefulSets returns an error only if no StatefulSets are found. We
+	// don't return that as an error to the caller.
+	if err != nil {
+		return cas, nil
+	}
+
+	controllerScale := map[types.UID]int32{}
+	for _, s := range ss {
+		controllerScale[s.UID] = *(s.Spec.Replicas)
+	}
+
+	for uid, scale := range controllerScale {
+		cas = append(cas, controllerAndScale{UID: uid, scale: scale})
+	}
+
+	return cas, nil
+}
+
 // getPodDeployments finds deployments for any replicasets which are being managed by deployments.
 func (dc *DisruptionController) getPodDeployments(pod *v1.Pod) ([]controllerAndScale, error) {
 	cas := []controllerAndScale{}
@@ -284,6 +327,7 @@ func (dc *DisruptionController) Run(stopCh <-chan struct{}) {
 	go dc.rcController.Run(stopCh)
 	go dc.rsController.Run(stopCh)
 	go dc.dController.Run(stopCh)
+	go dc.ssController.Run(stopCh)
 
 	go wait.Until(dc.worker, time.Second, stopCh)
 	go wait.Until(dc.recheckWorker, time.Second, stopCh)
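
Note on the mechanism (an illustration, not part of the patch): the finder added above plugs StatefulSets into the controller's existing scale-summing scheme. Each pod covered by a PDB is run through every finder, and each distinct managing controller contributes its declared scale exactly once, keyed by UID. A standalone Go sketch of that contract follows; controllerAndScale mirrors the unexported pair in disruption.go, while the finder type, pod names, and expectedCount helper are illustrative stand-ins rather than controller code.

package main

import "fmt"

// controllerAndScale mirrors the unexported pair in disruption.go: a
// managing controller's UID and its declared replica count.
type controllerAndScale struct {
	UID   string
	scale int32
}

// podControllerFinder is a simplified stand-in for the finder signature
// used by DisruptionController.finders().
type podControllerFinder func(podName string) []controllerAndScale

// expectedCount sketches how the controller derives a PDB's expected pod
// count: ask every finder about every pod, de-duplicate the managing
// controllers by UID, then sum the surviving scales.
func expectedCount(pods []string, finders []podControllerFinder) int32 {
	seen := map[string]int32{}
	for _, pod := range pods {
		for _, finder := range finders {
			for _, cas := range finder(pod) {
				seen[cas.UID] = cas.scale
			}
		}
	}
	var total int32
	for _, scale := range seen {
		total += scale
	}
	return total
}

func main() {
	// One StatefulSet ("ss-1", 3 replicas) manages all three pods; the
	// UID de-duplication makes the expected count 3, not 9.
	ssFinder := func(podName string) []controllerAndScale {
		return []controllerAndScale{{UID: "ss-1", scale: 3}}
	}
	pods := []string{"foobar 0", "foobar 1", "foobar 2"}
	fmt.Println(expectedCount(pods, []podControllerFinder{ssFinder})) // 3
}
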
diff --git a/pkg/controller/disruption/disruption_test.go b/pkg/controller/disruption/disruption_test.go
index 39d20f1e22b..0b5e826bacf 100644
--- a/pkg/controller/disruption/disruption_test.go
+++ b/pkg/controller/disruption/disruption_test.go
@@ -26,6 +26,7 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/v1"
 	"k8s.io/kubernetes/pkg/apimachinery/registered"
+	apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
 	extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
 	metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
 	policy "k8s.io/kubernetes/pkg/apis/policy/v1beta1"
@@ -92,6 +93,7 @@ func newFakeDisruptionController() (*DisruptionController, *pdbStates) {
 		rcLister:    cache.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
 		rsLister:    cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
 		dLister:     cache.StoreToDeploymentLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{})},
+		ssLister:    cache.StoreToStatefulSetLister{Store: cache.NewStore(controller.KeyFunc)},
 		getUpdater:  func() updater { return ps.Set },
 		broadcaster: record.NewBroadcaster(),
 	}
@@ -236,6 +238,30 @@ func newReplicaSet(t *testing.T, size int32) (*extensions.ReplicaSet, string) {
 	return rs, rsName
 }
 
+func newStatefulSet(t *testing.T, size int32) (*apps.StatefulSet, string) {
+	ss := &apps.StatefulSet{
+		TypeMeta: metav1.TypeMeta{APIVersion: registered.GroupOrDie(v1.GroupName).GroupVersion.String()},
+		ObjectMeta: v1.ObjectMeta{
+			UID:             uuid.NewUUID(),
+			Name:            "foobar",
+			Namespace:       v1.NamespaceDefault,
+			ResourceVersion: "18",
+			Labels:          fooBar(),
+		},
+		Spec: apps.StatefulSetSpec{
+			Replicas: &size,
+			Selector: newSelFooBar(),
+		},
+	}
+
+	ssName, err := controller.KeyFunc(ss)
+	if err != nil {
+		t.Fatalf("Unexpected error naming StatefulSet %q: %v", ss.Name, err)
+	}
+
+	return ss, ssName
+}
+
 func update(t *testing.T, store cache.Store, obj interface{}) {
 	if err := store.Update(obj); err != nil {
 		t.Fatalf("Could not add %+v to %+v: %v", obj, store, err)
@@ -409,6 +435,41 @@ func TestReplicationController(t *testing.T) {
 	ps.VerifyDisruptionAllowed(t, pdbName, 0)
 }
 
+func TestStatefulSetController(t *testing.T) {
+	labels := map[string]string{
+		"foo": "bar",
+		"baz": "quux",
+	}
+
+	dc, ps := newFakeDisruptionController()
+
+	// 34% should round up to 2
+	pdb, pdbName := newPodDisruptionBudget(t, intstr.FromString("34%"))
+	add(t, dc.pdbLister.Store, pdb)
+	ss, _ := newStatefulSet(t, 3)
+	add(t, dc.ssLister.Store, ss)
+	dc.sync(pdbName)
+
+	// It starts out at 0 expected because, with no pods, the PDB doesn't know
+	// about the SS. This is a known bug. TODO(mml): file issue
+	ps.VerifyPdbStatus(t, pdbName, 0, 0, 0, 0, map[string]metav1.Time{})
+
+	pods := []*v1.Pod{}
+
+	for i := int32(0); i < 3; i++ {
+		pod, _ := newPod(t, fmt.Sprintf("foobar %d", i))
+		pods = append(pods, pod)
+		pod.Labels = labels
+		add(t, dc.podLister.Indexer, pod)
+		dc.sync(pdbName)
+		if i < 2 {
+			ps.VerifyPdbStatus(t, pdbName, 0, i+1, 2, 3, map[string]metav1.Time{})
+		} else {
+			ps.VerifyPdbStatus(t, pdbName, 1, 3, 2, 3, map[string]metav1.Time{})
+		}
+	}
+}
+
 func TestTwoControllers(t *testing.T) {
 	// Most of this test is in verifying intermediate cases as we define the
 	// three controllers and create the pods.
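
Note on the arithmetic (an illustration, not part of the patch): TestStatefulSetController encodes the percentage rounding the commit subject promises. With minAvailable "34%" and an expected count of 3, the desired healthy count is ceil(0.34 * 3) = 2, so the budget allows 1 disruption only once all 3 pods are healthy; that is exactly the final assertion ps.VerifyPdbStatus(t, pdbName, 1, 3, 2, 3, ...), whose numeric arguments are disruptionsAllowed, currentHealthy, desiredHealthy, and expectedPods. In-tree, the controller resolves the percentage through the intstr utility's round-up conversion; the desiredHealthy helper below is a standalone sketch of that rounding, not the controller's code.

package main

import "fmt"

// desiredHealthy converts a percentage minAvailable into a concrete pod
// count with round-up semantics: integer ceil(percent * total / 100).
func desiredHealthy(percent, total int32) int32 {
	return (percent*total + 99) / 100
}

func main() {
	expectedCount := int32(3)                 // StatefulSet spec.replicas
	dh := desiredHealthy(34, expectedCount)   // ceil(34% of 3) = 2
	currentHealthy := int32(3)                // all three pods running
	disruptionsAllowed := currentHealthy - dh // 3 - 2 = 1
	fmt.Println(dh, disruptionsAllowed)       // prints: 2 1
}
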