From fa432e131c9b97d5c5357145cccfd3057710f8d5 Mon Sep 17 00:00:00 2001
From: Klaus Ma
Date: Mon, 24 Jul 2017 20:08:35 +0800
Subject: [PATCH] Requeue DaemonSets if non-daemon pods were deleted.

When a daemon pod 'wants to run, but should not schedule' on a node (for
example, because the node does not have enough resources), remember the
DaemonSet as suspended for that node. When a non-daemon pod on that node is
deleted, requeue the suspended DaemonSets (rate limited) so that their daemon
pods get another chance to be scheduled.
---
 pkg/controller/daemon/BUILD                |   1 +
 pkg/controller/daemon/daemon_controller.go | 105 ++++++++++-
 .../daemon/daemon_controller_test.go       | 173 ++++++++++++++++++
 3 files changed, 273 insertions(+), 6 deletions(-)

diff --git a/pkg/controller/daemon/BUILD b/pkg/controller/daemon/BUILD
index ce90c100cef..f64a6d9e1bd 100644
--- a/pkg/controller/daemon/BUILD
+++ b/pkg/controller/daemon/BUILD
@@ -40,6 +40,7 @@ go_library(
         "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/json:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//vendor/k8s.io/client-go/informers/apps/v1beta1:go_default_library",
diff --git a/pkg/controller/daemon/daemon_controller.go b/pkg/controller/daemon/daemon_controller.go
index 61919a412e0..f104e163161 100644
--- a/pkg/controller/daemon/daemon_controller.go
+++ b/pkg/controller/daemon/daemon_controller.go
@@ -31,6 +31,7 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	appsinformers "k8s.io/client-go/informers/apps/v1beta1"
@@ -94,7 +95,8 @@ type DaemonSetsController struct {
 	// To allow injection of syncDaemonSet for testing.
 	syncHandler func(dsKey string) error
 	// used for unit testing
-	enqueueDaemonSet func(ds *extensions.DaemonSet)
+	enqueueDaemonSet            func(ds *extensions.DaemonSet)
+	enqueueDaemonSetRateLimited func(ds *extensions.DaemonSet)
 	// A TTLCache of pod creates/deletes each ds expects to see
 	expectations controller.ControllerExpectationsInterface
 	// dsLister can list/get daemonsets from the shared informer's store
@@ -120,6 +122,11 @@ type DaemonSetsController struct {
 
 	// DaemonSet keys that need to be synced.
 	queue workqueue.RateLimitingInterface
+
+	// The DaemonSets that have suspended pods on nodes; the key is the node name, the value
+	// is the set of DaemonSets that want to run pods on that node but could not be scheduled in the latest sync cycle.
+	suspendedDaemonPodsMutex sync.Mutex
+	suspendedDaemonPods      map[string]sets.String
 }
 
 func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInformer, historyInformer appsinformers.ControllerRevisionInformer, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, kubeClient clientset.Interface) *DaemonSetsController {
@@ -141,9 +148,10 @@ func NewDaemonSetsController(daemonSetInfo
 		crControl: controller.RealControllerRevisionControl{
 			KubeClient: kubeClient,
 		},
-		burstReplicas: BurstReplicas,
-		expectations:  controller.NewControllerExpectations(),
-		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"),
+		burstReplicas:       BurstReplicas,
+		expectations:        controller.NewControllerExpectations(),
+		queue:               workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"),
+		suspendedDaemonPods: map[string]sets.String{},
 	}
 
 	daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -191,6 +199,7 @@ func NewDaemonSetsController(daemonSetInfo
 
 	dsc.syncHandler = dsc.syncDaemonSet
 	dsc.enqueueDaemonSet = dsc.enqueue
+	dsc.enqueueDaemonSetRateLimited = dsc.enqueueRateLimited
 	return dsc
 }
 
@@ -267,6 +276,16 @@ func (dsc *DaemonSetsController) enqueue(ds *extensions.DaemonSet) {
 	dsc.queue.Add(key)
 }
 
+func (dsc *DaemonSetsController) enqueueRateLimited(ds *extensions.DaemonSet) {
+	key, err := controller.KeyFunc(ds)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err))
+		return
+	}
+
+	dsc.queue.AddRateLimited(key)
+}
+
 func (dsc *DaemonSetsController) enqueueDaemonSetAfter(obj interface{}, after time.Duration) {
 	key, err := controller.KeyFunc(obj)
 	if err != nil {
@@ -519,6 +538,67 @@ func (dsc *DaemonSetsController) updatePod(old, cur interface{}) {
 	}
 }
 
+// listSuspendedDaemonPods lists the DaemonSets that have daemon pods which 'want to run,
+// but should not schedule' on the given node.
+func (dsc *DaemonSetsController) listSuspendedDaemonPods(node string) (dss []string) {
+	dsc.suspendedDaemonPodsMutex.Lock()
+	defer dsc.suspendedDaemonPodsMutex.Unlock()
+
+	if _, found := dsc.suspendedDaemonPods[node]; !found {
+		return nil
+	}
+
+	for k := range dsc.suspendedDaemonPods[node] {
+		dss = append(dss, k)
+	}
+	return
+}
+
+// requeueSuspendedDaemonPods enqueues all DaemonSets which have pods that 'want to run,
+// but should not schedule' for the node, so that the DaemonSetsController will sync them up again.
+func (dsc *DaemonSetsController) requeueSuspendedDaemonPods(node string) {
+	dss := dsc.listSuspendedDaemonPods(node)
+	for _, dsKey := range dss {
+		if ns, name, err := cache.SplitMetaNamespaceKey(dsKey); err != nil {
+			glog.Errorf("Failed to get DaemonSet's namespace and name from %s: %v", dsKey, err)
+			continue
+		} else if ds, err := dsc.dsLister.DaemonSets(ns).Get(name); err != nil {
+			glog.Errorf("Failed to get DaemonSet %s/%s: %v", ns, name, err)
+			continue
+		} else {
+			dsc.enqueueDaemonSetRateLimited(ds)
+		}
+	}
+}
+
+// addSuspendedDaemonPods adds a DaemonSet, which has pods that 'want to run,
+// but should not schedule' for the node, to the suspended queue.
+func (dsc *DaemonSetsController) addSuspendedDaemonPods(node, ds string) {
+	dsc.suspendedDaemonPodsMutex.Lock()
+	defer dsc.suspendedDaemonPodsMutex.Unlock()
+
+	if _, found := dsc.suspendedDaemonPods[node]; !found {
+		dsc.suspendedDaemonPods[node] = sets.NewString()
+	}
+	dsc.suspendedDaemonPods[node].Insert(ds)
+}
+
+// removeSuspendedDaemonPods removes a DaemonSet, which has pods that 'want to run,
+// but should not schedule' for the node, from the suspended queue.
+func (dsc *DaemonSetsController) removeSuspendedDaemonPods(node, ds string) {
+	dsc.suspendedDaemonPodsMutex.Lock()
+	defer dsc.suspendedDaemonPodsMutex.Unlock()
+
+	if _, found := dsc.suspendedDaemonPods[node]; !found {
+		return
+	}
+	dsc.suspendedDaemonPods[node].Delete(ds)
+
+	if len(dsc.suspendedDaemonPods[node]) == 0 {
+		delete(dsc.suspendedDaemonPods, node)
+	}
+}
+
 func (dsc *DaemonSetsController) deletePod(obj interface{}) {
 	pod, ok := obj.(*v1.Pod)
 	// When a delete is dropped, the relist will notice a pod in the store not
@@ -542,10 +622,18 @@ func (dsc *DaemonSetsController) deletePod(obj interface{}) {
 	controllerRef := controller.GetControllerOf(pod)
 	if controllerRef == nil {
 		// No controller should care about orphans being deleted.
+		if len(pod.Spec.NodeName) != 0 {
+			// If scheduled pods were deleted, requeue suspended daemon pods.
+			dsc.requeueSuspendedDaemonPods(pod.Spec.NodeName)
+		}
 		return
 	}
 	ds := dsc.resolveControllerRef(pod.Namespace, controllerRef)
 	if ds == nil {
+		if len(pod.Spec.NodeName) != 0 {
+			// If scheduled pods were deleted, requeue suspended daemon pods.
+			dsc.requeueSuspendedDaemonPods(pod.Spec.NodeName)
+		}
 		return
 	}
 	dsKey, err := controller.KeyFunc(ds)
@@ -729,20 +817,25 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet, hash string) e
 	var nodesNeedingDaemonPods, podsToDelete []string
 	var failedPodsObserved int
 	for _, node := range nodeList {
-		_, shouldSchedule, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds)
+		wantToRun, shouldSchedule, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds)
 		if err != nil {
 			continue
 		}
 
 		daemonPods, exists := nodeToDaemonPods[node.Name]
+		dsKey, _ := cache.MetaNamespaceKeyFunc(ds)
+		dsc.removeSuspendedDaemonPods(node.Name, dsKey)
 
 		switch {
+		case wantToRun && !shouldSchedule:
+			// If the daemon pod is supposed to run, but cannot be scheduled, add it to the suspended list.
+			dsc.addSuspendedDaemonPods(node.Name, dsKey)
 		case shouldSchedule && !exists:
 			// If daemon pod is supposed to be running on node, but isn't, create daemon pod.
 			nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
 		case shouldContinueRunning:
 			// If a daemon pod failed, delete it
-			// If there's no daemon pods left on this node, we will create it in the next sync loop
+			// If there are non-daemon pods left on this node, we will create the daemon pod in the next sync loop
 			var daemonPodsRunning []*v1.Pod
 			for _, pod := range daemonPods {
 				if pod.Status.Phase == v1.PodFailed {
diff --git a/pkg/controller/daemon/daemon_controller_test.go b/pkg/controller/daemon/daemon_controller_test.go
index 921cd6fc3dd..ad5f7acb1d7 100644
--- a/pkg/controller/daemon/daemon_controller_test.go
+++ b/pkg/controller/daemon/daemon_controller_test.go
@@ -1553,6 +1553,179 @@ func TestUpdateNode(t *testing.T) {
 	}
 }
 
+// DaemonSets should be resynced when non-daemon pods were deleted.
+func TestDeleteNoDaemonPod(t *testing.T) {
+	var enqueued bool
+
+	cases := []struct {
+		test          string
+		node          *v1.Node
+		existPods     []*v1.Pod
+		deletedPod    *v1.Pod
+		ds            *extensions.DaemonSet
+		shouldEnqueue bool
+	}{
+		{
+			test: "Deleted non-daemon pods to release resources",
+			node: func() *v1.Node {
+				node := newNode("node1", nil)
+				node.Status.Conditions = []v1.NodeCondition{
+					{Type: v1.NodeReady, Status: v1.ConditionTrue},
+				}
+				node.Status.Allocatable = allocatableResources("200M", "200m")
+				return node
+			}(),
+			existPods: func() []*v1.Pod {
+				pods := []*v1.Pod{}
+				for i := 0; i < 4; i++ {
+					podSpec := resourcePodSpec("node1", "50M", "50m")
+					pods = append(pods, &v1.Pod{
+						ObjectMeta: metav1.ObjectMeta{
+							Name: fmt.Sprintf("pod_%d", i),
+						},
+						Spec: podSpec,
+					})
+				}
+				return pods
+			}(),
+			deletedPod: func() *v1.Pod {
+				podSpec := resourcePodSpec("node1", "50M", "50m")
+				return &v1.Pod{
+					ObjectMeta: metav1.ObjectMeta{
+						Name: "pod_0",
+					},
+					Spec: podSpec,
+				}
+			}(),
+			ds: func() *extensions.DaemonSet {
+				ds := newDaemonSet("ds")
+				ds.Spec.Template.Spec = resourcePodSpec("", "50M", "50m")
+				return ds
+			}(),
+			shouldEnqueue: true,
+		},
+		{
+			test: "Deleted non-daemon pods (with controller) to release resources",
+			node: func() *v1.Node {
+				node := newNode("node1", nil)
+				node.Status.Conditions = []v1.NodeCondition{
+					{Type: v1.NodeReady, Status: v1.ConditionTrue},
+				}
+				node.Status.Allocatable = allocatableResources("200M", "200m")
+				return node
+			}(),
+			existPods: func() []*v1.Pod {
+				pods := []*v1.Pod{}
+				for i := 0; i < 4; i++ {
+					podSpec := resourcePodSpec("node1", "50M", "50m")
+					pods = append(pods, &v1.Pod{
+						ObjectMeta: metav1.ObjectMeta{
+							Name: fmt.Sprintf("pod_%d", i),
+							OwnerReferences: []metav1.OwnerReference{
+								{Controller: func() *bool { res := true; return &res }()},
+							},
+						},
+						Spec: podSpec,
+					})
+				}
+				return pods
+			}(),
+			deletedPod: func() *v1.Pod {
+				podSpec := resourcePodSpec("node1", "50M", "50m")
+				return &v1.Pod{
+					ObjectMeta: metav1.ObjectMeta{
+						Name: "pod_0",
+						OwnerReferences: []metav1.OwnerReference{
+							{Controller: func() *bool { res := true; return &res }()},
+						},
+					},
+					Spec: podSpec,
+				}
+			}(),
+			ds: func() *extensions.DaemonSet {
+				ds := newDaemonSet("ds")
+				ds.Spec.Template.Spec = resourcePodSpec("", "50M", "50m")
+				return ds
+			}(),
+			shouldEnqueue: true,
+		},
+		{
+			test: "Deleted no scheduled pods",
+			node: func() *v1.Node {
+				node := newNode("node1", nil)
+				node.Status.Conditions = []v1.NodeCondition{
+					{Type: v1.NodeReady, Status: v1.ConditionTrue},
+				}
+				node.Status.Allocatable = allocatableResources("200M", "200m")
+				return node
+			}(),
+			existPods: func() []*v1.Pod {
+				pods := []*v1.Pod{}
+				for i := 0; i < 4; i++ {
+					podSpec := resourcePodSpec("node1", "50M", "50m")
+					pods = append(pods, &v1.Pod{
+						ObjectMeta: metav1.ObjectMeta{
+							Name: fmt.Sprintf("pod_%d", i),
+							OwnerReferences: []metav1.OwnerReference{
+								{Controller: func() *bool { res := true; return &res }()},
+							},
+						},
+						Spec: podSpec,
+					})
+				}
+				return pods
+			}(),
+			deletedPod: func() *v1.Pod {
+				podSpec := resourcePodSpec("", "50M", "50m")
+				return &v1.Pod{
+					ObjectMeta: metav1.ObjectMeta{
+						Name: "pod_5",
+					},
+					Spec: podSpec,
+				}
+			}(),
+			ds: func() *extensions.DaemonSet {
+				ds := newDaemonSet("ds")
+				ds.Spec.Template.Spec = resourcePodSpec("", "50M", "50m")
+				return ds
+			}(),
+			shouldEnqueue: false,
+		},
+	}
+
+	for _, c := range cases {
+		for _, strategy := range updateStrategies() {
+			manager, podControl, _ := newTestController()
+			manager.nodeStore.Add(c.node)
+			c.ds.Spec.UpdateStrategy = *strategy
+			manager.dsStore.Add(c.ds)
+			for _, pod := range c.existPods {
+				manager.podStore.Add(pod)
+			}
+			switch strategy.Type {
+			case extensions.OnDeleteDaemonSetStrategyType:
+				syncAndValidateDaemonSets(t, manager, c.ds, podControl, 0, 0, 2)
+			case extensions.RollingUpdateDaemonSetStrategyType:
+				syncAndValidateDaemonSets(t, manager, c.ds, podControl, 0, 0, 3)
+			default:
+				t.Fatalf("unexpected UpdateStrategy %+v", strategy)
+			}
+
+			manager.enqueueDaemonSetRateLimited = func(ds *extensions.DaemonSet) {
+				if ds.Name == "ds" {
+					enqueued = true
+				}
+			}
+
+			enqueued = false
+			manager.deletePod(c.deletedPod)
+			if enqueued != c.shouldEnqueue {
+				t.Errorf("Test case: '%s', expected: %t, got: %t", c.test, c.shouldEnqueue, enqueued)
+			}
+		}
+	}
+}
+
 func TestGetNodesToDaemonPods(t *testing.T) {
 	for _, strategy := range updateStrategies() {
 		ds := newDaemonSet("foo")