diff --git a/pkg/controller/daemon/daemon_controller.go b/pkg/controller/daemon/daemon_controller.go index 90c4ea1aec8..c6664a4eaa9 100644 --- a/pkg/controller/daemon/daemon_controller.go +++ b/pkg/controller/daemon/daemon_controller.go @@ -806,6 +806,7 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode( node *v1.Node, nodeToDaemonPods map[string][]*v1.Pod, ds *apps.DaemonSet, + hash string, ) (nodesNeedingDaemonPods, podsToDelete []string, err error) { shouldRun, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds) @@ -853,14 +854,60 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode( daemonPodsRunning = append(daemonPodsRunning, pod) } } - // If daemon pod is supposed to be running on node, but more than 1 daemon pod is running, delete the excess daemon pods. - // Sort the daemon pods by creation time, so the oldest is preserved. - if len(daemonPodsRunning) > 1 { + + // When surge is not enabled, if there is more than 1 running pod on a node, delete all but the oldest + if !util.AllowsSurge(ds) { + if len(daemonPodsRunning) <= 1 { + // There are no excess pods to be pruned, and no pods to create + break + } + sort.Sort(podByCreationTimestampAndPhase(daemonPodsRunning)) for i := 1; i < len(daemonPodsRunning); i++ { podsToDelete = append(podsToDelete, daemonPodsRunning[i].Name) } + break } + + if len(daemonPodsRunning) <= 1 { + // There are no excess pods to be pruned + if len(daemonPodsRunning) == 0 && shouldRun { + // We are surging so we need to have at least one non-deleted pod on the node + nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name) + } + break + } + + // When surge is enabled, we allow 2 pods if and only if the oldest pod matching the current hash state + // is not ready AND the oldest pod that doesn't match the current hash state is ready. All other pods are + // deleted. If neither pod is ready, only the one matching the current hash revision is kept. + var oldestNewPod, oldestOldPod *v1.Pod + sort.Sort(podByCreationTimestampAndPhase(daemonPodsRunning)) + for _, pod := range daemonPodsRunning { + if pod.Labels[apps.ControllerRevisionHashLabelKey] == hash { + if oldestNewPod == nil { + oldestNewPod = pod + continue + } + } else { + if oldestOldPod == nil { + oldestOldPod = pod + continue + } + } + podsToDelete = append(podsToDelete, pod.Name) + } + if oldestNewPod != nil && oldestOldPod != nil { + switch { + case !podutil.IsPodReady(oldestOldPod): + klog.V(5).Infof("Pod %s/%s from daemonset %s is no longer ready and will be replaced with newer pod %s", oldestOldPod.Namespace, oldestOldPod.Name, ds.Name, oldestNewPod.Name) + podsToDelete = append(podsToDelete, oldestOldPod.Name) + case podutil.IsPodAvailable(oldestNewPod, ds.Spec.MinReadySeconds, metav1.Time{Time: dsc.failedPodsBackoff.Clock.Now()}): + klog.V(5).Infof("Pod %s/%s from daemonset %s is now ready and will replace older pod %s", oldestNewPod.Namespace, oldestNewPod.Name, ds.Name, oldestOldPod.Name) + podsToDelete = append(podsToDelete, oldestOldPod.Name) + } + } + case !shouldContinueRunning && exists: + // If daemon pod isn't supposed to run on node, but it is, delete all daemon pods on node.
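To make the surge branch above concrete, here is a small, self-contained sketch of the keep/delete rule (the `pod` struct, `prune` helper, and sample data are invented stand-ins for illustration; the controller operates on `*v1.Pod` and folds MinReadySeconds-based availability into the second case):

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// pod stands in for *v1.Pod with only the fields the rule consults.
type pod struct {
	name    string
	hash    string
	created time.Time
	ready   bool
}

// prune mirrors the surge branch of podsShouldBeOnNode: keep the oldest pod of
// the current hash and the oldest pod of any other hash, delete everything
// else, and delete the old pod too once it is unready or the new pod is available.
func prune(pods []pod, hash string, newAvailable bool) (deleted []string) {
	sort.Slice(pods, func(i, j int) bool { return pods[i].created.Before(pods[j].created) })
	var oldestNew, oldestOld *pod
	for i := range pods {
		p := &pods[i]
		switch {
		case p.hash == hash && oldestNew == nil:
			oldestNew = p
		case p.hash != hash && oldestOld == nil:
			oldestOld = p
		default:
			deleted = append(deleted, p.name)
		}
	}
	if oldestNew != nil && oldestOld != nil && (!oldestOld.ready || newAvailable) {
		deleted = append(deleted, oldestOld.name)
	}
	return deleted
}

func main() {
	pods := []pod{
		{name: "new", hash: "h2", created: time.Unix(100, 0)},
		{name: "old", hash: "h1", created: time.Unix(50, 0), ready: true},
		{name: "old-dup", hash: "h1", created: time.Unix(60, 0), ready: true},
	}
	// The duplicate old pod goes immediately; "old" survives until "new" is available.
	fmt.Println(prune(pods, "h2", false)) // [old-dup]
	fmt.Println(prune(pods, "h2", true))  // [old-dup old]
}
```

This is the behavior the new unit tests below exercise: at most one old and one new pod survive per node, and a ready old pod is only removed once its replacement is available.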
for _, pod := range daemonPods { @@ -890,9 +937,10 @@ func (dsc *DaemonSetsController) manage(ds *apps.DaemonSet, nodeList []*v1.Node, var nodesNeedingDaemonPods, podsToDelete []string for _, node := range nodeList { nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode, err := dsc.podsShouldBeOnNode( - node, nodeToDaemonPods, ds) + node, nodeToDaemonPods, ds, hash) if err != nil { + klog.V(4).Infof("Sync of node %s for daemonset %s failed: %v", node.Name, ds.Name, err) continue } @@ -1074,6 +1122,7 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *apps.DaemonSet, nodeL } var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable int + now := dsc.failedPodsBackoff.Clock.Now() for _, node := range nodeList { shouldRun, _, err := dsc.nodeShouldRunDaemonPod(node, ds) if err != nil { @@ -1092,7 +1141,7 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *apps.DaemonSet, nodeL pod := daemonPods[0] if podutil.IsPodReady(pod) { numberReady++ - if podutil.IsPodAvailable(pod, ds.Spec.MinReadySeconds, metav1.Now()) { + if podutil.IsPodAvailable(pod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) { numberAvailable++ } } @@ -1127,9 +1176,10 @@ } func (dsc *DaemonSetsController) syncDaemonSet(key string) error { - startTime := time.Now() + startTime := dsc.failedPodsBackoff.Clock.Now() + defer func() { - klog.V(4).Infof("Finished syncing daemon set %q (%v)", key, time.Since(startTime)) + klog.V(4).Infof("Finished syncing daemon set %q (%v)", key, dsc.failedPodsBackoff.Clock.Now().Sub(startTime)) }() namespace, name, err := cache.SplitMetaNamespaceKey(key) diff --git a/pkg/controller/daemon/daemon_controller_test.go b/pkg/controller/daemon/daemon_controller_test.go index 2cd4e33146e..d480b5d82b4 100644 --- a/pkg/controller/daemon/daemon_controller_test.go +++ b/pkg/controller/daemon/daemon_controller_test.go @@ -33,9 +33,11 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/storage/names" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" @@ -43,10 +45,14 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/flowcontrol" "k8s.io/client-go/util/workqueue" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/klog/v2" podutil "k8s.io/kubernetes/pkg/api/v1/pod" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/apis/scheduling" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/daemon/util" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/securitycontext" labelsutil "k8s.io/kubernetes/pkg/util/labels" ) @@ -429,6 +435,39 @@ func clearExpectations(t *testing.T, manager *daemonSetsController, ds *apps.Dae return } manager.expectations.DeleteExpectations(key) + + now := manager.failedPodsBackoff.Clock.Now() + hash, _ := currentDSHash(manager, ds) + // log all the pods in the store + var lines []string + for _, obj := range manager.podStore.List() { + pod := obj.(*v1.Pod) + if pod.CreationTimestamp.IsZero() { + pod.CreationTimestamp.Time = now + } + var readyLast time.Time + ready := podutil.IsPodReady(pod) + if ready { + if c := podutil.GetPodReadyCondition(pod.Status); c != nil {
readyLast = c.LastTransitionTime.Time.Add(time.Duration(ds.Spec.MinReadySeconds) * time.Second) + } + } + nodeName, _ := util.GetTargetNodeName(pod) + + lines = append(lines, fmt.Sprintf("node=%s current=%-5t ready=%-5t now=%-4d pod=%s created=%d available=%d", + nodeName, + hash == pod.Labels[apps.ControllerRevisionHashLabelKey], + ready, + now.Unix(), + pod.Name, + pod.CreationTimestamp.Unix(), + readyLast.Unix(), + )) + } + sort.Strings(lines) + for _, line := range lines { + klog.Info(line) + } } func TestDeleteFinalStateUnknown(t *testing.T) { @@ -3042,3 +3081,237 @@ func getQueuedKeys(queue workqueue.RateLimitingInterface) []string { sort.Strings(keys) return keys } + +// Controller should not create pods on nodes which have daemon pods, and should remove excess pods from nodes that have extra pods. +func TestSurgeDealsWithExistingPods(t *testing.T) { + ds := newDaemonSet("foo") + ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(1)) + manager, podControl, _, err := newTestController(ds) + if err != nil { + t.Fatalf("error creating DaemonSets controller: %v", err) + } + manager.dsStore.Add(ds) + addNodes(manager.nodeStore, 0, 5, nil) + addPods(manager.podStore, "node-1", simpleDaemonSetLabel, ds, 1) + addPods(manager.podStore, "node-2", simpleDaemonSetLabel, ds, 2) + addPods(manager.podStore, "node-3", simpleDaemonSetLabel, ds, 5) + addPods(manager.podStore, "node-4", simpleDaemonSetLabel2, ds, 2) + expectSyncDaemonSets(t, manager, ds, podControl, 2, 5, 0) +} + +func TestSurgePreservesReadyOldPods(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DaemonSetUpdateSurge, true)() + ds := newDaemonSet("foo") + ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(1)) + manager, podControl, _, err := newTestController(ds) + if err != nil { + t.Fatalf("error creating DaemonSets controller: %v", err) + } + manager.dsStore.Add(ds) + addNodes(manager.nodeStore, 0, 5, nil) + + // will be preserved because it's the current hash + pod := newPod("node-1-", "node-1", simpleDaemonSetLabel, ds) + pod.CreationTimestamp.Time = time.Unix(100, 0) + manager.podStore.Add(pod) + + // will be preserved because it's the oldest AND it is ready + pod = newPod("node-1-old-", "node-1", simpleDaemonSetLabel, ds) + delete(pod.Labels, apps.ControllerRevisionHashLabelKey) + pod.CreationTimestamp.Time = time.Unix(50, 0) + pod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}} + manager.podStore.Add(pod) + + // will be deleted because it's not the oldest, even though it is ready + oldReadyPod := newPod("node-1-delete-", "node-1", simpleDaemonSetLabel, ds) + delete(oldReadyPod.Labels, apps.ControllerRevisionHashLabelKey) + oldReadyPod.CreationTimestamp.Time = time.Unix(60, 0) + oldReadyPod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}} + manager.podStore.Add(oldReadyPod) + + addPods(manager.podStore, "node-2", simpleDaemonSetLabel, ds, 1) + expectSyncDaemonSets(t, manager, ds, podControl, 3, 1, 0) + + actual := sets.NewString(podControl.DeletePodName...)
+ expected := sets.NewString(oldReadyPod.Name) + if !actual.Equal(expected) { + t.Errorf("unexpected deletes\nexpected: %v\n actual: %v", expected.List(), actual.List()) + } +} + +func TestSurgeCreatesNewPodWhenAtMaxSurgeAndOldPodDeleted(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DaemonSetUpdateSurge, true)() + ds := newDaemonSet("foo") + ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(1)) + manager, podControl, _, err := newTestController(ds) + if err != nil { + t.Fatalf("error creating DaemonSets controller: %v", err) + } + manager.dsStore.Add(ds) + addNodes(manager.nodeStore, 0, 5, nil) + + // will be preserved because it has the newest hash, and is also consuming the surge budget + pod := newPod("node-0-", "node-0", simpleDaemonSetLabel, ds) + pod.CreationTimestamp.Time = time.Unix(100, 0) + pod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionFalse}} + manager.podStore.Add(pod) + + // will be preserved because it is ready + oldPodReady := newPod("node-0-old-ready-", "node-0", simpleDaemonSetLabel, ds) + delete(oldPodReady.Labels, apps.ControllerRevisionHashLabelKey) + oldPodReady.CreationTimestamp.Time = time.Unix(50, 0) + oldPodReady.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}} + manager.podStore.Add(oldPodReady) + + // create old ready pods on all other nodes + for i := 1; i < 5; i++ { + oldPod := newPod(fmt.Sprintf("node-%d-preserve-", i), fmt.Sprintf("node-%d", i), simpleDaemonSetLabel, ds) + delete(oldPod.Labels, apps.ControllerRevisionHashLabelKey) + oldPod.CreationTimestamp.Time = time.Unix(1, 0) + oldPod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}} + manager.podStore.Add(oldPod) + + // mark the last old pod as deleted, which should trigger a creation above surge + if i == 4 { + thirty := int64(30) + timestamp := metav1.Time{Time: time.Unix(1+thirty, 0)} + oldPod.DeletionGracePeriodSeconds = &thirty + oldPod.DeletionTimestamp = &timestamp + } + } + + // controller should detect that node-4 has only a deleted pod + clearExpectations(t, manager, ds, podControl) + expectSyncDaemonSets(t, manager, ds, podControl, 1, 0, 0) + clearExpectations(t, manager, ds, podControl) +} + +func TestSurgeDeletesUnreadyOldPods(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DaemonSetUpdateSurge, true)() + ds := newDaemonSet("foo") + ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(1)) + manager, podControl, _, err := newTestController(ds) + if err != nil { + t.Fatalf("error creating DaemonSets controller: %v", err) + } + manager.dsStore.Add(ds) + addNodes(manager.nodeStore, 0, 5, nil) + + // will be preserved because it has the newest hash + pod := newPod("node-1-", "node-1", simpleDaemonSetLabel, ds) + pod.CreationTimestamp.Time = time.Unix(100, 0) + manager.podStore.Add(pod) + + // will be deleted because it is unready + oldUnreadyPod := newPod("node-1-old-unready-", "node-1", simpleDaemonSetLabel, ds) + delete(oldUnreadyPod.Labels, apps.ControllerRevisionHashLabelKey) + oldUnreadyPod.CreationTimestamp.Time = time.Unix(50, 0) + oldUnreadyPod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionFalse}} + manager.podStore.Add(oldUnreadyPod) + + // will be deleted because it is not the oldest + oldReadyPod := newPod("node-1-delete-", "node-1", simpleDaemonSetLabel, ds) + delete(oldReadyPod.Labels,
apps.ControllerRevisionHashLabelKey) + oldReadyPod.CreationTimestamp.Time = time.Unix(60, 0) + oldReadyPod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}} + manager.podStore.Add(oldReadyPod) + + addPods(manager.podStore, "node-2", simpleDaemonSetLabel, ds, 1) + expectSyncDaemonSets(t, manager, ds, podControl, 3, 2, 0) + + actual := sets.NewString(podControl.DeletePodName...) + expected := sets.NewString(oldReadyPod.Name, oldUnreadyPod.Name) + if !actual.Equal(expected) { + t.Errorf("unexpected deletes\nexpected: %v\n actual: %v", expected.List(), actual.List()) + } +} + +func TestSurgePreservesOldReadyWithUnsatisfiedMinReady(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DaemonSetUpdateSurge, true)() + ds := newDaemonSet("foo") + ds.Spec.MinReadySeconds = 15 + ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(1)) + manager, podControl, _, err := newTestController(ds) + if err != nil { + t.Fatalf("error creating DaemonSets controller: %v", err) + } + manager.dsStore.Add(ds) + addNodes(manager.nodeStore, 0, 5, nil) + + // the clock will be set 10s after the newest pod on node-1 went ready, which is not long enough to be available + manager.DaemonSetsController.failedPodsBackoff.Clock = clock.NewFakeClock(time.Unix(50+10, 0)) + + // will be preserved because it has the newest hash + pod := newPod("node-1-", "node-1", simpleDaemonSetLabel, ds) + pod.CreationTimestamp.Time = time.Unix(100, 0) + pod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: metav1.Time{Time: time.Unix(50, 0)}}} + manager.podStore.Add(pod) + + // will be preserved because it is ready AND the newest pod is not yet available for long enough + oldReadyPod := newPod("node-1-old-ready-", "node-1", simpleDaemonSetLabel, ds) + delete(oldReadyPod.Labels, apps.ControllerRevisionHashLabelKey) + oldReadyPod.CreationTimestamp.Time = time.Unix(50, 0) + oldReadyPod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}} + manager.podStore.Add(oldReadyPod) + + // will be deleted because it is not the oldest + oldExcessReadyPod := newPod("node-1-delete-", "node-1", simpleDaemonSetLabel, ds) + delete(oldExcessReadyPod.Labels, apps.ControllerRevisionHashLabelKey) + oldExcessReadyPod.CreationTimestamp.Time = time.Unix(60, 0) + oldExcessReadyPod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}} + manager.podStore.Add(oldExcessReadyPod) + + addPods(manager.podStore, "node-2", simpleDaemonSetLabel, ds, 1) + expectSyncDaemonSets(t, manager, ds, podControl, 3, 1, 0) + + actual := sets.NewString(podControl.DeletePodName...) 
+ expected := sets.NewString(oldExcessReadyPod.Name) + if !actual.Equal(expected) { + t.Errorf("unexpected deletes\nexpected: %v\n actual: %v", expected.List(), actual.List()) + } +} + +func TestSurgeDeletesOldReadyWithSatisfiedMinReady(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DaemonSetUpdateSurge, true)() + ds := newDaemonSet("foo") + ds.Spec.MinReadySeconds = 15 + ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(1)) + manager, podControl, _, err := newTestController(ds) + if err != nil { + t.Fatalf("error creating DaemonSets controller: %v", err) + } + manager.dsStore.Add(ds) + addNodes(manager.nodeStore, 0, 5, nil) + + // the clock will be set 20s after the newest pod on node-1 went ready, which is long enough for it to be considered available + manager.DaemonSetsController.failedPodsBackoff.Clock = clock.NewFakeClock(time.Unix(50+20, 0)) + + // will be preserved because it has the newest hash + pod := newPod("node-1-", "node-1", simpleDaemonSetLabel, ds) + pod.CreationTimestamp.Time = time.Unix(100, 0) + pod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: metav1.Time{Time: time.Unix(50, 0)}}} + manager.podStore.Add(pod) + + // will be deleted because the newest pod is ready and has been available for longer than MinReadySeconds + oldReadyPod := newPod("node-1-old-ready-", "node-1", simpleDaemonSetLabel, ds) + delete(oldReadyPod.Labels, apps.ControllerRevisionHashLabelKey) + oldReadyPod.CreationTimestamp.Time = time.Unix(50, 0) + oldReadyPod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}} + manager.podStore.Add(oldReadyPod) + + // will be deleted because it is not the oldest + oldExcessReadyPod := newPod("node-1-delete-", "node-1", simpleDaemonSetLabel, ds) + delete(oldExcessReadyPod.Labels, apps.ControllerRevisionHashLabelKey) + oldExcessReadyPod.CreationTimestamp.Time = time.Unix(60, 0) + oldExcessReadyPod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}} + manager.podStore.Add(oldExcessReadyPod) + + addPods(manager.podStore, "node-2", simpleDaemonSetLabel, ds, 1) + expectSyncDaemonSets(t, manager, ds, podControl, 3, 2, 0) + + actual := sets.NewString(podControl.DeletePodName...)
+ expected := sets.NewString(oldExcessReadyPod.Name, oldReadyPod.Name) + if !actual.Equal(expected) { + t.Errorf("unexpected deletes\nexpected: %v\n actual: %v", expected.List(), actual.List()) + } +} diff --git a/pkg/controller/daemon/update.go b/pkg/controller/daemon/update.go index c8a67c35bca..11d1fcff5c8 100644 --- a/pkg/controller/daemon/update.go +++ b/pkg/controller/daemon/update.go @@ -26,12 +26,11 @@ import ( "k8s.io/klog/v2" apps "k8s.io/api/apps/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" - intstrutil "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/json" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller" @@ -46,37 +45,146 @@ func (dsc *DaemonSetsController) rollingUpdate(ds *apps.DaemonSet, nodeList []*v if err != nil { return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err) } - - _, oldPods := dsc.getAllDaemonSetPods(ds, nodeToDaemonPods, hash) - maxUnavailable, numUnavailable, err := dsc.getUnavailableNumbers(ds, nodeList, nodeToDaemonPods) + maxSurge, maxUnavailable, numUnavailable, err := dsc.getUnavailableNumbers(ds, nodeList, nodeToDaemonPods) if err != nil { return fmt.Errorf("couldn't get unavailable numbers: %v", err) } - oldAvailablePods, oldUnavailablePods := util.SplitByAvailablePods(ds.Spec.MinReadySeconds, oldPods) - // for oldPods delete all not running pods + now := dsc.failedPodsBackoff.Clock.Now() + + // When not surging, we delete just enough pods to stay under the maxUnavailable limit, if any + // are necessary, and let the core loop create new instances on those nodes. + if maxSurge == 0 { + _, oldPods := dsc.getAllDaemonSetPods(ds, nodeToDaemonPods, hash) + oldAvailablePods, oldUnavailablePods := util.SplitByAvailablePods(ds.Spec.MinReadySeconds, oldPods, now) + + var oldPodsToDelete []string + klog.V(4).Infof("Marking all unavailable old pods for deletion") + for _, pod := range oldUnavailablePods { + // Skip terminating pods. 
We won't delete them again + if pod.DeletionTimestamp != nil { + continue + } + klog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name) + oldPodsToDelete = append(oldPodsToDelete, pod.Name) + } + for _, pod := range oldAvailablePods { + if numUnavailable >= maxUnavailable { + klog.V(4).Infof("Number of unavailable DaemonSet pods: %d, is equal to or exceeds allowed maximum: %d", numUnavailable, maxUnavailable) + break + } + + klog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name) + oldPodsToDelete = append(oldPodsToDelete, pod.Name) + numUnavailable++ + } + return dsc.syncNodes(ds, oldPodsToDelete, nil, hash) + } + + // When surging, we create new pods whenever an old pod is unavailable, and we can create up + // to maxSurge extra pods + // + // Assumptions: + // * Expect manage loop to allow no more than two pods per node, one old, one new + // * Expect manage loop will create new pods if there are no pods on node + // * Expect manage loop will handle failed pods + // * Deleted pods do not count as unavailable so that updates make progress when nodes are down + // Invariants: + // * A node with an unavailable old pod is a candidate for immediate new pod creation + // * An old available pod is deleted if a new pod is available + // * No more than maxSurge new pods are created for old available pods at any one time + // + var oldPodsToDelete []string + var candidateNewNodes []string + var allowedNewNodes []string + var numSurge int + + for nodeName, pods := range nodeToDaemonPods { + newPod, oldPod, ok := findSurgePodsOnNode(ds, pods, hash) + if !ok { + // let the manage loop clean up this node, and treat it as a surge node + klog.V(3).Infof("DaemonSet %s/%s has excess pods on node %s, skipping to allow the core loop to process", ds.Namespace, ds.Name, nodeName) + numSurge++ + continue + } + switch { + case oldPod == nil: + // we don't need to do anything to this node, the manage loop will handle it + case newPod == nil: + // this is a surge candidate + switch { + case !podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}): + // the old pod isn't available, allow it to become a replacement + klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is out of date and not available, allowing replacement", ds.Namespace, ds.Name, oldPod.Name, nodeName) + // record the replacement + if allowedNewNodes == nil { + allowedNewNodes = make([]string, 0, len(nodeList)) + } + allowedNewNodes = append(allowedNewNodes, nodeName) + case numSurge >= maxSurge: + // no point considering any other candidates + continue + default: + klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is out of date, this is a surge candidate", ds.Namespace, ds.Name, oldPod.Name, nodeName) + // record the candidate + if candidateNewNodes == nil { + candidateNewNodes = make([]string, 0, maxSurge) + } + candidateNewNodes = append(candidateNewNodes, nodeName) + } + default: + // we have already surged onto this node, determine our state + if !podutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) { + // we're waiting to go available here + numSurge++ + continue + } + // we're available, delete the old pod + klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is available, remove %s", ds.Namespace, ds.Name, newPod.Name, nodeName, oldPod.Name) + oldPodsToDelete = append(oldPodsToDelete, oldPod.Name) + } + } + + // use any
of the candidates we can, including the allowedNewNodes + klog.V(5).Infof("DaemonSet %s/%s allowing %d replacements, surge up to %d, %d are in progress, %d candidates", ds.Namespace, ds.Name, len(allowedNewNodes), maxSurge, numSurge, len(candidateNewNodes)) + remainingSurge := maxSurge - numSurge + if remainingSurge < 0 { + remainingSurge = 0 + } + if max := len(candidateNewNodes); remainingSurge > max { + remainingSurge = max + } + newNodesToCreate := append(allowedNewNodes, candidateNewNodes[:remainingSurge]...) + + return dsc.syncNodes(ds, oldPodsToDelete, newNodesToCreate, hash) +} + +// findSurgePodsOnNode looks at non-deleted pods on a given node and returns true if there +// is at most one of each old and new pods, or false if there are multiples. We can skip +// processing the particular node in those scenarios and let the manage loop prune the +// excess pods for our next time around. +func findSurgePodsOnNode(ds *apps.DaemonSet, podsOnNode []*v1.Pod, hash string) (newPod, oldPod *v1.Pod, ok bool) { + for _, pod := range podsOnNode { if pod.DeletionTimestamp != nil { continue } - klog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name) - oldPodsToDelete = append(oldPodsToDelete, pod.Name) - } - - klog.V(4).Infof("Marking old pods for deletion") - for _, pod := range oldAvailablePods { - if numUnavailable >= maxUnavailable { - klog.V(4).Infof("Number of unavailable DaemonSet pods: %d, is equal to or exceeds allowed maximum: %d", numUnavailable, maxUnavailable) - break + generation, err := util.GetTemplateGeneration(ds) + if err != nil { + generation = nil + } + if util.IsPodUpdated(pod, hash, generation) { + if newPod != nil { + return nil, nil, false + } + newPod = pod + } else { + if oldPod != nil { + return nil, nil, false + } + oldPod = pod } - klog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name) - oldPodsToDelete = append(oldPodsToDelete, pod.Name) - numUnavailable++ } - return dsc.syncNodes(ds, oldPodsToDelete, []string{}, hash) + return newPod, oldPod, true } // constructHistory finds all histories controlled by the given DaemonSet, and @@ -385,14 +493,18 @@ func (dsc *DaemonSetsController) getAllDaemonSetPods(ds *apps.DaemonSet, nodeToD return newPods, oldPods } -func (dsc *DaemonSetsController) getUnavailableNumbers(ds *apps.DaemonSet, nodeList []*v1.Node, nodeToDaemonPods map[string][]*v1.Pod) (int, int, error) { +// getUnavailableNumbers calculates the true number of allowed unavailable or surge pods. 
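The budget arithmetic that closes the surge path above is easy to misread in diff form, so here is a standalone sketch (the function and parameter names are paraphrased, not the controller's API):

```go
package main

import "fmt"

// nodesToCreate mimics how rollingUpdate combines the two node lists: nodes
// whose old pod is already unavailable ("allowed") are replaced unconditionally,
// while nodes with a still-ready old pod ("candidates") only consume whatever
// surge budget remains after in-flight surge pods are counted.
func nodesToCreate(maxSurge, numSurge int, allowed, candidates []string) []string {
	remaining := maxSurge - numSurge
	if remaining < 0 {
		remaining = 0
	}
	if remaining > len(candidates) {
		remaining = len(candidates)
	}
	return append(append([]string(nil), allowed...), candidates[:remaining]...)
}

func main() {
	allowed := []string{"node-1"}              // old pod already unavailable
	candidates := []string{"node-2", "node-3"} // old pods still ready
	fmt.Println(nodesToCreate(1, 0, allowed, candidates)) // [node-1 node-2]
	fmt.Println(nodesToCreate(1, 1, allowed, candidates)) // [node-1]: budget already consumed
}
```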
+// TODO: This method duplicates calculations in the main update loop and should be refactored + // to remove the need to calculate availability twice (once here, and once in the main loops) +func (dsc *DaemonSetsController) getUnavailableNumbers(ds *apps.DaemonSet, nodeList []*v1.Node, nodeToDaemonPods map[string][]*v1.Pod) (int, int, int, error) { klog.V(4).Infof("Getting unavailable numbers") + now := dsc.failedPodsBackoff.Clock.Now() var numUnavailable, desiredNumberScheduled int for i := range nodeList { node := nodeList[i] wantToRun, _, err := dsc.nodeShouldRunDaemonPod(node, ds) if err != nil { - return -1, -1, err + return -1, -1, -1, err } if !wantToRun { continue @@ -405,8 +517,8 @@ func (dsc *DaemonSetsController) getUnavailableNumbers(ds *apps.DaemonSet, nodeL } available := false for _, pod := range daemonPods { - //for the purposes of update we ensure that the Pod is both available and not terminating - if podutil.IsPodAvailable(pod, ds.Spec.MinReadySeconds, metav1.Now()) && pod.DeletionTimestamp == nil { + // for the purposes of update we ensure that the Pod is both available and not terminating + if podutil.IsPodAvailable(pod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) && pod.DeletionTimestamp == nil { available = true break } @@ -415,12 +527,25 @@ func (dsc *DaemonSetsController) getUnavailableNumbers(ds *apps.DaemonSet, nodeL numUnavailable++ } } - maxUnavailable, err := intstrutil.GetScaledValueFromIntOrPercent(ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, desiredNumberScheduled, true) + + maxUnavailable, err := util.UnavailableCount(ds, desiredNumberScheduled) if err != nil { - return -1, -1, fmt.Errorf("invalid value for MaxUnavailable: %v", err) + return -1, -1, -1, fmt.Errorf("invalid value for MaxUnavailable: %v", err) } - klog.V(4).Infof(" DaemonSet %s/%s, maxUnavailable: %d, numUnavailable: %d", ds.Namespace, ds.Name, maxUnavailable, numUnavailable) - return maxUnavailable, numUnavailable, nil + + maxSurge, err := util.SurgeCount(ds, desiredNumberScheduled) + if err != nil { + return -1, -1, -1, fmt.Errorf("invalid value for MaxSurge: %v", err) + } + + // if the daemonset returned with an impossible configuration, obey the default of unavailable=1 (in the + // event the apiserver returns 0 for both surge and unavailability) + if desiredNumberScheduled > 0 && maxUnavailable == 0 && maxSurge == 0 { + klog.Warningf("DaemonSet %s/%s is not configured for surge or unavailability, defaulting to accepting unavailability", ds.Namespace, ds.Name) + maxUnavailable = 1 + } + klog.V(4).Infof("DaemonSet %s/%s, maxSurge: %d, maxUnavailable: %d, numUnavailable: %d", ds.Namespace, ds.Name, maxSurge, maxUnavailable, numUnavailable) + return maxSurge, maxUnavailable, numUnavailable, nil } type historiesByRevision []*apps.ControllerRevision diff --git a/pkg/controller/daemon/update_test.go b/pkg/controller/daemon/update_test.go index 4bdf3e2e4f4..d6bafba8760 100644 --- a/pkg/controller/daemon/update_test.go +++ b/pkg/controller/daemon/update_test.go @@ -18,12 +18,20 @@ package daemon import ( "testing" + "time" apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/intstr" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/klog/v2" + podutil "k8s.io/kubernetes/pkg/api/v1/pod" + "k8s.io/kubernetes/pkg/controller/daemon/util" + "k8s.io/kubernetes/pkg/features"
) func TestDaemonSetUpdatesPods(t *testing.T) { @@ -67,6 +75,48 @@ func TestDaemonSetUpdatesPods(t *testing.T) { clearExpectations(t, manager, ds, podControl) } +func TestDaemonSetUpdatesPodsWithMaxSurge(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DaemonSetUpdateSurge, true)() + ds := newDaemonSet("foo") + manager, podControl, _, err := newTestController(ds) + if err != nil { + t.Fatalf("error creating DaemonSets controller: %v", err) + } + addNodes(manager.nodeStore, 0, 5, nil) + manager.dsStore.Add(ds) + expectSyncDaemonSets(t, manager, ds, podControl, 5, 0, 0) + markPodsReady(podControl.podStore) + + // surge is the controlling amount + maxSurge := 2 + ds.Spec.Template.Spec.Containers[0].Image = "foo2/bar2" + ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(maxSurge)) + manager.dsStore.Update(ds) + + clearExpectations(t, manager, ds, podControl) + expectSyncDaemonSets(t, manager, ds, podControl, maxSurge, 0, 0) + clearExpectations(t, manager, ds, podControl) + expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0) + markPodsReady(podControl.podStore) + + clearExpectations(t, manager, ds, podControl) + expectSyncDaemonSets(t, manager, ds, podControl, maxSurge, maxSurge, 0) + clearExpectations(t, manager, ds, podControl) + expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0) + markPodsReady(podControl.podStore) + + clearExpectations(t, manager, ds, podControl) + expectSyncDaemonSets(t, manager, ds, podControl, 5%maxSurge, maxSurge, 0) + clearExpectations(t, manager, ds, podControl) + expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0) + markPodsReady(podControl.podStore) + + clearExpectations(t, manager, ds, podControl) + expectSyncDaemonSets(t, manager, ds, podControl, 0, 5%maxSurge, 0) + clearExpectations(t, manager, ds, podControl) + expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0) +} + func TestDaemonSetUpdatesWhenNewPosIsNotReady(t *testing.T) { ds := newDaemonSet("foo") manager, podControl, _, err := newTestController(ds) @@ -138,6 +188,149 @@ func TestDaemonSetUpdatesAllOldPodsNotReady(t *testing.T) { clearExpectations(t, manager, ds, podControl) } +func TestDaemonSetUpdatesAllOldPodsNotReadyMaxSurge(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DaemonSetUpdateSurge, true)() + ds := newDaemonSet("foo") + manager, podControl, _, err := newTestController(ds) + if err != nil { + t.Fatalf("error creating DaemonSets controller: %v", err) + } + addNodes(manager.nodeStore, 0, 5, nil) + manager.dsStore.Add(ds) + expectSyncDaemonSets(t, manager, ds, podControl, 5, 0, 0) + + maxSurge := 3 + ds.Spec.Template.Spec.Containers[0].Image = "foo2/bar2" + ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(maxSurge)) + manager.dsStore.Update(ds) + + // all old pods are unavailable so should be surged + manager.failedPodsBackoff.Clock = clock.NewFakeClock(time.Unix(100, 0)) + clearExpectations(t, manager, ds, podControl) + expectSyncDaemonSets(t, manager, ds, podControl, 5, 0, 0) + + // waiting for pods to go ready, old pods are deleted + manager.failedPodsBackoff.Clock = clock.NewFakeClock(time.Unix(200, 0)) + clearExpectations(t, manager, ds, podControl) + expectSyncDaemonSets(t, manager, ds, podControl, 0, 5, 0) + + setPodReadiness(t, manager, true, 5, func(_ *v1.Pod) bool { return true }) + ds.Spec.MinReadySeconds = 15 + ds.Spec.Template.Spec.Containers[0].Image = "foo3/bar3" + manager.dsStore.Update(ds) + +
manager.failedPodsBackoff.Clock = clock.NewFakeClock(time.Unix(300, 0)) + clearExpectations(t, manager, ds, podControl) + expectSyncDaemonSets(t, manager, ds, podControl, 3, 0, 0) + + hash, err := currentDSHash(manager, ds) + if err != nil { + t.Fatal(err) + } + currentPods := podsByNodeMatchingHash(manager, hash) + // mark two updated pods as ready at time 300 + setPodReadiness(t, manager, true, 2, func(pod *v1.Pod) bool { + return pod.Labels[apps.ControllerRevisionHashLabelKey] == hash + }) + // mark one of the old pods that is on a node without an updated pod as unready + setPodReadiness(t, manager, false, 1, func(pod *v1.Pod) bool { + nodeName, err := util.GetTargetNodeName(pod) + if err != nil { + t.Fatal(err) + } + return pod.Labels[apps.ControllerRevisionHashLabelKey] != hash && len(currentPods[nodeName]) == 0 + }) + + // the new pods should still be considered waiting to hit min readiness, so one pod should be created to replace + // the deleted old pod + manager.failedPodsBackoff.Clock = clock.NewFakeClock(time.Unix(310, 0)) + clearExpectations(t, manager, ds, podControl) + expectSyncDaemonSets(t, manager, ds, podControl, 1, 0, 0) + + // the new pods are now considered available, so delete the old pods + manager.failedPodsBackoff.Clock = clock.NewFakeClock(time.Unix(320, 0)) + clearExpectations(t, manager, ds, podControl) + expectSyncDaemonSets(t, manager, ds, podControl, 1, 3, 0) + + // mark all updated pods as ready at time 320 + currentPods = podsByNodeMatchingHash(manager, hash) + setPodReadiness(t, manager, true, 3, func(pod *v1.Pod) bool { + return pod.Labels[apps.ControllerRevisionHashLabelKey] == hash + }) + + // the new pods are now considered available, so delete the old pods + manager.failedPodsBackoff.Clock = clock.NewFakeClock(time.Unix(340, 0)) + clearExpectations(t, manager, ds, podControl) + expectSyncDaemonSets(t, manager, ds, podControl, 0, 2, 0) + + // controller has completed upgrade + manager.failedPodsBackoff.Clock = clock.NewFakeClock(time.Unix(350, 0)) + clearExpectations(t, manager, ds, podControl) + expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0) +} + +func podsByNodeMatchingHash(dsc *daemonSetsController, hash string) map[string][]string { + byNode := make(map[string][]string) + for _, obj := range dsc.podStore.List() { + pod := obj.(*v1.Pod) + if pod.Labels[apps.ControllerRevisionHashLabelKey] != hash { + continue + } + nodeName, err := util.GetTargetNodeName(pod) + if err != nil { + panic(err) + } + byNode[nodeName] = append(byNode[nodeName], pod.Name) + } + return byNode +} + +func setPodReadiness(t *testing.T, dsc *daemonSetsController, ready bool, count int, fn func(*v1.Pod) bool) { + t.Helper() + for _, obj := range dsc.podStore.List() { + if count <= 0 { + break + } + pod := obj.(*v1.Pod) + if pod.DeletionTimestamp != nil { + continue + } + if podutil.IsPodReady(pod) == ready { + continue + } + if !fn(pod) { + continue + } + condition := v1.PodCondition{Type: v1.PodReady} + if ready { + condition.Status = v1.ConditionTrue + } else { + condition.Status = v1.ConditionFalse + } + if !podutil.UpdatePodCondition(&pod.Status, &condition) { + t.Fatal("failed to update pod") + } + // TODO: workaround UpdatePodCondition calling time.Now() directly + setCondition := podutil.GetPodReadyCondition(pod.Status) + setCondition.LastTransitionTime.Time = dsc.failedPodsBackoff.Clock.Now() + klog.Infof("marked pod %s ready=%t", pod.Name, ready) + count-- + } + if count > 0 { + t.Fatalf("could not mark %d pods ready=%t", count, ready) + } +} + +func 
currentDSHash(dsc *daemonSetsController, ds *apps.DaemonSet) (string, error) { + // Construct histories of the DaemonSet, and get the hash of current history + cur, _, err := dsc.constructHistory(ds) + if err != nil { + return "", err + } + return cur.Labels[apps.DefaultDaemonSetUniqueLabelKey], nil + +} + func TestDaemonSetUpdatesNoTemplateChanged(t *testing.T) { ds := newDaemonSet("foo") manager, podControl, _, err := newTestController(ds) @@ -163,12 +356,34 @@ func TestDaemonSetUpdatesNoTemplateChanged(t *testing.T) { clearExpectations(t, manager, ds, podControl) } +func newUpdateSurge(value intstr.IntOrString) apps.DaemonSetUpdateStrategy { + zero := intstr.FromInt(0) + return apps.DaemonSetUpdateStrategy{ + Type: apps.RollingUpdateDaemonSetStrategyType, + RollingUpdate: &apps.RollingUpdateDaemonSet{ + MaxUnavailable: &zero, + MaxSurge: &value, + }, + } +} + +func newUpdateUnavailable(value intstr.IntOrString) apps.DaemonSetUpdateStrategy { + return apps.DaemonSetUpdateStrategy{ + Type: apps.RollingUpdateDaemonSetStrategyType, + RollingUpdate: &apps.RollingUpdateDaemonSet{ + MaxUnavailable: &value, + }, + } +} + func TestGetUnavailableNumbers(t *testing.T) { cases := []struct { name string Manager *daemonSetsController ds *apps.DaemonSet nodeToPods map[string][]*v1.Pod + enableSurge bool + maxSurge int maxUnavailable int numUnavailable int Err error @@ -184,8 +399,7 @@ func TestGetUnavailableNumbers(t *testing.T) { }(), ds: func() *apps.DaemonSet { ds := newDaemonSet("x") - intStr := intstr.FromInt(0) - ds.Spec.UpdateStrategy.RollingUpdate = &apps.RollingUpdateDaemonSet{MaxUnavailable: &intStr} + ds.Spec.UpdateStrategy = newUpdateUnavailable(intstr.FromInt(0)) return ds }(), nodeToPods: make(map[string][]*v1.Pod), @@ -204,8 +418,7 @@ func TestGetUnavailableNumbers(t *testing.T) { }(), ds: func() *apps.DaemonSet { ds := newDaemonSet("x") - intStr := intstr.FromInt(1) - ds.Spec.UpdateStrategy.RollingUpdate = &apps.RollingUpdateDaemonSet{MaxUnavailable: &intStr} + ds.Spec.UpdateStrategy = newUpdateUnavailable(intstr.FromInt(1)) return ds }(), nodeToPods: func() map[string][]*v1.Pod { @@ -233,8 +446,7 @@ func TestGetUnavailableNumbers(t *testing.T) { }(), ds: func() *apps.DaemonSet { ds := newDaemonSet("x") - intStr := intstr.FromInt(0) - ds.Spec.UpdateStrategy.RollingUpdate = &apps.RollingUpdateDaemonSet{MaxUnavailable: &intStr} + ds.Spec.UpdateStrategy = newUpdateUnavailable(intstr.FromInt(0)) return ds }(), nodeToPods: func() map[string][]*v1.Pod { @@ -244,7 +456,32 @@ func TestGetUnavailableNumbers(t *testing.T) { mapping["node-0"] = []*v1.Pod{pod0} return mapping }(), - maxUnavailable: 0, + maxUnavailable: 1, + numUnavailable: 1, + }, + { + name: "Two nodes, one node without pods, surge", + Manager: func() *daemonSetsController { + manager, _, _, err := newTestController() + if err != nil { + t.Fatalf("error creating DaemonSets controller: %v", err) + } + addNodes(manager.nodeStore, 0, 2, nil) + return manager + }(), + ds: func() *apps.DaemonSet { + ds := newDaemonSet("x") + ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(0)) + return ds + }(), + nodeToPods: func() map[string][]*v1.Pod { + mapping := make(map[string][]*v1.Pod) + pod0 := newPod("pod-0", "node-0", simpleDaemonSetLabel, nil) + markPodReady(pod0) + mapping["node-0"] = []*v1.Pod{pod0} + return mapping + }(), + maxUnavailable: 1, numUnavailable: 1, }, { @@ -259,8 +496,7 @@ func TestGetUnavailableNumbers(t *testing.T) { }(), ds: func() *apps.DaemonSet { ds := newDaemonSet("x") - intStr := intstr.FromString("50%") - 
ds.Spec.UpdateStrategy.RollingUpdate = &apps.RollingUpdateDaemonSet{MaxUnavailable: &intStr} + ds.Spec.UpdateStrategy = newUpdateUnavailable(intstr.FromString("50%")) return ds }(), nodeToPods: func() map[string][]*v1.Pod { @@ -276,6 +512,66 @@ maxUnavailable: 1, numUnavailable: 0, }, + { + name: "Two nodes with pods, MaxSurge in percents", + Manager: func() *daemonSetsController { + manager, _, _, err := newTestController() + if err != nil { + t.Fatalf("error creating DaemonSets controller: %v", err) + } + addNodes(manager.nodeStore, 0, 2, nil) + return manager + }(), + ds: func() *apps.DaemonSet { + ds := newDaemonSet("x") + ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromString("50%")) + return ds + }(), + nodeToPods: func() map[string][]*v1.Pod { + mapping := make(map[string][]*v1.Pod) + pod0 := newPod("pod-0", "node-0", simpleDaemonSetLabel, nil) + pod1 := newPod("pod-1", "node-1", simpleDaemonSetLabel, nil) + markPodReady(pod0) + markPodReady(pod1) + mapping["node-0"] = []*v1.Pod{pod0} + mapping["node-1"] = []*v1.Pod{pod1} + return mapping + }(), + enableSurge: true, + maxSurge: 1, + maxUnavailable: 0, + numUnavailable: 0, + }, + { + name: "Two nodes with pods, MaxSurge is 100%", + Manager: func() *daemonSetsController { + manager, _, _, err := newTestController() + if err != nil { + t.Fatalf("error creating DaemonSets controller: %v", err) + } + addNodes(manager.nodeStore, 0, 2, nil) + return manager + }(), + ds: func() *apps.DaemonSet { + ds := newDaemonSet("x") + ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromString("100%")) + return ds + }(), + nodeToPods: func() map[string][]*v1.Pod { + mapping := make(map[string][]*v1.Pod) + pod0 := newPod("pod-0", "node-0", simpleDaemonSetLabel, nil) + pod1 := newPod("pod-1", "node-1", simpleDaemonSetLabel, nil) + markPodReady(pod0) + markPodReady(pod1) + mapping["node-0"] = []*v1.Pod{pod0} + mapping["node-1"] = []*v1.Pod{pod1} + return mapping + }(), + enableSurge: true, + maxSurge: 2, + maxUnavailable: 0, + numUnavailable: 0, + }, { name: "Two nodes with pods, MaxUnavailable in percents, pod terminating", Manager: func() *daemonSetsController { @@ -288,8 +584,7 @@ }(), ds: func() *apps.DaemonSet { ds := newDaemonSet("x") - intStr := intstr.FromString("50%") - ds.Spec.UpdateStrategy.RollingUpdate = &apps.RollingUpdateDaemonSet{MaxUnavailable: &intStr} + ds.Spec.UpdateStrategy = newUpdateUnavailable(intstr.FromString("50%")) return ds }(), nodeToPods: func() map[string][]*v1.Pod { @@ -310,20 +605,26 @@ } for _, c := range cases { - c.Manager.dsStore.Add(c.ds) - nodeList, err := c.Manager.nodeLister.List(labels.Everything()) - if err != nil { - t.Fatalf("error listing nodes: %v", err) - } - maxUnavailable, numUnavailable, err := c.Manager.getUnavailableNumbers(c.ds, nodeList, c.nodeToPods) - if err != nil && c.Err != nil { - if c.Err != err { - t.Errorf("Test case: %s. Expected error: %v but got: %v", c.name, c.Err, err) + t.Run(c.name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DaemonSetUpdateSurge, c.enableSurge)() + + c.Manager.dsStore.Add(c.ds) + nodeList, err := c.Manager.nodeLister.List(labels.Everything()) + if err != nil { + t.Fatalf("error listing nodes: %v", err) } - } else if err != nil { - t.Errorf("Test case: %s.
Unexpected error: %v", c.name, err) - } else if maxUnavailable != c.maxUnavailable || numUnavailable != c.numUnavailable { - t.Errorf("Test case: %s. Wrong values. maxUnavailable: %d, expected: %d, numUnavailable: %d. expected: %d", c.name, maxUnavailable, c.maxUnavailable, numUnavailable, c.numUnavailable) - } + maxSurge, maxUnavailable, numUnavailable, err := c.Manager.getUnavailableNumbers(c.ds, nodeList, c.nodeToPods) + if err != nil && c.Err != nil { + if c.Err != err { + t.Fatalf("Expected error: %v but got: %v", c.Err, err) + } + } + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if maxSurge != c.maxSurge || maxUnavailable != c.maxUnavailable || numUnavailable != c.numUnavailable { + t.Fatalf("Wrong values. maxSurge: %d, expected %d, maxUnavailable: %d, expected: %d, numUnavailable: %d. expected: %d", maxSurge, c.maxSurge, maxUnavailable, c.maxUnavailable, numUnavailable, c.numUnavailable) + } + }) } } diff --git a/pkg/controller/daemon/util/daemonset_util.go b/pkg/controller/daemon/util/daemonset_util.go index cd576770159..f6060406975 100644 --- a/pkg/controller/daemon/util/daemonset_util.go +++ b/pkg/controller/daemon/util/daemonset_util.go @@ -19,13 +19,17 @@ package util import ( "fmt" "strconv" + "time" apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" extensions "k8s.io/api/extensions/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + intstrutil "k8s.io/apimachinery/pkg/util/intstr" + utilfeature "k8s.io/apiserver/pkg/util/feature" podutil "k8s.io/kubernetes/pkg/api/v1/pod" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" + "k8s.io/kubernetes/pkg/features" ) // GetTemplateGeneration gets the template generation associated with a v1.DaemonSet by extracting it from the @@ -122,6 +126,43 @@ func CreatePodTemplate(template v1.PodTemplateSpec, generation *int64, hash stri return newTemplate } +// AllowsSurge returns true if the daemonset allows more than a single pod on any node. +func AllowsSurge(ds *apps.DaemonSet) bool { + maxSurge, err := SurgeCount(ds, 1) + return err == nil && maxSurge > 0 +} + +// SurgeCount returns 0 if surge is not requested, the expected surge number to allow +// out of numberToSchedule if surge is configured, or an error if the surge percentage +// requested is invalid. +func SurgeCount(ds *apps.DaemonSet, numberToSchedule int) (int, error) { + if ds.Spec.UpdateStrategy.Type != apps.RollingUpdateDaemonSetStrategyType { + return 0, nil + } + if !utilfeature.DefaultFeatureGate.Enabled(features.DaemonSetUpdateSurge) { + return 0, nil + } + r := ds.Spec.UpdateStrategy.RollingUpdate + if r == nil { + return 0, nil + } + return intstrutil.GetScaledValueFromIntOrPercent(r.MaxSurge, numberToSchedule, true) +} + +// UnavailableCount returns 0 if unavailability is not requested, the expected +// unavailability number to allow out of numberToSchedule if requested, or an error if +// the unavailability percentage requested is invalid. 
+func UnavailableCount(ds *apps.DaemonSet, numberToSchedule int) (int, error) { + if ds.Spec.UpdateStrategy.Type != apps.RollingUpdateDaemonSetStrategyType { + return 0, nil + } + r := ds.Spec.UpdateStrategy.RollingUpdate + if r == nil { + return 0, nil + } + return intstrutil.GetScaledValueFromIntOrPercent(r.MaxUnavailable, numberToSchedule, true) +} + // IsPodUpdated checks if pod contains label value that either matches templateGeneration or hash func IsPodUpdated(pod *v1.Pod, hash string, dsTemplateGeneration *int64) bool { // Compare with hash to see if the pod is updated, need to maintain backward compatibility of templateGeneration @@ -131,12 +172,12 @@ func IsPodUpdated(pod *v1.Pod, hash string, dsTemplateGeneration *int64) bool { return hashMatches || templateMatches } -// SplitByAvailablePods splits provided daemon set pods by availability -func SplitByAvailablePods(minReadySeconds int32, pods []*v1.Pod) ([]*v1.Pod, []*v1.Pod) { - unavailablePods := []*v1.Pod{} - availablePods := []*v1.Pod{} +// SplitByAvailablePods splits provided daemon set pods by availability. +func SplitByAvailablePods(minReadySeconds int32, pods []*v1.Pod, now time.Time) ([]*v1.Pod, []*v1.Pod) { + availablePods := make([]*v1.Pod, 0, len(pods)) + var unavailablePods []*v1.Pod for _, pod := range pods { - if podutil.IsPodAvailable(pod, minReadySeconds, metav1.Now()) { + if podutil.IsPodAvailable(pod, minReadySeconds, metav1.Time{Time: now}) { availablePods = append(availablePods, pod) } else { unavailablePods = append(unavailablePods, pod) diff --git a/test/e2e/apps/daemon_set.go b/test/e2e/apps/daemon_set.go index 329688eb3b9..db23bc1efd7 100644 --- a/test/e2e/apps/daemon_set.go +++ b/test/e2e/apps/daemon_set.go @@ -17,10 +17,14 @@ limitations under the License. package apps import ( + "bytes" "context" "fmt" + "math/rand" "reflect" + "sort" "strings" + "text/tabwriter" "time" "github.com/onsi/ginkgo" @@ -32,6 +36,8 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" @@ -482,8 +488,334 @@ var _ = SIGDescribe("Daemon set [Serial]", func() { framework.ExpectEqual(rollbackPods[pod.Name], true, fmt.Sprintf("unexpected pod %s be restarted", pod.Name)) } }) + + // TODO: This test is expected to be promoted to conformance after the feature is promoted + ginkgo.It("should surge pods onto nodes when spec was updated and update strategy is RollingUpdate [Feature:DaemonSetUpdateSurge]", func() { + label := map[string]string{daemonsetNameLabel: dsName} + + framework.Logf("Creating surge daemon set %s", dsName) + maxSurgeOverlap := 60 * time.Second + maxSurge := 1 + surgePercent := intstr.FromString("20%") + zero := intstr.FromInt(0) + oldVersion := "1" + ds := newDaemonSet(dsName, image, label) + ds.Spec.Template.Spec.Containers[0].Env = []v1.EnvVar{ + {Name: "VERSION", Value: oldVersion}, + } + // delay shutdown by 15s to allow containers to overlap in time + ds.Spec.Template.Spec.Containers[0].Lifecycle = &v1.Lifecycle{ + PreStop: &v1.Handler{ + Exec: &v1.ExecAction{ + Command: []string{"/bin/sh", "-c", "sleep 15"}, + }, + }, + } + // use a readiness probe that can be forced to fail (by changing the contents of /var/tmp/ready) + ds.Spec.Template.Spec.Containers[0].ReadinessProbe = &v1.Probe{ + Handler: v1.Handler{ + Exec: &v1.ExecAction{ + Command: []string{"/bin/sh", 
"-ec", `touch /var/tmp/ready; [[ "$( cat /var/tmp/ready )" == "" ]]`}, + }, + }, + InitialDelaySeconds: 7, + PeriodSeconds: 3, + SuccessThreshold: 1, + FailureThreshold: 1, + } + // use a simple surge strategy + ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{ + Type: appsv1.RollingUpdateDaemonSetStrategyType, + RollingUpdate: &appsv1.RollingUpdateDaemonSet{ + MaxUnavailable: &zero, + MaxSurge: &surgePercent, + }, + } + // The pod must be ready for at least 10s before we delete the old pod + ds.Spec.MinReadySeconds = 10 + + ds, err := c.AppsV1().DaemonSets(ns).Create(context.TODO(), ds, metav1.CreateOptions{}) + framework.ExpectNoError(err) + + ginkgo.By("Check that daemon pods launch on every node of the cluster.") + err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds)) + framework.ExpectNoError(err, "error waiting for daemon pod to start") + + // Check history and labels + ds, err = c.AppsV1().DaemonSets(ns).Get(context.TODO(), ds.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + waitForHistoryCreated(c, ns, label, 1) + cur := curHistory(listDaemonHistories(c, ns, label), ds) + hash := cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey] + framework.ExpectEqual(cur.Revision, int64(1)) + checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), hash) + + newVersion := "2" + ginkgo.By("Update daemon pods environment var") + patch := fmt.Sprintf(`{"spec":{"template":{"spec":{"containers":[{"name":"%s","env":[{"name":"VERSION","value":"%s"}]}]}}}}`, ds.Spec.Template.Spec.Containers[0].Name, newVersion) + ds, err = c.AppsV1().DaemonSets(ns).Patch(context.TODO(), dsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}) + framework.ExpectNoError(err) + + // Time to complete the rolling upgrade is proportional to the number of nodes in the cluster. + // Get the number of nodes, and set the timeout appropriately. 
+ nodes, err := c.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) + framework.ExpectNoError(err) + nodeCount := len(nodes.Items) + retryTimeout := dsRetryTimeout + time.Duration(nodeCount*30)*time.Second + + ginkgo.By("Check that daemon pods surge and invariants are preserved during that rollout") + ageOfOldPod := make(map[string]time.Time) + deliberatelyDeletedPods := sets.NewString() + err = wait.PollImmediate(dsRetryPeriod, retryTimeout, func() (bool, error) { + podList, err := c.CoreV1().Pods(ds.Namespace).List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return false, err + } + pods := podList.Items + + var buf bytes.Buffer + pw := tabwriter.NewWriter(&buf, 1, 1, 1, ' ', 0) + fmt.Fprint(pw, "Node\tVersion\tName\tUID\tDeleted\tReady\n") + + now := time.Now() + podUIDs := sets.NewString() + deletedPodUIDs := sets.NewString() + nodes := sets.NewString() + versions := sets.NewString() + nodesToVersions := make(map[string]map[string]int) + nodesToDeletedVersions := make(map[string]map[string]int) + var surgeCount, newUnavailableCount, newDeliberatelyDeletedCount, oldUnavailableCount, nodesWithoutOldVersion int + for _, pod := range pods { + if !metav1.IsControlledBy(&pod, ds) { + continue + } + nodeName := pod.Spec.NodeName + nodes.Insert(nodeName) + podVersion := pod.Spec.Containers[0].Env[0].Value + versions.Insert(podVersion) + if pod.DeletionTimestamp != nil { + if !deliberatelyDeletedPods.Has(string(pod.UID)) { + versions := nodesToDeletedVersions[nodeName] + if versions == nil { + versions = make(map[string]int) + nodesToDeletedVersions[nodeName] = versions + } + versions[podVersion]++ + } + } else { + versions := nodesToVersions[nodeName] + if versions == nil { + versions = make(map[string]int) + nodesToVersions[nodeName] = versions + } + versions[podVersion]++ + } + + ready := podutil.IsPodAvailable(&pod, ds.Spec.MinReadySeconds, metav1.Now()) + if podVersion == newVersion { + surgeCount++ + if !ready || pod.DeletionTimestamp != nil { + if deliberatelyDeletedPods.Has(string(pod.UID)) { + newDeliberatelyDeletedCount++ + } + newUnavailableCount++ + } + } else { + if !ready || pod.DeletionTimestamp != nil { + oldUnavailableCount++ + } + } + fmt.Fprintf(pw, "%s\t%s\t%s\t%s\t%t\t%t\n", pod.Spec.NodeName, podVersion, pod.Name, pod.UID, pod.DeletionTimestamp != nil, ready) + } + + // print a stable sorted list of pods by node for debugging + pw.Flush() + lines := strings.Split(buf.String(), "\n") + lines = lines[:len(lines)-1] + sort.Strings(lines[1:]) + for _, line := range lines { + framework.Logf("%s", line) + } + + // if there is an old and new pod at the same time, record a timestamp + deletedPerNode := make(map[string]int) + for _, pod := range pods { + if !metav1.IsControlledBy(&pod, ds) { + continue + } + // ignore deleted pods + if pod.DeletionTimestamp != nil { + deletedPodUIDs.Insert(string(pod.UID)) + if !deliberatelyDeletedPods.Has(string(pod.UID)) { + deletedPerNode[pod.Spec.NodeName]++ + } + continue + } + podUIDs.Insert(string(pod.UID)) + podVersion := pod.Spec.Containers[0].Env[0].Value + if podVersion == newVersion { + continue + } + // if this is a pod in an older version AND there is a new version of this pod, record when + // we started seeing this, otherwise delete the record (perhaps the node was drained) + if nodesToVersions[pod.Spec.NodeName][newVersion] > 0 { + if _, ok := ageOfOldPod[string(pod.UID)]; !ok { + ageOfOldPod[string(pod.UID)] = now + } + } else { + delete(ageOfOldPod, string(pod.UID)) + } + } + // purge the old pods list of any deleted pods + for uid :=
range ageOfOldPod { + if !podUIDs.Has(uid) { + delete(ageOfOldPod, uid) + } + } + deliberatelyDeletedPods = deliberatelyDeletedPods.Intersection(deletedPodUIDs) + + for _, versions := range nodesToVersions { + if versions[oldVersion] == 0 { + nodesWithoutOldVersion++ + } + } + + var errs []string + + // invariant: we should not see more than 1 deleted pod per node unless a severe node problem is occurring or the controller is misbehaving + for node, count := range deletedPerNode { + if count > 1 { + errs = append(errs, fmt.Sprintf("Node %s has %d deleted pods, which may indicate a problem on the node or a controller race condition", node, count)) + } + } + + // invariant: the controller must react to the new pod becoming ready within a reasonable timeframe (2x grace period) + for uid, firstSeen := range ageOfOldPod { + if now.Sub(firstSeen) > maxSurgeOverlap { + errs = append(errs, fmt.Sprintf("An old pod with UID %s has been running alongside a newer version for longer than %s", uid, maxSurgeOverlap)) + } + } + + // invariant: we should never have more than maxSurge + oldUnavailableCount instances of the new version unready unless a flake in the infrastructure happens, or + // if we deliberately deleted one of the new pods + if newUnavailableCount > (maxSurge + oldUnavailableCount + newDeliberatelyDeletedCount + nodesWithoutOldVersion) { + errs = append(errs, fmt.Sprintf("observed %d new unavailable pods greater than (surge count %d + old unavailable count %d + deliberately deleted new count %d + nodes without old version %d), may be infrastructure flake", newUnavailableCount, maxSurge, oldUnavailableCount, newDeliberatelyDeletedCount, nodesWithoutOldVersion)) + } + // invariant: the total number of versions created should be 2 + if versions.Len() > 2 { + errs = append(errs, fmt.Sprintf("observed %d versions running simultaneously, must have max 2", versions.Len())) + } + for _, node := range nodes.List() { + // ignore pods that haven't been scheduled yet + if len(node) == 0 { + continue + } + versionCount := make(map[string]int) + // invariant: surge should never have more than one instance of a pod per node running + for version, count := range nodesToVersions[node] { + if count > 1 { + errs = append(errs, fmt.Sprintf("node %s has %d instances of version %s running simultaneously, must have max 1", node, count, version)) + } + versionCount[version] += count + } + // invariant: when surging, the most number of pods we should allow to be deleted is 2 (if we are getting evicted) + for version, count := range nodesToDeletedVersions[node] { + if count > 2 { + errs = append(errs, fmt.Sprintf("node %s has %d deleted instances of version %s running simultaneously, must have max 2", node, count, version)) + } + versionCount[version] += count + } + // invariant: on any node, we should never have more than two instances of a version (if we are getting evicted) + for version, count := range versionCount { + if count > 2 { + errs = append(errs, fmt.Sprintf("node %s has %d total instances of version %s running simultaneously, must have max 2 (one deleted and one running)", node, count, version)) + } + } + } + + if len(errs) > 0 { + sort.Strings(errs) + return false, fmt.Errorf("invariants were violated during daemonset update:\n%s", strings.Join(errs, "\n")) + } + + // Make sure every daemon pod on the node has been updated + nodeNames := schedulableNodes(c, ds) + for _, node := range nodeNames { + switch { + case + // if we don't have the new version yet + nodesToVersions[node][newVersion] == 0,
// if there is more than one version on a node + len(nodesToVersions[node]) > 1, + // if there are still any deleted pods + len(nodesToDeletedVersions[node]) > 0, + // if any of the new pods are unavailable + newUnavailableCount > 0: + + // inject a failure randomly to ensure the controller recovers + switch rand.Intn(25) { + // cause a random old pod to go unready + case 0: + // select a not-deleted pod of the old version + if pod := randomPod(pods, func(pod *v1.Pod) bool { + return pod.DeletionTimestamp == nil && oldVersion == pod.Spec.Containers[0].Env[0].Value + }); pod != nil { + // write a non-empty value to /var/tmp/ready, which will cause the readiness probe to fail + if _, err := framework.RunKubectl(pod.Namespace, "exec", "-c", pod.Spec.Containers[0].Name, pod.Name, "--", "/bin/sh", "-ec", "echo 0 > /var/tmp/ready"); err != nil { + framework.Logf("Failed to mark pod %s as unready via exec: %v", pod.Name, err) + } else { + framework.Logf("Marked old pod %s as unready", pod.Name) + } + } + case 1: + // delete a random pod + if pod := randomPod(pods, func(pod *v1.Pod) bool { + return pod.DeletionTimestamp == nil + }); pod != nil { + if err := c.CoreV1().Pods(ds.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}); err != nil { + framework.Logf("Failed to delete pod %s early: %v", pod.Name, err) + } else { + framework.Logf("Deleted pod %s prematurely", pod.Name) + deliberatelyDeletedPods.Insert(string(pod.UID)) + } + } + } + + // then wait + return false, nil + } + } + return true, nil + }) + framework.ExpectNoError(err) + + ginkgo.By("Check that daemon pods are still running on every node of the cluster.") + err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds)) + framework.ExpectNoError(err, "error waiting for daemon pod to start") + + // Check history and labels + ds, err = c.AppsV1().DaemonSets(ns).Get(context.TODO(), ds.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + waitForHistoryCreated(c, ns, label, 2) + cur = curHistory(listDaemonHistories(c, ns, label), ds) + hash = cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey] + framework.ExpectEqual(cur.Revision, int64(2)) + checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), hash) + }) }) +// randomPod selects a random pod within pods that causes fn to return true, or nil +// if no pod can be found matching the criteria. +func randomPod(pods []v1.Pod, fn func(p *v1.Pod) bool) *v1.Pod { + podCount := len(pods) + if podCount == 0 { + return nil + } + for offset, i := rand.Intn(podCount), 0; i < podCount; i++ { + pod := &pods[(offset+i)%podCount] + if fn(pod) { + return pod + } + } + return nil +} + // getDaemonSetImagePatch generates a patch for updating a DaemonSet's container image func getDaemonSetImagePatch(containerName, containerImage string) string { return fmt.Sprintf(`{"spec":{"template":{"spec":{"containers":[{"name":"%s","image":"%s"}]}}}}`, containerName, containerImage)
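As a closing illustration, here is how the randomPod helper above composes with a predicate, mirroring the deliberate-deletion branch of the e2e test (the pod fixtures are invented; the helper is duplicated so the example runs standalone):

```go
package main

import (
	"fmt"
	"math/rand"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// randomPod, as defined above: scan from a random offset so repeated calls
// spread failure injection across the pod list.
func randomPod(pods []v1.Pod, fn func(p *v1.Pod) bool) *v1.Pod {
	podCount := len(pods)
	if podCount == 0 {
		return nil
	}
	for offset, i := rand.Intn(podCount), 0; i < podCount; i++ {
		pod := &pods[(offset+i)%podCount]
		if fn(pod) {
			return pod
		}
	}
	return nil
}

func main() {
	now := metav1.Now()
	pods := []v1.Pod{
		{ObjectMeta: metav1.ObjectMeta{Name: "a", DeletionTimestamp: &now}},
		{ObjectMeta: metav1.ObjectMeta{Name: "b"}},
	}
	// Select only pods that are not already terminating, as the test does
	// before deleting a victim at random.
	if pod := randomPod(pods, func(p *v1.Pod) bool { return p.DeletionTimestamp == nil }); pod != nil {
		fmt.Println(pod.Name) // b
	}
}
```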