diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 7073d809da2..95c1dd1d730 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -49,6 +49,7 @@ import ( certcontroller "k8s.io/kubernetes/pkg/controller/certificates" "k8s.io/kubernetes/pkg/controller/daemon" "k8s.io/kubernetes/pkg/controller/deployment" + "k8s.io/kubernetes/pkg/controller/disruption" endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint" "k8s.io/kubernetes/pkg/controller/framework/informers" "k8s.io/kubernetes/pkg/controller/garbagecollector" @@ -367,6 +368,18 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig } } + groupVersion = "policy/v1alpha1" + resources, found = resourceMap[groupVersion] + glog.Infof("Attempting to start disruption controller, full resource map %+v", resourceMap) + if containsVersion(versions, groupVersion) && found { + glog.Infof("Starting %s apis", groupVersion) + if containsResource(resources, "poddisruptionbudgets") { + glog.Infof("Starting disruption controller") + go disruption.NewDisruptionController(sharedInformers.Pods().Informer(), kubeClient).Run(wait.NeverStop) + time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) + } + } + groupVersion = "apps/v1alpha1" resources, found = resourceMap[groupVersion] glog.Infof("Attempting to start petset, full resource map %+v", resourceMap) diff --git a/pkg/client/cache/listers.go b/pkg/client/cache/listers.go index 4b96cf43cbd..8abde96c4ac 100644 --- a/pkg/client/cache/listers.go +++ b/pkg/client/cache/listers.go @@ -26,6 +26,7 @@ import ( "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/certificates" "k8s.io/kubernetes/pkg/apis/extensions" + "k8s.io/kubernetes/pkg/apis/policy" "k8s.io/kubernetes/pkg/labels" ) @@ -710,3 +711,44 @@ func (i *IndexerToNamespaceLister) List(selector labels.Selector) (namespaces [] return namespaces, nil } + +type StoreToPodDisruptionBudgetLister struct { + Store +} + +// GetPodPodDisruptionBudgets returns a list of PodDisruptionBudgets matching a pod. Returns an error only if no matching PodDisruptionBudgets are found. +func (s *StoreToPodDisruptionBudgetLister) GetPodPodDisruptionBudgets(pod *api.Pod) (pdbList []policy.PodDisruptionBudget, err error) { + var selector labels.Selector + + if len(pod.Labels) == 0 { + err = fmt.Errorf("no PodDisruptionBudgets found for pod %v because it has no labels", pod.Name) + return + } + + for _, m := range s.Store.List() { + pdb, ok := m.(*policy.PodDisruptionBudget) + if !ok { + glog.Errorf("Unexpected: %v is not a PodDisruptionBudget", m) + continue + } + if pdb.Namespace != pod.Namespace { + continue + } + selector, err = unversioned.LabelSelectorAsSelector(pdb.Spec.Selector) + if err != nil { + glog.Warningf("invalid selector: %v", err) + // TODO(mml): add an event to the PDB + continue + } + + // If a PDB with a nil or empty selector creeps in, it should match nothing, not everything. 
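// Note: LabelSelectorAsSelector turns a zero-value selector into
// labels.Everything() (e.g. unversioned.LabelSelectorAsSelector(&unversioned.LabelSelector{})
// yields a selector that is Empty() and matches every label set), so the
// explicit Empty() check below is what actually enforces "match nothing" here.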
+ if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) { + continue + } + pdbList = append(pdbList, *pdb) + } + if len(pdbList) == 0 { + err = fmt.Errorf("could not find PodDisruptionBudget for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + } + return +} diff --git a/pkg/client/unversioned/client.go b/pkg/client/unversioned/client.go index 09d0515b1a3..d70c644a52e 100644 --- a/pkg/client/unversioned/client.go +++ b/pkg/client/unversioned/client.go @@ -183,6 +183,10 @@ func (c *Client) Rbac() RbacInterface { return c.RbacClient } +func (c *Client) Policy() PolicyInterface { + return c.PolicyClient +} + func (c *Client) Discovery() discovery.DiscoveryInterface { return c.DiscoveryClient } diff --git a/pkg/controller/disruption/disruption.go b/pkg/controller/disruption/disruption.go new file mode 100644 index 00000000000..8b80a9bcc69 --- /dev/null +++ b/pkg/controller/disruption/disruption.go @@ -0,0 +1,608 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package disruption + +import ( + "fmt" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/extensions" + "k8s.io/kubernetes/pkg/apis/policy" + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/client/record" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/types" + "k8s.io/kubernetes/pkg/util/intstr" + "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/util/workqueue" + "k8s.io/kubernetes/pkg/watch" + + "github.com/golang/glog" +) + +const statusUpdateRetries = 2 + +type updater func(*policy.PodDisruptionBudget) error + +type DisruptionController struct { + kubeClient *client.Client + + pdbStore cache.Store + pdbController *framework.Controller + pdbLister cache.StoreToPodDisruptionBudgetLister + + podController framework.ControllerInterface + podLister cache.StoreToPodLister + + rcIndexer cache.Indexer + rcController *framework.Controller + rcLister cache.StoreToReplicationControllerLister + + rsStore cache.Store + rsController *framework.Controller + rsLister cache.StoreToReplicaSetLister + + dStore cache.Store + dController *framework.Controller + dLister cache.StoreToDeploymentLister + + queue *workqueue.Type + + broadcaster record.EventBroadcaster + recorder record.EventRecorder + + getUpdater func() updater +} + +// controllerAndScale is used to return (controller, scale) pairs from the +// controller finder functions. +type controllerAndScale struct { + types.UID + scale int32 +} + +// podControllerFinder is a function type that maps a pod to a list of +// controllers and their scale. 
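// For example, getPodReplicationControllers below maps a pod owned by a
// ReplicationController with Spec.Replicas == 5 to
//     []controllerAndScale{{UID: rc.UID, scale: 5}}
// and maps a pod with no matching controllers to an empty slice; "not found"
// is deliberately not reported as an error by the finders.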
+type podControllerFinder func(*api.Pod) ([]controllerAndScale, error) + +func NewDisruptionController(podInformer framework.SharedIndexInformer, kubeClient *client.Client) *DisruptionController { + dc := &DisruptionController{ + kubeClient: kubeClient, + podController: podInformer.GetController(), + queue: workqueue.New(), + broadcaster: record.NewBroadcaster(), + } + dc.recorder = dc.broadcaster.NewRecorder(api.EventSource{Component: "controllermanager"}) + + dc.getUpdater = func() updater { return dc.writePdbStatus } + + dc.podLister.Indexer = podInformer.GetIndexer() + + podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{ + AddFunc: dc.addPod, + UpdateFunc: dc.updatePod, + DeleteFunc: dc.deletePod, + }) + + dc.pdbStore, dc.pdbController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return dc.kubeClient.Policy().PodDisruptionBudgets(api.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return dc.kubeClient.Policy().PodDisruptionBudgets(api.NamespaceAll).Watch(options) + }, + }, + &policy.PodDisruptionBudget{}, + 30*time.Second, + framework.ResourceEventHandlerFuncs{ + AddFunc: dc.addDb, + UpdateFunc: dc.updateDb, + DeleteFunc: dc.removeDb, + }, + ) + dc.pdbLister.Store = dc.pdbStore + + dc.rcIndexer, dc.rcController = framework.NewIndexerInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return dc.kubeClient.ReplicationControllers(api.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return dc.kubeClient.ReplicationControllers(api.NamespaceAll).Watch(options) + }, + }, + &api.ReplicationController{}, + 30*time.Second, + framework.ResourceEventHandlerFuncs{}, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) + + dc.rcLister.Indexer = dc.rcIndexer + + dc.rsStore, dc.rsController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return dc.kubeClient.Extensions().ReplicaSets(api.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return dc.kubeClient.Extensions().ReplicaSets(api.NamespaceAll).Watch(options) + }, + }, + &extensions.ReplicaSet{}, + 30*time.Second, + framework.ResourceEventHandlerFuncs{}, + ) + + dc.rsLister.Store = dc.rsStore + + dc.dStore, dc.dController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return dc.kubeClient.Extensions().Deployments(api.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return dc.kubeClient.Extensions().Deployments(api.NamespaceAll).Watch(options) + }, + }, + &extensions.Deployment{}, + 30*time.Second, + framework.ResourceEventHandlerFuncs{}, + ) + + dc.dLister.Store = dc.dStore + + return dc +} + +// TODO(mml): When controllerRef is implemented (#2210), we *could* simply +// return controllers without their scales, and access scale type-generically +// via the scale subresource. That may not be as much of a win as it sounds, +// however. We are accessing everything through the pkg/client/cache API that +// we have to set up and tune to the types we know we'll be accessing anyway, +// and we may well need further tweaks just to be able to access scale +// subresources. 
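// Each finder returned here is consulted for every pod covered by a PDB when
// MinAvailable is a percentage; getExpectedPodCount treats a pod that resolves
// to zero controllers, or to more than one, as an error, which sends the PDB
// through the failSafe path.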
+func (dc *DisruptionController) finders() []podControllerFinder { + return []podControllerFinder{dc.getPodReplicationControllers, dc.getPodDeployments, dc.getPodReplicaSets} +} + +// getPodReplicaSets finds replicasets which have no matching deployments. +func (dc *DisruptionController) getPodReplicaSets(pod *api.Pod) ([]controllerAndScale, error) { + cas := []controllerAndScale{} + rss, err := dc.rsLister.GetPodReplicaSets(pod) + // GetPodReplicaSets returns an error only if no ReplicaSets are found. We + // don't return that as an error to the caller. + if err != nil { + return cas, nil + } + controllerScale := map[types.UID]int32{} + for _, rs := range rss { + // GetDeploymentsForReplicaSet returns an error only if no matching + // deployments are found. + _, err := dc.dLister.GetDeploymentsForReplicaSet(&rs) + if err == nil { // A deployment was found, so this finder will not count this RS. + continue + } + controllerScale[rs.UID] = rs.Spec.Replicas + } + + for uid, scale := range controllerScale { + cas = append(cas, controllerAndScale{UID: uid, scale: scale}) + } + + return cas, nil +} + +// getPodDeployments finds deployments for any replicasets which are being managed by deployments. +func (dc *DisruptionController) getPodDeployments(pod *api.Pod) ([]controllerAndScale, error) { + cas := []controllerAndScale{} + rss, err := dc.rsLister.GetPodReplicaSets(pod) + // GetPodReplicaSets returns an error only if no ReplicaSets are found. We + // don't return that as an error to the caller. + if err != nil { + return cas, nil + } + controllerScale := map[types.UID]int32{} + for _, rs := range rss { + ds, err := dc.dLister.GetDeploymentsForReplicaSet(&rs) + // GetDeploymentsForReplicaSet returns an error only if no matching + // deployments are found. In that case we skip this ReplicaSet. + if err != nil { + continue + } + for _, d := range ds { + controllerScale[d.UID] = d.Spec.Replicas + } + } + + for uid, scale := range controllerScale { + cas = append(cas, controllerAndScale{UID: uid, scale: scale}) + } + + return cas, nil +} + +func (dc *DisruptionController) getPodReplicationControllers(pod *api.Pod) ([]controllerAndScale, error) { + cas := []controllerAndScale{} + rcs, err := dc.rcLister.GetPodControllers(pod) + if err == nil { + for _, rc := range rcs { + cas = append(cas, controllerAndScale{UID: rc.UID, scale: rc.Spec.Replicas}) + } + } + return cas, nil +} + +func (dc *DisruptionController) Run(stopCh <-chan struct{}) { + glog.V(0).Infof("Starting disruption controller") + if dc.kubeClient != nil { + glog.V(0).Infof("Sending events to api server.") + dc.broadcaster.StartRecordingToSink(dc.kubeClient.Events("")) + } else { + glog.V(0).Infof("No api server defined - no events will be sent to API server.") + } + go dc.pdbController.Run(stopCh) + go dc.podController.Run(stopCh) + go dc.rcController.Run(stopCh) + go dc.rsController.Run(stopCh) + go dc.dController.Run(stopCh) + go wait.Until(dc.worker, time.Second, stopCh) + <-stopCh + glog.V(0).Infof("Shutting down disruption controller") +} + +func (dc *DisruptionController) addDb(obj interface{}) { + pdb := obj.(*policy.PodDisruptionBudget) + glog.V(4).Infof("add DB %q", pdb.Name) + dc.enqueuePdb(pdb) +} + +func (dc *DisruptionController) updateDb(old, cur interface{}) { + // TODO(mml) ignore updates where 'old' is equivalent to 'cur'. 
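// One possible (unverified) way to implement the TODO above, assuming both
// arguments are *policy.PodDisruptionBudget as delivered by the informer:
//     if old.(*policy.PodDisruptionBudget).ResourceVersion ==
//         cur.(*policy.PodDisruptionBudget).ResourceVersion {
//         return // periodic resync; nothing changed
//     }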
+ pdb := cur.(*policy.PodDisruptionBudget) + glog.V(4).Infof("update DB %q", pdb.Name) + dc.enqueuePdb(pdb) +} + +func (dc *DisruptionController) removeDb(obj interface{}) { + pdb := obj.(*policy.PodDisruptionBudget) + glog.V(4).Infof("remove DB %q", pdb.Name) + dc.enqueuePdb(pdb) +} + +func (dc *DisruptionController) addPod(obj interface{}) { + pod := obj.(*api.Pod) + glog.V(4).Infof("addPod called on pod %q", pod.Name) + pdb := dc.getPdbForPod(pod) + if pdb == nil { + glog.V(4).Infof("No matching pdb for pod %q", pod.Name) + return + } + glog.V(4).Infof("addPod %q -> PDB %q", pod.Name, pdb.Name) + dc.enqueuePdb(pdb) +} + +func (dc *DisruptionController) updatePod(old, cur interface{}) { + pod := cur.(*api.Pod) + glog.V(4).Infof("updatePod called on pod %q", pod.Name) + pdb := dc.getPdbForPod(pod) + if pdb == nil { + glog.V(4).Infof("No matching pdb for pod %q", pod.Name) + return + } + glog.V(4).Infof("updatePod %q -> PDB %q", pod.Name, pdb.Name) + dc.enqueuePdb(pdb) +} + +func (dc *DisruptionController) deletePod(obj interface{}) { + pod, ok := obj.(*api.Pod) + // When a delete is dropped, the relist will notice a pod in the store not + // in the list, leading to the insertion of a tombstone object which contains + // the deleted key/value. Note that this value might be stale. If the pod + // changed labels the new ReplicaSet will not be woken up till the periodic + // resync. + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + glog.Errorf("Couldn't get object from tombstone %+v", obj) + return + } + pod, ok = tombstone.Obj.(*api.Pod) + if !ok { + glog.Errorf("Tombstone contained object that is not a pod %+v", obj) + return + } + } + glog.V(4).Infof("deletePod called on pod %q", pod.Name) + pdb := dc.getPdbForPod(pod) + if pdb == nil { + glog.V(4).Infof("No matching pdb for pod %q", pod.Name) + return + } + glog.V(4).Infof("deletePod %q -> PDB %q", pod.Name, pdb.Name) + dc.enqueuePdb(pdb) +} + +func (dc *DisruptionController) enqueuePdb(pdb *policy.PodDisruptionBudget) { + key, err := controller.KeyFunc(pdb) + if err != nil { + glog.Errorf("Cound't get key for PodDisruptionBudget object %+v: %v", pdb, err) + return + } + dc.queue.Add(key) +} + +func (dc *DisruptionController) getPdbForPod(pod *api.Pod) *policy.PodDisruptionBudget { + // GetPodPodDisruptionBudgets returns an error only if no + // PodDisruptionBudgets are found. We don't return that as an error to the + // caller. + pdbs, err := dc.pdbLister.GetPodPodDisruptionBudgets(pod) + if err != nil { + glog.V(0).Infof("No PodDisruptionBudgets found for pod %v, PodDisruptionBudget controller will avoid syncing.", pod.Name) + return nil + } + + if len(pdbs) > 1 { + msg := fmt.Sprintf("Pod %q/%q matches multiple PodDisruptionBudgets. 
Chose %q arbitrarily.", pod.Namespace, pod.Name, pdbs[0].Name) + glog.Warning(msg) + dc.recorder.Event(pod, api.EventTypeWarning, "MultiplePodDisruptionBudgets", msg) + } + return &pdbs[0] +} + +func (dc *DisruptionController) getPodsForPdb(pdb *policy.PodDisruptionBudget) ([]*api.Pod, error) { + sel, err := unversioned.LabelSelectorAsSelector(pdb.Spec.Selector) + if sel.Empty() { + return []*api.Pod{}, nil + } + if err != nil { + return []*api.Pod{}, err + } + podList, err := dc.podLister.Pods(pdb.Namespace).List(sel) + if err != nil { + return []*api.Pod{}, err + } + pods := []*api.Pod{} + for i := range podList.Items { + pod := podList.Items[i] + pods = append(pods, &pod) + } + return pods, nil +} + +func (dc *DisruptionController) worker() { + work := func() bool { + key, quit := dc.queue.Get() + if quit { + return quit + } + defer dc.queue.Done(key) + glog.V(4).Infof("Syncing PodDisruptionBudget %q", key.(string)) + if err := dc.sync(key.(string)); err != nil { + glog.Errorf("Error syncing PodDisruptionBudget %v, requeuing: %v", key.(string), err) + // TODO(mml): In order to be safe in the face of a total inability to write state + // changes, we should write an expiration timestamp here and consumers + // of the PDB state (the /evict subresource handler) should check that + // any 'true' state is relatively fresh. + + // TODO(mml): file an issue to that effect + + // TODO(mml): If we used a workqueue.RateLimitingInterface, we could + // improve our behavior (be a better citizen) when we need to retry. + dc.queue.Add(key) + } + return false + } + for { + if quit := work(); quit { + return + } + } +} + +func (dc *DisruptionController) sync(key string) error { + startTime := time.Now() + defer func() { + glog.V(4).Infof("Finished syncing PodDisruptionBudget %q (%v)", key, time.Now().Sub(startTime)) + }() + + obj, exists, err := dc.pdbLister.Store.GetByKey(key) + if !exists { + return err + } + if err != nil { + glog.Errorf("unable to retrieve PodDisruptionBudget %v from store: %v", key, err) + return err + } + + pdb := obj.(*policy.PodDisruptionBudget) + + if err := dc.trySync(pdb); err != nil { + return dc.failSafe(pdb) + } + + return nil +} + +func (dc *DisruptionController) trySync(pdb *policy.PodDisruptionBudget) error { + pods, err := dc.getPodsForPdb(pdb) + if err != nil { + return err + } + + expectedCount, desiredHealthy, err := dc.getExpectedPodCount(pdb, pods) + if err != nil { + return err + } + + currentHealthy := countHealthyPods(pods) + err = dc.updatePdbSpec(pdb, currentHealthy, desiredHealthy, expectedCount) + + return err +} + +func (dc *DisruptionController) getExpectedPodCount(pdb *policy.PodDisruptionBudget, pods []*api.Pod) (expectedCount, desiredHealthy int32, err error) { + err = nil + if pdb.Spec.MinAvailable.Type == intstr.Int { + desiredHealthy = pdb.Spec.MinAvailable.IntVal + expectedCount = int32(len(pods)) + } else if pdb.Spec.MinAvailable.Type == intstr.String { + // When the user specifies a fraction of pods that must be available, we + // use as the fraction's denominator + // SUM_{all c in C} scale(c) + // where C is the union of C_p1, C_p2, ..., C_pN + // and each C_pi is the set of controllers controlling the pod pi + + // k8s only defines what will happens when 0 or 1 controllers control a + // given pod. We explicitly exclude the 0 controllers case here, and we + // report an error if we find a pod with more than 1 controller. Thus in + // practice each C_pi is a set of exactly 1 controller. + + // A mapping from controllers to their scale. 
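// For example, if every covered pod is owned by a single ReplicationController
// with Spec.Replicas == 3, this map ends up with one entry of scale 3, so
// expectedCount == 3; with MinAvailable "67%" the rounded-up percentage math
// below then gives desiredHealthy == ceil(0.67 * 3) == 3.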
+ controllerScale := map[types.UID]int32{} + + // 1. Find the controller(s) for each pod. If any pod has 0 controllers, + // that's an error. If any pod has more than 1 controller, that's also an + // error. + for _, pod := range pods { + controllerCount := 0 + for _, finder := range dc.finders() { + var controllers []controllerAndScale + controllers, err = finder(pod) + if err != nil { + return + } + for _, controller := range controllers { + controllerScale[controller.UID] = controller.scale + controllerCount++ + } + } + if controllerCount == 0 { + err = fmt.Errorf("asked for percentage, but found no controllers for pod %q", pod.Name) + return + } else if controllerCount > 1 { + err = fmt.Errorf("pod %q has %v>1 controllers", pod.Name, controllerCount) + return + } + } + + // 2. Add up all the controllers. + expectedCount = 0 + for _, count := range controllerScale { + expectedCount += count + } + + // 3. Do the math. + var dh int + dh, err = intstr.GetValueFromIntOrPercent(&pdb.Spec.MinAvailable, int(expectedCount), true) + if err != nil { + return + } + desiredHealthy = int32(dh) + } + + return +} + +func countHealthyPods(pods []*api.Pod) (currentHealthy int32) { +Pod: + for _, pod := range pods { + for _, c := range pod.Status.Conditions { + if c.Type == api.PodReady && c.Status == api.ConditionTrue { + currentHealthy++ + continue Pod + } + } + } + + return +} + +// failSafe is an attempt to at least update the PodDisruptionAllowed field to +// false if everything something else has failed. This is one place we +// implement the "fail open" part of the design since if we manage to update +// this field correctly, we will prevent the /evict handler from approving an +// eviction when it may be unsafe to do so. +func (dc *DisruptionController) failSafe(pdb *policy.PodDisruptionBudget) error { + obj, err := api.Scheme.DeepCopy(*pdb) + if err != nil { + return err + } + newPdb := obj.(policy.PodDisruptionBudget) + newPdb.Status.PodDisruptionAllowed = false + + return dc.getUpdater()(&newPdb) +} + +func (dc *DisruptionController) updatePdbSpec(pdb *policy.PodDisruptionBudget, currentHealthy, desiredHealthy, expectedCount int32) error { + // We require expectedCount to be > 0 so that PDBs which currently match no + // pods are in a safe state when their first pods appear but this controller + // has not updated their status yet. This isn't the only race, but it's a + // common one that's easy to detect. + disruptionAllowed := currentHealthy >= desiredHealthy && expectedCount > 0 + + if pdb.Status.CurrentHealthy == currentHealthy && pdb.Status.DesiredHealthy == desiredHealthy && pdb.Status.ExpectedPods == expectedCount && pdb.Status.PodDisruptionAllowed == disruptionAllowed { + return nil + } + + obj, err := api.Scheme.DeepCopy(*pdb) + if err != nil { + return err + } + newPdb := obj.(policy.PodDisruptionBudget) + + newPdb.Status = policy.PodDisruptionBudgetStatus{ + CurrentHealthy: currentHealthy, + DesiredHealthy: desiredHealthy, + ExpectedPods: expectedCount, + PodDisruptionAllowed: disruptionAllowed, + } + + return dc.getUpdater()(&newPdb) +} + +// refresh tries to re-GET the given PDB. If there are any errors, it just +// returns the old PDB. Intended to be used in a retry loop where it runs a +// bounded number of times. 
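// It is used by writePdbStatus below: after a failed UpdateStatus, the retry
// loop re-GETs the object via refresh (picking up the latest ResourceVersion)
// and re-applies the locally computed status before trying again.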
+func refresh(pdbClient client.PodDisruptionBudgetInterface, pdb *policy.PodDisruptionBudget) *policy.PodDisruptionBudget { + newPdb, err := pdbClient.Get(pdb.Name) + if err == nil { + return newPdb + } else { + return pdb + } +} + +func (dc *DisruptionController) writePdbStatus(pdb *policy.PodDisruptionBudget) error { + pdbClient := dc.kubeClient.Policy().PodDisruptionBudgets(pdb.Namespace) + st := pdb.Status + + var err error + for i, pdb := 0, pdb; i < statusUpdateRetries; i, pdb = i+1, refresh(pdbClient, pdb) { + pdb.Status = st + if _, err = pdbClient.UpdateStatus(pdb); err == nil { + break + } + } + + return err +} diff --git a/pkg/controller/disruption/disruption_test.go b/pkg/controller/disruption/disruption_test.go new file mode 100644 index 00000000000..3813af19d6d --- /dev/null +++ b/pkg/controller/disruption/disruption_test.go @@ -0,0 +1,488 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package disruption + +import ( + "fmt" + "reflect" + "testing" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/extensions" + "k8s.io/kubernetes/pkg/apis/policy" + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/util/intstr" + "k8s.io/kubernetes/pkg/util/uuid" +) + +type pdbStates map[string]policy.PodDisruptionBudget + +func (ps *pdbStates) Set(pdb *policy.PodDisruptionBudget) error { + key, err := controller.KeyFunc(pdb) + if err != nil { + return err + } + obj, err := api.Scheme.DeepCopy(*pdb) + if err != nil { + return err + } + (*ps)[key] = obj.(policy.PodDisruptionBudget) + + return nil +} + +func (ps *pdbStates) Get(key string) policy.PodDisruptionBudget { + return (*ps)[key] +} + +func (ps *pdbStates) VerifyPdbStatus(t *testing.T, key string, disruptionAllowed bool, currentHealthy, desiredHealthy, expectedPods int32) { + expectedStatus := policy.PodDisruptionBudgetStatus{ + PodDisruptionAllowed: disruptionAllowed, + CurrentHealthy: currentHealthy, + DesiredHealthy: desiredHealthy, + ExpectedPods: expectedPods, + } + actualStatus := ps.Get(key).Status + if !reflect.DeepEqual(actualStatus, expectedStatus) { + t.Fatalf("PDB %q status mismatch. Expected %+v but got %+v.", key, expectedStatus, actualStatus) + } +} + +func (ps *pdbStates) VerifyDisruptionAllowed(t *testing.T, key string, disruptionAllowed bool) { + pdb := ps.Get(key) + if pdb.Status.PodDisruptionAllowed != disruptionAllowed { + t.Fatalf("PodDisruptionAllowed mismatch for PDB %q. 
Expected %v but got %v.", key, disruptionAllowed, pdb.Status.PodDisruptionAllowed) + } +} + +func newFakeDisruptionController() (*DisruptionController, *pdbStates) { + ps := &pdbStates{} + + dc := &DisruptionController{ + pdbLister: cache.StoreToPodDisruptionBudgetLister{Store: cache.NewStore(controller.KeyFunc)}, + podLister: cache.StoreToPodLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{})}, + rcLister: cache.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}, + rsLister: cache.StoreToReplicaSetLister{Store: cache.NewStore(controller.KeyFunc)}, + dLister: cache.StoreToDeploymentLister{Store: cache.NewStore(controller.KeyFunc)}, + getUpdater: func() updater { return ps.Set }, + } + + return dc, ps +} + +func fooBar() map[string]string { + return map[string]string{"foo": "bar"} +} + +func newSel(labels map[string]string) *unversioned.LabelSelector { + return &unversioned.LabelSelector{MatchLabels: labels} +} + +func newSelFooBar() *unversioned.LabelSelector { + return newSel(map[string]string{"foo": "bar"}) +} + +func newPodDisruptionBudget(t *testing.T, minAvailable intstr.IntOrString) (*policy.PodDisruptionBudget, string) { + + pdb := &policy.PodDisruptionBudget{ + TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Default.GroupVersion().String()}, + ObjectMeta: api.ObjectMeta{ + UID: uuid.NewUUID(), + Name: "foobar", + Namespace: api.NamespaceDefault, + ResourceVersion: "18", + }, + Spec: policy.PodDisruptionBudgetSpec{ + MinAvailable: minAvailable, + Selector: newSelFooBar(), + }, + } + + pdbName, err := controller.KeyFunc(pdb) + if err != nil { + t.Fatalf("Unexpected error naming pdb %q: %v", pdb.Name, err) + } + + return pdb, pdbName +} + +func newPod(t *testing.T, name string) (*api.Pod, string) { + pod := &api.Pod{ + TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Default.GroupVersion().String()}, + ObjectMeta: api.ObjectMeta{ + UID: uuid.NewUUID(), + Annotations: make(map[string]string), + Name: name, + Namespace: api.NamespaceDefault, + ResourceVersion: "18", + Labels: fooBar(), + }, + Spec: api.PodSpec{}, + Status: api.PodStatus{ + Conditions: []api.PodCondition{ + {Type: api.PodReady, Status: api.ConditionTrue}, + }, + }, + } + + podName, err := controller.KeyFunc(pod) + if err != nil { + t.Fatalf("Unexpected error naming pod %q: %v", pod.Name, err) + } + + return pod, podName +} + +func newReplicationController(t *testing.T, size int32) (*api.ReplicationController, string) { + rc := &api.ReplicationController{ + TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Default.GroupVersion().String()}, + ObjectMeta: api.ObjectMeta{ + UID: uuid.NewUUID(), + Name: "foobar", + Namespace: api.NamespaceDefault, + ResourceVersion: "18", + Labels: fooBar(), + }, + Spec: api.ReplicationControllerSpec{ + Replicas: size, + Selector: fooBar(), + }, + } + + rcName, err := controller.KeyFunc(rc) + if err != nil { + t.Fatalf("Unexpected error naming RC %q", rc.Name) + } + + return rc, rcName +} + +func newDeployment(t *testing.T, size int32) (*extensions.Deployment, string) { + d := &extensions.Deployment{ + TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Default.GroupVersion().String()}, + ObjectMeta: api.ObjectMeta{ + UID: uuid.NewUUID(), + Name: "foobar", + Namespace: api.NamespaceDefault, + ResourceVersion: "18", + Labels: fooBar(), + }, + Spec: extensions.DeploymentSpec{ + Replicas: size, + Selector: newSelFooBar(), + }, + } + + dName, err := controller.KeyFunc(d) + if 
err != nil { + t.Fatalf("Unexpected error naming Deployment %q: %v", d.Name, err) + } + + return d, dName +} + +func newReplicaSet(t *testing.T, size int32) (*extensions.ReplicaSet, string) { + rs := &extensions.ReplicaSet{ + TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Default.GroupVersion().String()}, + ObjectMeta: api.ObjectMeta{ + UID: uuid.NewUUID(), + Name: "foobar", + Namespace: api.NamespaceDefault, + ResourceVersion: "18", + Labels: fooBar(), + }, + Spec: extensions.ReplicaSetSpec{ + Replicas: size, + Selector: newSelFooBar(), + }, + } + + rsName, err := controller.KeyFunc(rs) + if err != nil { + t.Fatalf("Unexpected error naming ReplicaSet %q: %v", rs.Name, err) + } + + return rs, rsName +} + +func update(t *testing.T, store cache.Store, obj interface{}) { + if err := store.Update(obj); err != nil { + t.Fatalf("Could not add %+v to %+v: %v", obj, store, err) + } +} + +func add(t *testing.T, store cache.Store, obj interface{}) { + if err := store.Add(obj); err != nil { + t.Fatalf("Could not add %+v to %+v: %v", obj, store, err) + } +} + +// Create one with no selector. Verify it matches 0 pods. +func TestNoSelector(t *testing.T) { + dc, ps := newFakeDisruptionController() + + pdb, pdbName := newPodDisruptionBudget(t, intstr.FromInt(3)) + pdb.Spec.Selector = &unversioned.LabelSelector{} + pod, _ := newPod(t, "yo-yo-yo") + + add(t, dc.pdbLister.Store, pdb) + dc.sync(pdbName) + ps.VerifyPdbStatus(t, pdbName, false, 0, 3, 0) + + add(t, dc.podLister.Indexer, pod) + dc.sync(pdbName) + ps.VerifyPdbStatus(t, pdbName, false, 0, 3, 0) +} + +// Verify that available/expected counts go up as we add pods, then verify that +// available count goes down when we make a pod unavailable. +func TestUnavailable(t *testing.T) { + dc, ps := newFakeDisruptionController() + + pdb, pdbName := newPodDisruptionBudget(t, intstr.FromInt(3)) + add(t, dc.pdbLister.Store, pdb) + dc.sync(pdbName) + + // Add three pods, verifying that the counts go up at each step. + pods := []*api.Pod{} + for i := int32(0); i < 3; i++ { + ps.VerifyPdbStatus(t, pdbName, false, i, 3, i) + pod, _ := newPod(t, fmt.Sprintf("yo-yo-yo %d", i)) + pods = append(pods, pod) + add(t, dc.podLister.Indexer, pod) + dc.sync(pdbName) + } + ps.VerifyPdbStatus(t, pdbName, true, 3, 3, 3) + + // Now set one pod as unavailable + pods[0].Status.Conditions = []api.PodCondition{} + update(t, dc.podLister.Indexer, pods[0]) + dc.sync(pdbName) + + // Verify expected update + ps.VerifyPdbStatus(t, pdbName, false, 2, 3, 3) +} + +// Create a pod with no controller, and verify that a PDB with a percentage +// specified won't allow a disruption. +func TestNakedPod(t *testing.T) { + dc, ps := newFakeDisruptionController() + + pdb, pdbName := newPodDisruptionBudget(t, intstr.FromString("28%")) + add(t, dc.pdbLister.Store, pdb) + dc.sync(pdbName) + // This verifies that when a PDB has 0 pods, disruptions are not allowed. + ps.VerifyDisruptionAllowed(t, pdbName, false) + + pod, _ := newPod(t, "naked") + add(t, dc.podLister.Indexer, pod) + dc.sync(pdbName) + + ps.VerifyDisruptionAllowed(t, pdbName, false) +} + +// Verify that we count the scale of a ReplicaSet even when it has no Deployment. 
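// With MinAvailable "20%" and a ReplicaSet of scale 10, the controller should
// report expectedPods == 10 and desiredHealthy == ceil(0.20 * 10) == 2; only
// one matching pod exists, so currentHealthy == 1 and disruption is not allowed.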
+func TestReplicaSet(t *testing.T) { + dc, ps := newFakeDisruptionController() + + pdb, pdbName := newPodDisruptionBudget(t, intstr.FromString("20%")) + add(t, dc.pdbLister.Store, pdb) + + rs, _ := newReplicaSet(t, 10) + add(t, dc.rsLister.Store, rs) + + pod, _ := newPod(t, "pod") + add(t, dc.podLister.Indexer, pod) + dc.sync(pdbName) + ps.VerifyPdbStatus(t, pdbName, false, 1, 2, 10) +} + +// Verify that multiple controllers doesn't allow the PDB to be set true. +func TestMultipleControllers(t *testing.T) { + const rcCount = 2 + const podCount = 2 + + dc, ps := newFakeDisruptionController() + + pdb, pdbName := newPodDisruptionBudget(t, intstr.FromString("1%")) + add(t, dc.pdbLister.Store, pdb) + + for i := 0; i < podCount; i++ { + pod, _ := newPod(t, fmt.Sprintf("pod %d", i)) + add(t, dc.podLister.Indexer, pod) + } + dc.sync(pdbName) + + // No controllers yet => no disruption allowed + ps.VerifyDisruptionAllowed(t, pdbName, false) + + rc, _ := newReplicationController(t, 1) + rc.Name = "rc 1" + add(t, dc.rcLister.Indexer, rc) + dc.sync(pdbName) + + // One RC and 200%>1% healthy => disruption allowed + ps.VerifyDisruptionAllowed(t, pdbName, true) + + rc, _ = newReplicationController(t, 1) + rc.Name = "rc 2" + add(t, dc.rcLister.Indexer, rc) + dc.sync(pdbName) + + // 100%>1% healthy BUT two RCs => no disruption allowed + ps.VerifyDisruptionAllowed(t, pdbName, false) +} + +func TestReplicationController(t *testing.T) { + // The budget in this test matches foo=bar, but the RC and its pods match + // {foo=bar, baz=quux}. Later, when we add a rogue pod with only a foo=bar + // label, it will match the budget but have no controllers, which should + // trigger the controller to set PodDisruptionAllowed to false. + labels := map[string]string{ + "foo": "bar", + "baz": "quux", + } + + dc, ps := newFakeDisruptionController() + + // 67% should round up to 3 + pdb, pdbName := newPodDisruptionBudget(t, intstr.FromString("67%")) + add(t, dc.pdbLister.Store, pdb) + rc, _ := newReplicationController(t, 3) + rc.Spec.Selector = labels + add(t, dc.rcLister.Indexer, rc) + dc.sync(pdbName) + // It starts out at 0 expected because, with no pods, the PDB doesn't know + // about the RC. This is a known bug. TODO(mml): file issue + ps.VerifyPdbStatus(t, pdbName, false, 0, 0, 0) + + pods := []*api.Pod{} + + for i := int32(0); i < 3; i++ { + pod, _ := newPod(t, fmt.Sprintf("foobar %d", i)) + pods = append(pods, pod) + pod.Labels = labels + add(t, dc.podLister.Indexer, pod) + dc.sync(pdbName) + if i < 2 { + ps.VerifyPdbStatus(t, pdbName, false, i+1, 3, 3) + } else { + ps.VerifyPdbStatus(t, pdbName, true, 3, 3, 3) + } + } + + rogue, _ := newPod(t, "rogue") + add(t, dc.podLister.Indexer, rogue) + dc.sync(pdbName) + ps.VerifyDisruptionAllowed(t, pdbName, false) +} + +func TestTwoControllers(t *testing.T) { + // Most of this test is in verifying intermediate cases as we define the + // three controllers and create the pods. 
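// The desiredHealthy values asserted below follow from the rounded-up
// percentage: while only the ReplicationController's 11 replicas are counted,
// 28% of 11 rounds up to 4; once pods owned by the Deployment-backed
// ReplicaSet appear, the denominator becomes 11 + 11 == 22 and 28% of 22
// rounds up to 7.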
+ rcLabels := map[string]string{ + "foo": "bar", + "baz": "quux", + } + dLabels := map[string]string{ + "foo": "bar", + "baz": "quuux", + } + dc, ps := newFakeDisruptionController() + + pdb, pdbName := newPodDisruptionBudget(t, intstr.FromString("28%")) + add(t, dc.pdbLister.Store, pdb) + rc, _ := newReplicationController(t, 11) + rc.Spec.Selector = rcLabels + add(t, dc.rcLister.Indexer, rc) + dc.sync(pdbName) + + ps.VerifyPdbStatus(t, pdbName, false, 0, 0, 0) + + pods := []*api.Pod{} + + for i := int32(0); i < 11; i++ { + pod, _ := newPod(t, fmt.Sprintf("quux %d", i)) + pods = append(pods, pod) + pod.Labels = rcLabels + if i < 7 { + pod.Status.Conditions = []api.PodCondition{} + } + add(t, dc.podLister.Indexer, pod) + dc.sync(pdbName) + if i < 7 { + ps.VerifyPdbStatus(t, pdbName, false, 0, 4, 11) + } else if i < 10 { + ps.VerifyPdbStatus(t, pdbName, false, i-6, 4, 11) + } else { + ps.VerifyPdbStatus(t, pdbName, true, 4, 4, 11) + } + } + + d, _ := newDeployment(t, 11) + d.Spec.Selector = newSel(dLabels) + add(t, dc.dLister.Store, d) + dc.sync(pdbName) + ps.VerifyPdbStatus(t, pdbName, true, 4, 4, 11) + + rs, _ := newReplicaSet(t, 11) + rs.Spec.Selector = newSel(dLabels) + rs.Labels = dLabels + add(t, dc.rsLister.Store, rs) + dc.sync(pdbName) + ps.VerifyPdbStatus(t, pdbName, true, 4, 4, 11) + + for i := int32(0); i < 11; i++ { + pod, _ := newPod(t, fmt.Sprintf("quuux %d", i)) + pods = append(pods, pod) + pod.Labels = dLabels + if i < 7 { + pod.Status.Conditions = []api.PodCondition{} + } + add(t, dc.podLister.Indexer, pod) + dc.sync(pdbName) + if i < 7 { + ps.VerifyPdbStatus(t, pdbName, false, 4, 7, 22) + } else if i < 9 { + ps.VerifyPdbStatus(t, pdbName, false, 4+i-6, 7, 22) + } else { + ps.VerifyPdbStatus(t, pdbName, true, 4+i-6, 7, 22) + } + } + + // Now we verify we can bring down 1 pod and a disruption is still permitted, + // but if we bring down two, it's not. Then we make the pod ready again and + // verify that a disruption is permitted again. + ps.VerifyPdbStatus(t, pdbName, true, 8, 7, 22) + pods[10].Status.Conditions = []api.PodCondition{} + update(t, dc.podLister.Indexer, pods[10]) + dc.sync(pdbName) + ps.VerifyPdbStatus(t, pdbName, true, 7, 7, 22) + + pods[9].Status.Conditions = []api.PodCondition{} + update(t, dc.podLister.Indexer, pods[9]) + dc.sync(pdbName) + ps.VerifyPdbStatus(t, pdbName, false, 6, 7, 22) + + pods[10].Status.Conditions = []api.PodCondition{{Type: api.PodReady, Status: api.ConditionTrue}} + update(t, dc.podLister.Indexer, pods[10]) + dc.sync(pdbName) + ps.VerifyPdbStatus(t, pdbName, true, 7, 7, 22) +} diff --git a/test/e2e/disruption.go b/test/e2e/disruption.go new file mode 100644 index 00000000000..e01e34c4bbd --- /dev/null +++ b/test/e2e/disruption.go @@ -0,0 +1,104 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "fmt" + "time" + + . "github.com/onsi/ginkgo" + . 
"github.com/onsi/gomega" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/policy" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/util/intstr" + "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/test/e2e/framework" +) + +var _ = framework.KubeDescribe("DisruptionController [Feature:PodDisruptionbudget]", func() { + f := framework.NewDefaultFramework("disruption") + var ns string + var c *client.Client + + BeforeEach(func() { + c = f.Client + ns = f.Namespace.Name + }) + + It("should create a PodDisruptionBudget", func() { + pdb := policy.PodDisruptionBudget{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: ns, + }, + Spec: policy.PodDisruptionBudgetSpec{ + Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}, + MinAvailable: intstr.FromString("1%"), + }, + } + _, err := c.Policy().PodDisruptionBudgets(ns).Create(&pdb) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should update PodDisruptionBudget status", func() { + pdb := policy.PodDisruptionBudget{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: ns, + }, + Spec: policy.PodDisruptionBudgetSpec{ + Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}, + MinAvailable: intstr.FromInt(2), + }, + } + _, err := c.Policy().PodDisruptionBudgets(ns).Create(&pdb) + Expect(err).NotTo(HaveOccurred()) + for i := 0; i < 2; i++ { + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: fmt.Sprintf("pod-%d", i), + Namespace: ns, + Labels: map[string]string{"foo": "bar"}, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "busybox", + Image: "gcr.io/google_containers/echoserver:1.4", + }, + }, + RestartPolicy: api.RestartPolicyAlways, + }, + } + + _, err := c.Pods(ns).Create(pod) + framework.ExpectNoError(err, "Creating pod %q in namespace %q", pod.Name, ns) + } + err = wait.PollImmediate(framework.Poll, 60*time.Second, func() (bool, error) { + pdb, err := c.Policy().PodDisruptionBudgets(ns).Get("foo") + if err != nil { + return false, err + } + return pdb.Status.PodDisruptionAllowed, nil + }) + Expect(err).NotTo(HaveOccurred()) + + }) + +})