diff --git a/pkg/controller/daemon/update.go b/pkg/controller/daemon/update.go
index 11d1fcff5c8..0e3792ed34e 100644
--- a/pkg/controller/daemon/update.go
+++ b/pkg/controller/daemon/update.go
@@ -38,14 +38,14 @@ import (
     labelsutil "k8s.io/kubernetes/pkg/util/labels"
 )
 
-// rollingUpdate deletes old daemon set pods making sure that no more than
-// ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable pods are unavailable
+// rollingUpdate identifies the set of old pods to delete, or additional pods to create on nodes,
+// remaining within the constraints imposed by the update strategy.
 func (dsc *DaemonSetsController) rollingUpdate(ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
     nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
     if err != nil {
         return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
     }
-    maxSurge, maxUnavailable, numUnavailable, err := dsc.getUnavailableNumbers(ds, nodeList, nodeToDaemonPods)
+    maxSurge, maxUnavailable, err := dsc.updatedDesiredNodeCounts(ds, nodeList, nodeToDaemonPods)
     if err != nil {
         return fmt.Errorf("couldn't get unavailable numbers: %v", err)
     }
@@ -54,30 +54,74 @@ func (dsc *DaemonSetsController) rollingUpdate(ds *apps.DaemonSet, nodeList []*v
     // When not surging, we delete just enough pods to stay under the maxUnavailable limit, if any
     // are necessary, and let the core loop create new instances on those nodes.
+    //
+    // Assumptions:
+    // * Expect manage loop to allow no more than one pod per node
+    // * Expect manage loop will create new pods
+    // * Expect manage loop will handle failed pods
+    // * Deleted pods do not count as unavailable so that updates make progress when nodes are down
+    // Invariants:
+    // * The number of new pods that are unavailable must be less than maxUnavailable
+    // * A node with an available old pod is a candidate for deletion if it does not violate other invariants
+    //
     if maxSurge == 0 {
-        _, oldPods := dsc.getAllDaemonSetPods(ds, nodeToDaemonPods, hash)
-        oldAvailablePods, oldUnavailablePods := util.SplitByAvailablePods(ds.Spec.MinReadySeconds, oldPods, now)
-
-        var oldPodsToDelete []string
-        klog.V(4).Infof("Marking all unavailable old pods for deletion")
-        for _, pod := range oldUnavailablePods {
-            // Skip terminating pods. We won't delete them again
-            if pod.DeletionTimestamp != nil {
+        var numUnavailable int
+        var allowedReplacementPods []string
+        var candidatePodsToDelete []string
+        for nodeName, pods := range nodeToDaemonPods {
+            newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash)
+            if !ok {
+                // let the manage loop clean up this node, and treat it as an unavailable node
+                klog.V(3).Infof("DaemonSet %s/%s has excess pods on node %s, skipping to allow the core loop to process", ds.Namespace, ds.Name, nodeName)
+                numUnavailable++
                 continue
             }
-            klog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name)
-            oldPodsToDelete = append(oldPodsToDelete, pod.Name)
-        }
-        for _, pod := range oldAvailablePods {
-            if numUnavailable >= maxUnavailable {
-                klog.V(4).Infof("Number of unavailable DaemonSet pods: %d, is equal to or exceeds allowed maximum: %d", numUnavailable, maxUnavailable)
-                break
+            switch {
+            case oldPod == nil && newPod == nil, oldPod != nil && newPod != nil:
+                // the manage loop will handle creating or deleting the appropriate pod, consider this unavailable
+                numUnavailable++
+            case newPod != nil:
+                // this pod is up to date, check its availability
+                if !podutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
+                    // an unavailable new pod is counted against maxUnavailable
+                    numUnavailable++
+                }
+            default:
+                // this pod is old, it is an update candidate
+                switch {
+                case !podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}):
+                    // the old pod isn't available, so it needs to be replaced
+                    klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is out of date and not available, allowing replacement", ds.Namespace, ds.Name, oldPod.Name, nodeName)
+                    // record the replacement
+                    if allowedReplacementPods == nil {
+                        allowedReplacementPods = make([]string, 0, len(nodeToDaemonPods))
+                    }
+                    allowedReplacementPods = append(allowedReplacementPods, oldPod.Name)
+                case numUnavailable >= maxUnavailable:
+                    // no point considering any other candidates
+                    continue
+                default:
+                    klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is out of date, this is a candidate to replace", ds.Namespace, ds.Name, oldPod.Name, nodeName)
+                    // record the candidate
+                    if candidatePodsToDelete == nil {
+                        candidatePodsToDelete = make([]string, 0, maxUnavailable)
+                    }
+                    candidatePodsToDelete = append(candidatePodsToDelete, oldPod.Name)
+                }
             }
-
-            klog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name)
-            oldPodsToDelete = append(oldPodsToDelete, pod.Name)
-            numUnavailable++
         }
+
+        // use any of the candidates we can, including the allowedReplacementPods
+        klog.V(5).Infof("DaemonSet %s/%s allowing %d replacements, up to %d unavailable, %d new are unavailable, %d candidates", ds.Namespace, ds.Name, len(allowedReplacementPods), maxUnavailable, numUnavailable, len(candidatePodsToDelete))
+        remainingUnavailable := maxUnavailable - numUnavailable
+        if remainingUnavailable < 0 {
+            remainingUnavailable = 0
+        }
+        if max := len(candidatePodsToDelete); remainingUnavailable > max {
+            remainingUnavailable = max
+        }
+        oldPodsToDelete := append(allowedReplacementPods, candidatePodsToDelete[:remainingUnavailable]...)
+
         return dsc.syncNodes(ds, oldPodsToDelete, nil, hash)
     }
@@ -100,7 +144,7 @@ func (dsc *DaemonSetsController) rollingUpdate(ds *apps.DaemonSet, nodeList []*v
     var numSurge int
     for nodeName, pods := range nodeToDaemonPods {
-        newPod, oldPod, ok := findSurgePodsOnNode(ds, pods, hash)
+        newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash)
         if !ok {
             // let the manage loop clean up this node, and treat it as a surge node
             klog.V(3).Infof("DaemonSet %s/%s has excess pods on node %s, skipping to allow the core loop to process", ds.Namespace, ds.Name, nodeName)
@@ -118,7 +162,7 @@ func (dsc *DaemonSetsController) rollingUpdate(ds *apps.DaemonSet, nodeList []*v
             klog.V(5).Infof("Pod %s on node %s is out of date and not available, allowing replacement", ds.Namespace, ds.Name, oldPod.Name, nodeName)
             // record the replacement
             if allowedNewNodes == nil {
-                allowedNewNodes = make([]string, 0, len(nodeList))
+                allowedNewNodes = make([]string, 0, len(nodeToDaemonPods))
             }
             allowedNewNodes = append(allowedNewNodes, nodeName)
         case numSurge >= maxSurge:
@@ -159,11 +203,11 @@ func (dsc *DaemonSetsController) rollingUpdate(ds *apps.DaemonSet, nodeList []*v
     return dsc.syncNodes(ds, oldPodsToDelete, newNodesToCreate, hash)
 }
 
-// findSurgePodsOnNode looks at non-deleted pods on a given node and returns true if there
+// findUpdatedPodsOnNode looks at non-deleted pods on a given node and returns true if there
 // is at most one of each old and new pods, or false if there are multiples. We can skip
 // processing the particular node in those scenarios and let the manage loop prune the
 // excess pods for our next time around.
-func findSurgePodsOnNode(ds *apps.DaemonSet, podsOnNode []*v1.Pod, hash string) (newPod, oldPod *v1.Pod, ok bool) {
+func findUpdatedPodsOnNode(ds *apps.DaemonSet, podsOnNode []*v1.Pod, hash string) (newPod, oldPod *v1.Pod, ok bool) {
     for _, pod := range podsOnNode {
         if pod.DeletionTimestamp != nil {
             continue
@@ -471,71 +515,34 @@ func (dsc *DaemonSetsController) snapshot(ds *apps.DaemonSet, revision int64) (*
     return history, err
 }
 
-func (dsc *DaemonSetsController) getAllDaemonSetPods(ds *apps.DaemonSet, nodeToDaemonPods map[string][]*v1.Pod, hash string) ([]*v1.Pod, []*v1.Pod) {
-    var newPods []*v1.Pod
-    var oldPods []*v1.Pod
-
-    for _, pods := range nodeToDaemonPods {
-        for _, pod := range pods {
-            // If the returned error is not nil we have a parse error.
-            // The controller handles this via the hash.
-            generation, err := util.GetTemplateGeneration(ds)
-            if err != nil {
-                generation = nil
-            }
-            if util.IsPodUpdated(pod, hash, generation) {
-                newPods = append(newPods, pod)
-            } else {
-                oldPods = append(oldPods, pod)
-            }
-        }
-    }
-    return newPods, oldPods
-}
-
-// getUnavailableNumbers calculates the true number of allowed unavailable or surge pods.
-// TODO: This method duplicates calculations in the main update loop and should be refactored
-// to remove the need to calculate availability twice (once here, and once in the main loops)
-func (dsc *DaemonSetsController) getUnavailableNumbers(ds *apps.DaemonSet, nodeList []*v1.Node, nodeToDaemonPods map[string][]*v1.Pod) (int, int, int, error) {
-    klog.V(4).Infof("Getting unavailable numbers")
-    now := dsc.failedPodsBackoff.Clock.Now()
-    var numUnavailable, desiredNumberScheduled int
+// updatedDesiredNodeCounts calculates the true number of allowed unavailable or surge pods and
+// updates the nodeToDaemonPods map to include an empty entry for every node that should run a daemon pod but has none scheduled.
+func (dsc *DaemonSetsController) updatedDesiredNodeCounts(ds *apps.DaemonSet, nodeList []*v1.Node, nodeToDaemonPods map[string][]*v1.Pod) (int, int, error) {
+    var desiredNumberScheduled int
     for i := range nodeList {
         node := nodeList[i]
         wantToRun, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
         if err != nil {
-            return -1, -1, -1, err
+            return -1, -1, err
         }
         if !wantToRun {
             continue
         }
         desiredNumberScheduled++
-        daemonPods, exists := nodeToDaemonPods[node.Name]
-        if !exists {
-            numUnavailable++
-            continue
-        }
-        available := false
-        for _, pod := range daemonPods {
-            // for the purposes of update we ensure that the Pod is both available and not terminating
-            if podutil.IsPodAvailable(pod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) && pod.DeletionTimestamp == nil {
-                available = true
-                break
-            }
-        }
-        if !available {
-            numUnavailable++
+
+        if _, exists := nodeToDaemonPods[node.Name]; !exists {
+            nodeToDaemonPods[node.Name] = nil
         }
     }
 
     maxUnavailable, err := util.UnavailableCount(ds, desiredNumberScheduled)
     if err != nil {
-        return -1, -1, -1, fmt.Errorf("invalid value for MaxUnavailable: %v", err)
+        return -1, -1, fmt.Errorf("invalid value for MaxUnavailable: %v", err)
     }
 
     maxSurge, err := util.SurgeCount(ds, desiredNumberScheduled)
     if err != nil {
-        return -1, -1, -1, fmt.Errorf("invalid value for MaxSurge: %v", err)
+        return -1, -1, fmt.Errorf("invalid value for MaxSurge: %v", err)
     }
 
     // if the daemonset returned with an impossible configuration, obey the default of unavailable=1 (in the
@@ -544,8 +551,8 @@ func (dsc *DaemonSetsController) getUnavailableNumbers(ds *apps.DaemonSet, nodeL
         klog.Warningf("DaemonSet %s/%s is not configured for surge or unavailability, defaulting to accepting unavailability", ds.Namespace, ds.Name)
         maxUnavailable = 1
     }
-    klog.V(0).Infof("DaemonSet %s/%s, maxSurge: %d, maxUnavailable: %d, numUnavailable: %d", ds.Namespace, ds.Name, maxSurge, maxUnavailable, numUnavailable)
-    return maxSurge, maxUnavailable, numUnavailable, nil
+    klog.V(5).Infof("DaemonSet %s/%s, maxSurge: %d, maxUnavailable: %d", ds.Namespace, ds.Name, maxSurge, maxUnavailable)
+    return maxSurge, maxUnavailable, nil
 }
 
 type historiesByRevision []*apps.ControllerRevision
diff --git a/pkg/controller/daemon/update_test.go b/pkg/controller/daemon/update_test.go
index d6bafba8760..5379f667d57 100644
--- a/pkg/controller/daemon/update_test.go
+++ b/pkg/controller/daemon/update_test.go
@@ -345,10 +345,7 @@ func TestDaemonSetUpdatesNoTemplateChanged(t *testing.T) {
     ds.Spec.UpdateStrategy.Type = apps.RollingUpdateDaemonSetStrategyType
     intStr := intstr.FromInt(maxUnavailable)
     ds.Spec.UpdateStrategy.RollingUpdate = &apps.RollingUpdateDaemonSet{MaxUnavailable: &intStr}
-    err = manager.dsStore.Update(ds)
-    if err != nil {
-        t.Fatal(err)
-    }
+    manager.dsStore.Update(ds)
 
     // template is not changed no pod should be removed
     clearExpectations(t, manager, ds, podControl)
@@ -385,7 +382,7 @@ func TestGetUnavailableNumbers(t *testing.T) {
         enableSurge    bool
         maxSurge       int
         maxUnavailable int
-        numUnavailable int
+        emptyNodes     int
         Err            error
     }{
         {
@@ -404,7 +401,7 @@ func TestGetUnavailableNumbers(t *testing.T) {
            }(),
            nodeToPods:     make(map[string][]*v1.Pod),
            maxUnavailable: 0,
-           numUnavailable: 0,
+           emptyNodes:     0,
        },
        {
            name: "Two nodes with ready pods",
@@ -432,7 +429,7 @@ func TestGetUnavailableNumbers(t *testing.T) {
                return mapping
            }(),
            maxUnavailable: 1,
-           numUnavailable: 0,
+           emptyNodes:     0,
        },
        {
            name: "Two nodes, one node without pods",
@@ -457,7 +454,7 @@ func TestGetUnavailableNumbers(t *testing.T) {
                return mapping
            }(),
            maxUnavailable: 1,
-           numUnavailable: 1,
+           emptyNodes:     1,
        },
        {
            name: "Two nodes, one node without pods, surge",
@@ -482,7 +479,7 @@ func TestGetUnavailableNumbers(t *testing.T) {
                return mapping
            }(),
            maxUnavailable: 1,
-           numUnavailable: 1,
+           emptyNodes:     1,
        },
        {
            name: "Two nodes with pods, MaxUnavailable in percents",
@@ -510,7 +507,7 @@ func TestGetUnavailableNumbers(t *testing.T) {
                return mapping
            }(),
            maxUnavailable: 1,
-           numUnavailable: 0,
+           emptyNodes:     0,
        },
        {
            name: "Two nodes with pods, MaxUnavailable in percents, surge",
@@ -540,7 +537,7 @@ func TestGetUnavailableNumbers(t *testing.T) {
            enableSurge:    true,
            maxSurge:       1,
            maxUnavailable: 0,
-           numUnavailable: 0,
+           emptyNodes:     0,
        },
        {
            name: "Two nodes with pods, MaxUnavailable is 100%, surge",
@@ -570,7 +567,7 @@ func TestGetUnavailableNumbers(t *testing.T) {
            enableSurge:    true,
            maxSurge:       2,
            maxUnavailable: 0,
-           numUnavailable: 0,
+           emptyNodes:     0,
        },
        {
            name: "Two nodes with pods, MaxUnavailable in percents, pod terminating",
@@ -579,7 +576,7 @@ func TestGetUnavailableNumbers(t *testing.T) {
                if err != nil {
                    t.Fatalf("error creating DaemonSets controller: %v", err)
                }
-               addNodes(manager.nodeStore, 0, 2, nil)
+               addNodes(manager.nodeStore, 0, 3, nil)
                return manager
            }(),
            ds: func() *apps.DaemonSet {
@@ -599,8 +596,8 @@ func TestGetUnavailableNumbers(t *testing.T) {
                mapping["node-1"] = []*v1.Pod{pod1}
                return mapping
            }(),
-           maxUnavailable: 1,
-           numUnavailable: 1,
+           maxUnavailable: 2,
+           emptyNodes:     1,
        },
    }
 
@@ -613,7 +610,7 @@ func TestGetUnavailableNumbers(t *testing.T) {
            if err != nil {
                t.Fatalf("error listing nodes: %v", err)
            }
-           maxSurge, maxUnavailable, numUnavailable, err := c.Manager.getUnavailableNumbers(c.ds, nodeList, c.nodeToPods)
+           maxSurge, maxUnavailable, err := c.Manager.updatedDesiredNodeCounts(c.ds, nodeList, c.nodeToPods)
            if err != nil && c.Err != nil {
                if c.Err != err {
                    t.Fatalf("Expected error: %v but got: %v", c.Err, err)
@@ -622,8 +619,17 @@ func TestGetUnavailableNumbers(t *testing.T) {
            if err != nil {
                t.Fatalf("Unexpected error: %v", err)
            }
-           if maxSurge != c.maxSurge || maxUnavailable != c.maxUnavailable || numUnavailable != c.numUnavailable {
-               t.Fatalf("Wrong values. maxSurge: %d, expected %d, maxUnavailable: %d, expected: %d, numUnavailable: %d. expected: %d", maxSurge, c.maxSurge, maxUnavailable, c.maxUnavailable, numUnavailable, c.numUnavailable)
+           if maxSurge != c.maxSurge || maxUnavailable != c.maxUnavailable {
+               t.Errorf("Wrong values. maxSurge: %d, expected %d, maxUnavailable: %d, expected: %d", maxSurge, c.maxSurge, maxUnavailable, c.maxUnavailable)
+           }
+           var emptyNodes int
+           for _, pods := range c.nodeToPods {
+               if len(pods) == 0 {
+                   emptyNodes++
+               }
+           }
+           if emptyNodes != c.emptyNodes {
+               t.Errorf("expected numEmpty to be %d, was %d", c.emptyNodes, emptyNodes)
            }
        })
    }
diff --git a/pkg/controller/daemon/util/daemonset_util.go b/pkg/controller/daemon/util/daemonset_util.go
index f6060406975..f2b58237c20 100644
--- a/pkg/controller/daemon/util/daemonset_util.go
+++ b/pkg/controller/daemon/util/daemonset_util.go
@@ -19,7 +19,6 @@ package util
 import (
     "fmt"
     "strconv"
-    "time"
 
     apps "k8s.io/api/apps/v1"
     v1 "k8s.io/api/core/v1"
@@ -27,7 +26,6 @@ import (
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     intstrutil "k8s.io/apimachinery/pkg/util/intstr"
     utilfeature "k8s.io/apiserver/pkg/util/feature"
-    podutil "k8s.io/kubernetes/pkg/api/v1/pod"
     v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
     "k8s.io/kubernetes/pkg/features"
 )
@@ -172,20 +170,6 @@ func IsPodUpdated(pod *v1.Pod, hash string, dsTemplateGeneration *int64) bool {
     return hashMatches || templateMatches
 }
 
-// SplitByAvailablePods splits provided daemon set pods by availability.
-func SplitByAvailablePods(minReadySeconds int32, pods []*v1.Pod, now time.Time) ([]*v1.Pod, []*v1.Pod) {
-    availablePods := make([]*v1.Pod, 0, len(pods))
-    var unavailablePods []*v1.Pod
-    for _, pod := range pods {
-        if podutil.IsPodAvailable(pod, minReadySeconds, metav1.Time{Time: now}) {
-            availablePods = append(availablePods, pod)
-        } else {
-            unavailablePods = append(unavailablePods, pod)
-        }
-    }
-    return availablePods, unavailablePods
-}
-
 // ReplaceDaemonSetPodNodeNameNodeAffinity replaces the RequiredDuringSchedulingIgnoredDuringExecution
 // NodeAffinity of the given affinity with a new NodeAffinity that selects the given nodeName.
 // Note that this function assumes that no NodeAffinity conflicts with the selected nodeName.
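
Reviewer note (not part of the patch): the three sketches below restate the new control flow in plain, dependency-free Go. Names such as podInfo, classifyNodePods, selectPodsToDelete, and seedMissingNodes are hypothetical stand-ins for illustration, not identifiers from the controller.

The first sketch mirrors the contract described by findUpdatedPodsOnNode's doc comment: among the non-deleted pods on one node there is at most one updated and one outdated pod, and ok is false when there are duplicates so the caller can leave the node to the manage loop. The real function's body is only partially shown in the hunk above, so this is an approximation of the documented behavior, not a copy of it.

package main

import "fmt"

// podInfo is a stand-in for *v1.Pod; only the fields the classification needs.
type podInfo struct {
    name     string
    updated  bool // stand-in for the hash / template-generation check (util.IsPodUpdated)
    deleting bool // stand-in for pod.DeletionTimestamp != nil
}

// classifyNodePods returns at most one updated and one outdated pod for a node,
// or ok == false when there are duplicates of either kind.
func classifyNodePods(pods []podInfo) (newPod, oldPod *podInfo, ok bool) {
    for i := range pods {
        p := &pods[i]
        if p.deleting {
            // terminating pods are skipped, matching the DeletionTimestamp check above
            continue
        }
        if p.updated {
            if newPod != nil {
                return nil, nil, false // more than one updated pod on this node
            }
            newPod = p
        } else {
            if oldPod != nil {
                return nil, nil, false // more than one outdated pod on this node
            }
            oldPod = p
        }
    }
    return newPod, oldPod, true
}

func main() {
    newPod, oldPod, ok := classifyNodePods([]podInfo{
        {name: "ds-old", updated: false},
        {name: "ds-new-terminating", updated: true, deleting: true},
    })
    fmt.Println(newPod == nil, oldPod.name, ok) // true ds-old true
}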
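The second sketch isolates the budgeting arithmetic at the end of the maxSurge == 0 branch: numUnavailable counts unavailable new pods and nodes left to the manage loop, old pods that are already unavailable are replaced without consuming the budget, and available-but-old candidates use up whatever remains of maxUnavailable. selectPodsToDelete is a hypothetical helper wrapping the same arithmetic as the patch.

package main

import "fmt"

// selectPodsToDelete builds the deletion list from two buckets: replacements (old pods that
// are already unavailable, always deleted) and candidates (old pods that are still available,
// deleted only while the remaining unavailability budget allows it).
func selectPodsToDelete(allowedReplacementPods, candidatePodsToDelete []string, numUnavailable, maxUnavailable int) []string {
    remainingUnavailable := maxUnavailable - numUnavailable
    if remainingUnavailable < 0 {
        remainingUnavailable = 0
    }
    if max := len(candidatePodsToDelete); remainingUnavailable > max {
        remainingUnavailable = max
    }
    return append(allowedReplacementPods, candidatePodsToDelete[:remainingUnavailable]...)
}

func main() {
    // hypothetical rollout: one old pod already down, one unit of the budget of 2 already
    // consumed by an unavailable new pod, and three healthy old candidates
    fmt.Println(selectPodsToDelete(
        []string{"ds-old-crashed"},
        []string{"ds-old-a", "ds-old-b", "ds-old-c"},
        1, 2,
    ))
    // prints [ds-old-crashed ds-old-a]: the crashed pod is replaced for free and only one
    // candidate fits in the remaining budget, which is why rollouts keep making progress
    // on nodes whose old pod is already broken
}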
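The third sketch shows the side effect updatedDesiredNodeCounts now has on its map argument: every node that should run a daemon pod but has no entry gets an empty one, so the per-node loop in rollingUpdate visits it and counts it as unavailable. This is the behavior the reworked test asserts through its emptyNodes field. Plain strings stand in for *v1.Pod and the should-run check is passed in as a precomputed list.

package main

import "fmt"

// seedMissingNodes mimics the map seeding: nodes that should run a daemon pod but have no
// entry get an empty one, and the number of nodes that should run a pod is returned.
func seedMissingNodes(nodesThatShouldRun []string, nodeToDaemonPods map[string][]string) int {
    desiredNumberScheduled := 0
    for _, node := range nodesThatShouldRun {
        desiredNumberScheduled++
        if _, exists := nodeToDaemonPods[node]; !exists {
            nodeToDaemonPods[node] = nil // the empty entry counted by the test's emptyNodes check
        }
    }
    return desiredNumberScheduled
}

func main() {
    pods := map[string][]string{"node-0": {"ds-pod-0"}}
    desired := seedMissingNodes([]string{"node-0", "node-1"}, pods)
    _, seeded := pods["node-1"]
    fmt.Println(desired, seeded, len(pods["node-1"])) // 2 true 0
}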