From 9f296c133dbed59c13c01135b1216a301a86aaca Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Wed, 27 Jan 2021 01:09:53 -0500 Subject: [PATCH] daemonset: Simplify the logic for calculating unavailable pods In order to maintain the correct invariants, the existing maxUnavailable logic calculated the same data several times in different ways. Leverage the simpler structure from maxSurge and calculate pod availability only once, as well as perform only a single pass over all the pods in the daemonset. This changed no behavior of the current controller, and has a structure that is almost identical to maxSurge. --- pkg/controller/daemon/update.go | 157 ++++++++++--------- pkg/controller/daemon/update_test.go | 42 ++--- pkg/controller/daemon/util/daemonset_util.go | 16 -- 3 files changed, 106 insertions(+), 109 deletions(-) diff --git a/pkg/controller/daemon/update.go b/pkg/controller/daemon/update.go index 11d1fcff5c8..0e3792ed34e 100644 --- a/pkg/controller/daemon/update.go +++ b/pkg/controller/daemon/update.go @@ -38,14 +38,14 @@ import ( labelsutil "k8s.io/kubernetes/pkg/util/labels" ) -// rollingUpdate deletes old daemon set pods making sure that no more than -// ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable pods are unavailable +// rollingUpdate identifies the set of old pods to delete, or additional pods to create on nodes, +// remaining within the constraints imposed by the update strategy. func (dsc *DaemonSetsController) rollingUpdate(ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error { nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds) if err != nil { return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err) } - maxSurge, maxUnavailable, numUnavailable, err := dsc.getUnavailableNumbers(ds, nodeList, nodeToDaemonPods) + maxSurge, maxUnavailable, err := dsc.updatedDesiredNodeCounts(ds, nodeList, nodeToDaemonPods) if err != nil { return fmt.Errorf("couldn't get unavailable numbers: %v", err) } @@ -54,30 +54,74 @@ func (dsc *DaemonSetsController) rollingUpdate(ds *apps.DaemonSet, nodeList []*v // When not surging, we delete just enough pods to stay under the maxUnavailable limit, if any // are necessary, and let the core loop create new instances on those nodes. + // + // Assumptions: + // * Expect manage loop to allow no more than one pod per node + // * Expect manage loop will create new pods + // * Expect manage loop will handle failed pods + // * Deleted pods do not count as unavailable so that updates make progress when nodes are down + // Invariants: + // * The number of new pods that are unavailable must be less than maxUnavailable + // * A node with an available old pod is a candidate for deletion if it does not violate other invariants + // if maxSurge == 0 { - _, oldPods := dsc.getAllDaemonSetPods(ds, nodeToDaemonPods, hash) - oldAvailablePods, oldUnavailablePods := util.SplitByAvailablePods(ds.Spec.MinReadySeconds, oldPods, now) - - var oldPodsToDelete []string - klog.V(4).Infof("Marking all unavailable old pods for deletion") - for _, pod := range oldUnavailablePods { - // Skip terminating pods. We won't delete them again - if pod.DeletionTimestamp != nil { + var numUnavailable int + var allowedReplacementPods []string + var candidatePodsToDelete []string + for nodeName, pods := range nodeToDaemonPods { + newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash) + if !ok { + // let the manage loop clean up this node, and treat it as an unavailable node + klog.V(3).Infof("DaemonSet %s/%s has excess pods on node %s, skipping to allow the core loop to process", ds.Namespace, ds.Name, nodeName) + numUnavailable++ continue } - klog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name) - oldPodsToDelete = append(oldPodsToDelete, pod.Name) - } - for _, pod := range oldAvailablePods { - if numUnavailable >= maxUnavailable { - klog.V(4).Infof("Number of unavailable DaemonSet pods: %d, is equal to or exceeds allowed maximum: %d", numUnavailable, maxUnavailable) - break + switch { + case oldPod == nil && newPod == nil, oldPod != nil && newPod != nil: + // the manage loop will handle creating or deleting the appropriate pod, consider this unavailable + numUnavailable++ + case newPod != nil: + // this pod is up to date, check its availability + if !podutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) { + // an unavailable new pod is counted against maxUnavailable + numUnavailable++ + } + default: + // this pod is old, it is an update candidate + switch { + case !podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}): + // the old pod isn't available, so it needs to be replaced + klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is out of date and not available, allowing replacement", ds.Namespace, ds.Name, oldPod.Name, nodeName) + // record the replacement + if allowedReplacementPods == nil { + allowedReplacementPods = make([]string, 0, len(nodeToDaemonPods)) + } + allowedReplacementPods = append(allowedReplacementPods, oldPod.Name) + case numUnavailable >= maxUnavailable: + // no point considering any other candidates + continue + default: + klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is out of date, this is a candidate to replace", ds.Namespace, ds.Name, oldPod.Name, nodeName) + // record the candidate + if candidatePodsToDelete == nil { + candidatePodsToDelete = make([]string, 0, maxUnavailable) + } + candidatePodsToDelete = append(candidatePodsToDelete, oldPod.Name) + } } - - klog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name) - oldPodsToDelete = append(oldPodsToDelete, pod.Name) - numUnavailable++ } + + // use any of the candidates we can, including the allowedReplacemnntPods + klog.V(5).Infof("DaemonSet %s/%s allowing %d replacements, up to %d unavailable, %d new are unavailable, %d candidates", ds.Namespace, ds.Name, len(allowedReplacementPods), maxUnavailable, numUnavailable, len(candidatePodsToDelete)) + remainingUnavailable := maxUnavailable - numUnavailable + if remainingUnavailable < 0 { + remainingUnavailable = 0 + } + if max := len(candidatePodsToDelete); remainingUnavailable > max { + remainingUnavailable = max + } + oldPodsToDelete := append(allowedReplacementPods, candidatePodsToDelete[:remainingUnavailable]...) + return dsc.syncNodes(ds, oldPodsToDelete, nil, hash) } @@ -100,7 +144,7 @@ func (dsc *DaemonSetsController) rollingUpdate(ds *apps.DaemonSet, nodeList []*v var numSurge int for nodeName, pods := range nodeToDaemonPods { - newPod, oldPod, ok := findSurgePodsOnNode(ds, pods, hash) + newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash) if !ok { // let the manage loop clean up this node, and treat it as a surge node klog.V(3).Infof("DaemonSet %s/%s has excess pods on node %s, skipping to allow the core loop to process", ds.Namespace, ds.Name, nodeName) @@ -118,7 +162,7 @@ func (dsc *DaemonSetsController) rollingUpdate(ds *apps.DaemonSet, nodeList []*v klog.V(5).Infof("Pod %s on node %s is out of date and not available, allowing replacement", ds.Namespace, ds.Name, oldPod.Name, nodeName) // record the replacement if allowedNewNodes == nil { - allowedNewNodes = make([]string, 0, len(nodeList)) + allowedNewNodes = make([]string, 0, len(nodeToDaemonPods)) } allowedNewNodes = append(allowedNewNodes, nodeName) case numSurge >= maxSurge: @@ -159,11 +203,11 @@ func (dsc *DaemonSetsController) rollingUpdate(ds *apps.DaemonSet, nodeList []*v return dsc.syncNodes(ds, oldPodsToDelete, newNodesToCreate, hash) } -// findSurgePodsOnNode looks at non-deleted pods on a given node and returns true if there +// findUpdatedPodsOnNode looks at non-deleted pods on a given node and returns true if there // is at most one of each old and new pods, or false if there are multiples. We can skip // processing the particular node in those scenarios and let the manage loop prune the // excess pods for our next time around. -func findSurgePodsOnNode(ds *apps.DaemonSet, podsOnNode []*v1.Pod, hash string) (newPod, oldPod *v1.Pod, ok bool) { +func findUpdatedPodsOnNode(ds *apps.DaemonSet, podsOnNode []*v1.Pod, hash string) (newPod, oldPod *v1.Pod, ok bool) { for _, pod := range podsOnNode { if pod.DeletionTimestamp != nil { continue @@ -471,71 +515,34 @@ func (dsc *DaemonSetsController) snapshot(ds *apps.DaemonSet, revision int64) (* return history, err } -func (dsc *DaemonSetsController) getAllDaemonSetPods(ds *apps.DaemonSet, nodeToDaemonPods map[string][]*v1.Pod, hash string) ([]*v1.Pod, []*v1.Pod) { - var newPods []*v1.Pod - var oldPods []*v1.Pod - - for _, pods := range nodeToDaemonPods { - for _, pod := range pods { - // If the returned error is not nil we have a parse error. - // The controller handles this via the hash. - generation, err := util.GetTemplateGeneration(ds) - if err != nil { - generation = nil - } - if util.IsPodUpdated(pod, hash, generation) { - newPods = append(newPods, pod) - } else { - oldPods = append(oldPods, pod) - } - } - } - return newPods, oldPods -} - -// getUnavailableNumbers calculates the true number of allowed unavailable or surge pods. -// TODO: This method duplicates calculations in the main update loop and should be refactored -// to remove the need to calculate availability twice (once here, and once in the main loops) -func (dsc *DaemonSetsController) getUnavailableNumbers(ds *apps.DaemonSet, nodeList []*v1.Node, nodeToDaemonPods map[string][]*v1.Pod) (int, int, int, error) { - klog.V(4).Infof("Getting unavailable numbers") - now := dsc.failedPodsBackoff.Clock.Now() - var numUnavailable, desiredNumberScheduled int +// updatedDesiredNodeCounts calculates the true number of allowed unavailable or surge pods and +// updates the nodeToDaemonPods array to include an empty array for every node that is not scheduled. +func (dsc *DaemonSetsController) updatedDesiredNodeCounts(ds *apps.DaemonSet, nodeList []*v1.Node, nodeToDaemonPods map[string][]*v1.Pod) (int, int, error) { + var desiredNumberScheduled int for i := range nodeList { node := nodeList[i] wantToRun, _, err := dsc.nodeShouldRunDaemonPod(node, ds) if err != nil { - return -1, -1, -1, err + return -1, -1, err } if !wantToRun { continue } desiredNumberScheduled++ - daemonPods, exists := nodeToDaemonPods[node.Name] - if !exists { - numUnavailable++ - continue - } - available := false - for _, pod := range daemonPods { - // for the purposes of update we ensure that the Pod is both available and not terminating - if podutil.IsPodAvailable(pod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) && pod.DeletionTimestamp == nil { - available = true - break - } - } - if !available { - numUnavailable++ + + if _, exists := nodeToDaemonPods[node.Name]; !exists { + nodeToDaemonPods[node.Name] = nil } } maxUnavailable, err := util.UnavailableCount(ds, desiredNumberScheduled) if err != nil { - return -1, -1, -1, fmt.Errorf("invalid value for MaxUnavailable: %v", err) + return -1, -1, fmt.Errorf("invalid value for MaxUnavailable: %v", err) } maxSurge, err := util.SurgeCount(ds, desiredNumberScheduled) if err != nil { - return -1, -1, -1, fmt.Errorf("invalid value for MaxSurge: %v", err) + return -1, -1, fmt.Errorf("invalid value for MaxSurge: %v", err) } // if the daemonset returned with an impossible configuration, obey the default of unavailable=1 (in the @@ -544,8 +551,8 @@ func (dsc *DaemonSetsController) getUnavailableNumbers(ds *apps.DaemonSet, nodeL klog.Warningf("DaemonSet %s/%s is not configured for surge or unavailability, defaulting to accepting unavailability", ds.Namespace, ds.Name) maxUnavailable = 1 } - klog.V(0).Infof("DaemonSet %s/%s, maxSurge: %d, maxUnavailable: %d, numUnavailable: %d", ds.Namespace, ds.Name, maxSurge, maxUnavailable, numUnavailable) - return maxSurge, maxUnavailable, numUnavailable, nil + klog.V(5).Infof("DaemonSet %s/%s, maxSurge: %d, maxUnavailable: %d", ds.Namespace, ds.Name, maxSurge, maxUnavailable) + return maxSurge, maxUnavailable, nil } type historiesByRevision []*apps.ControllerRevision diff --git a/pkg/controller/daemon/update_test.go b/pkg/controller/daemon/update_test.go index d6bafba8760..5379f667d57 100644 --- a/pkg/controller/daemon/update_test.go +++ b/pkg/controller/daemon/update_test.go @@ -345,10 +345,7 @@ func TestDaemonSetUpdatesNoTemplateChanged(t *testing.T) { ds.Spec.UpdateStrategy.Type = apps.RollingUpdateDaemonSetStrategyType intStr := intstr.FromInt(maxUnavailable) ds.Spec.UpdateStrategy.RollingUpdate = &apps.RollingUpdateDaemonSet{MaxUnavailable: &intStr} - err = manager.dsStore.Update(ds) - if err != nil { - t.Fatal(err) - } + manager.dsStore.Update(ds) // template is not changed no pod should be removed clearExpectations(t, manager, ds, podControl) @@ -385,7 +382,7 @@ func TestGetUnavailableNumbers(t *testing.T) { enableSurge bool maxSurge int maxUnavailable int - numUnavailable int + emptyNodes int Err error }{ { @@ -404,7 +401,7 @@ func TestGetUnavailableNumbers(t *testing.T) { }(), nodeToPods: make(map[string][]*v1.Pod), maxUnavailable: 0, - numUnavailable: 0, + emptyNodes: 0, }, { name: "Two nodes with ready pods", @@ -432,7 +429,7 @@ func TestGetUnavailableNumbers(t *testing.T) { return mapping }(), maxUnavailable: 1, - numUnavailable: 0, + emptyNodes: 0, }, { name: "Two nodes, one node without pods", @@ -457,7 +454,7 @@ func TestGetUnavailableNumbers(t *testing.T) { return mapping }(), maxUnavailable: 1, - numUnavailable: 1, + emptyNodes: 1, }, { name: "Two nodes, one node without pods, surge", @@ -482,7 +479,7 @@ func TestGetUnavailableNumbers(t *testing.T) { return mapping }(), maxUnavailable: 1, - numUnavailable: 1, + emptyNodes: 1, }, { name: "Two nodes with pods, MaxUnavailable in percents", @@ -510,7 +507,7 @@ func TestGetUnavailableNumbers(t *testing.T) { return mapping }(), maxUnavailable: 1, - numUnavailable: 0, + emptyNodes: 0, }, { name: "Two nodes with pods, MaxUnavailable in percents, surge", @@ -540,7 +537,7 @@ func TestGetUnavailableNumbers(t *testing.T) { enableSurge: true, maxSurge: 1, maxUnavailable: 0, - numUnavailable: 0, + emptyNodes: 0, }, { name: "Two nodes with pods, MaxUnavailable is 100%, surge", @@ -570,7 +567,7 @@ func TestGetUnavailableNumbers(t *testing.T) { enableSurge: true, maxSurge: 2, maxUnavailable: 0, - numUnavailable: 0, + emptyNodes: 0, }, { name: "Two nodes with pods, MaxUnavailable in percents, pod terminating", @@ -579,7 +576,7 @@ func TestGetUnavailableNumbers(t *testing.T) { if err != nil { t.Fatalf("error creating DaemonSets controller: %v", err) } - addNodes(manager.nodeStore, 0, 2, nil) + addNodes(manager.nodeStore, 0, 3, nil) return manager }(), ds: func() *apps.DaemonSet { @@ -599,8 +596,8 @@ func TestGetUnavailableNumbers(t *testing.T) { mapping["node-1"] = []*v1.Pod{pod1} return mapping }(), - maxUnavailable: 1, - numUnavailable: 1, + maxUnavailable: 2, + emptyNodes: 1, }, } @@ -613,7 +610,7 @@ func TestGetUnavailableNumbers(t *testing.T) { if err != nil { t.Fatalf("error listing nodes: %v", err) } - maxSurge, maxUnavailable, numUnavailable, err := c.Manager.getUnavailableNumbers(c.ds, nodeList, c.nodeToPods) + maxSurge, maxUnavailable, err := c.Manager.updatedDesiredNodeCounts(c.ds, nodeList, c.nodeToPods) if err != nil && c.Err != nil { if c.Err != err { t.Fatalf("Expected error: %v but got: %v", c.Err, err) @@ -622,8 +619,17 @@ func TestGetUnavailableNumbers(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - if maxSurge != c.maxSurge || maxUnavailable != c.maxUnavailable || numUnavailable != c.numUnavailable { - t.Fatalf("Wrong values. maxSurge: %d, expected %d, maxUnavailable: %d, expected: %d, numUnavailable: %d. expected: %d", maxSurge, c.maxSurge, maxUnavailable, c.maxUnavailable, numUnavailable, c.numUnavailable) + if maxSurge != c.maxSurge || maxUnavailable != c.maxUnavailable { + t.Errorf("Wrong values. maxSurge: %d, expected %d, maxUnavailable: %d, expected: %d", maxSurge, c.maxSurge, maxUnavailable, c.maxUnavailable) + } + var emptyNodes int + for _, pods := range c.nodeToPods { + if len(pods) == 0 { + emptyNodes++ + } + } + if emptyNodes != c.emptyNodes { + t.Errorf("expected numEmpty to be %d, was %d", c.emptyNodes, emptyNodes) } }) } diff --git a/pkg/controller/daemon/util/daemonset_util.go b/pkg/controller/daemon/util/daemonset_util.go index f6060406975..f2b58237c20 100644 --- a/pkg/controller/daemon/util/daemonset_util.go +++ b/pkg/controller/daemon/util/daemonset_util.go @@ -19,7 +19,6 @@ package util import ( "fmt" "strconv" - "time" apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" @@ -27,7 +26,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" intstrutil "k8s.io/apimachinery/pkg/util/intstr" utilfeature "k8s.io/apiserver/pkg/util/feature" - podutil "k8s.io/kubernetes/pkg/api/v1/pod" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/features" ) @@ -172,20 +170,6 @@ func IsPodUpdated(pod *v1.Pod, hash string, dsTemplateGeneration *int64) bool { return hashMatches || templateMatches } -// SplitByAvailablePods splits provided daemon set pods by availability. -func SplitByAvailablePods(minReadySeconds int32, pods []*v1.Pod, now time.Time) ([]*v1.Pod, []*v1.Pod) { - availablePods := make([]*v1.Pod, 0, len(pods)) - var unavailablePods []*v1.Pod - for _, pod := range pods { - if podutil.IsPodAvailable(pod, minReadySeconds, metav1.Time{Time: now}) { - availablePods = append(availablePods, pod) - } else { - unavailablePods = append(unavailablePods, pod) - } - } - return availablePods, unavailablePods -} - // ReplaceDaemonSetPodNodeNameNodeAffinity replaces the RequiredDuringSchedulingIgnoredDuringExecution // NodeAffinity of the given affinity with a new NodeAffinity that selects the given nodeName. // Note that this function assumes that no NodeAffinity conflicts with the selected nodeName.