daemonset: Simplify the logic for calculating unavailable pods

In order to maintain the correct invariants, the existing maxUnavailable
logic calculated the same data several times in different ways. Leverage
the simpler structure from maxSurge, calculate pod availability only
once, and perform only a single pass over all the pods in the daemonset.
This changes no behavior of the current controller, and the resulting
structure is almost identical to the maxSurge path.
Clayton Coleman 2021-01-27 01:09:53 -05:00
parent 18f43e4120
commit 9f296c133d
3 changed files with 106 additions and 109 deletions
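
The rewritten maxUnavailable path below mirrors the maxSurge path: one walk over nodeToDaemonPods that classifies each node as unavailable, up to date, or an update candidate. A minimal, self-contained sketch of that single pass, using simplified stand-in types rather than the controller's real ones (illustrative only, not the patch itself):

package main

import "fmt"

// podInfo is a simplified stand-in for the controller's per-pod bookkeeping.
type podInfo struct {
	name      string
	updated   bool // pod matches the current template hash
	available bool // pod has been ready for at least minReadySeconds
}

// classify walks every node exactly once and reports how many pods are
// unavailable, which old pods can be replaced for free (they are already
// unavailable), and which available old pods are merely deletion candidates.
func classify(nodeToPods map[string][]podInfo) (numUnavailable int, replacements, candidates []string) {
	for _, pods := range nodeToPods {
		var newPod, oldPod *podInfo
		for i := range pods {
			if pods[i].updated {
				newPod = &pods[i]
			} else {
				oldPod = &pods[i]
			}
		}
		switch {
		case newPod == nil && oldPod == nil:
			// nothing runs here yet; the manage loop will create a pod
			numUnavailable++
		case newPod != nil:
			// node is already updated; it only counts if the new pod is not available
			if !newPod.available {
				numUnavailable++
			}
		case !oldPod.available:
			// an old pod that is already unavailable can be replaced at no extra cost
			replacements = append(replacements, oldPod.name)
		default:
			// an available old pod; deleting it consumes the unavailability budget
			candidates = append(candidates, oldPod.name)
		}
	}
	return numUnavailable, replacements, candidates
}

func main() {
	nodes := map[string][]podInfo{
		"node-1": {{name: "ds-old-1", updated: false, available: true}},
		"node-2": {{name: "ds-new-2", updated: true, available: false}},
		"node-3": {}, // scheduled node with no pod yet
	}
	numUnavailable, replacements, candidates := classify(nodes)
	fmt.Println(numUnavailable, replacements, candidates) // 2 [] [ds-old-1]
}

The point, as in the patch, is that availability is evaluated exactly once per pod and the three outcomes feed directly into the deletion decision.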

pkg/controller/daemon/update.go

@ -38,14 +38,14 @@ import (
labelsutil "k8s.io/kubernetes/pkg/util/labels"
)
// rollingUpdate deletes old daemon set pods making sure that no more than
// ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable pods are unavailable
// rollingUpdate identifies the set of old pods to delete, or additional pods to create on nodes,
// remaining within the constraints imposed by the update strategy.
func (dsc *DaemonSetsController) rollingUpdate(ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
if err != nil {
return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
}
maxSurge, maxUnavailable, numUnavailable, err := dsc.getUnavailableNumbers(ds, nodeList, nodeToDaemonPods)
maxSurge, maxUnavailable, err := dsc.updatedDesiredNodeCounts(ds, nodeList, nodeToDaemonPods)
if err != nil {
return fmt.Errorf("couldn't get unavailable numbers: %v", err)
}
@ -54,30 +54,74 @@ func (dsc *DaemonSetsController) rollingUpdate(ds *apps.DaemonSet, nodeList []*v
// When not surging, we delete just enough pods to stay under the maxUnavailable limit, if any
// are necessary, and let the core loop create new instances on those nodes.
//
// Assumptions:
// * Expect manage loop to allow no more than one pod per node
// * Expect manage loop will create new pods
// * Expect manage loop will handle failed pods
// * Deleted pods do not count as unavailable so that updates make progress when nodes are down
// Invariants:
// * The number of new pods that are unavailable must be less than maxUnavailable
// * A node with an available old pod is a candidate for deletion if it does not violate other invariants
//
if maxSurge == 0 {
_, oldPods := dsc.getAllDaemonSetPods(ds, nodeToDaemonPods, hash)
oldAvailablePods, oldUnavailablePods := util.SplitByAvailablePods(ds.Spec.MinReadySeconds, oldPods, now)
var oldPodsToDelete []string
klog.V(4).Infof("Marking all unavailable old pods for deletion")
for _, pod := range oldUnavailablePods {
// Skip terminating pods. We won't delete them again
if pod.DeletionTimestamp != nil {
var numUnavailable int
var allowedReplacementPods []string
var candidatePodsToDelete []string
for nodeName, pods := range nodeToDaemonPods {
newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash)
if !ok {
// let the manage loop clean up this node, and treat it as an unavailable node
klog.V(3).Infof("DaemonSet %s/%s has excess pods on node %s, skipping to allow the core loop to process", ds.Namespace, ds.Name, nodeName)
numUnavailable++
continue
}
klog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name)
oldPodsToDelete = append(oldPodsToDelete, pod.Name)
}
for _, pod := range oldAvailablePods {
if numUnavailable >= maxUnavailable {
klog.V(4).Infof("Number of unavailable DaemonSet pods: %d, is equal to or exceeds allowed maximum: %d", numUnavailable, maxUnavailable)
break
switch {
case oldPod == nil && newPod == nil, oldPod != nil && newPod != nil:
// the manage loop will handle creating or deleting the appropriate pod, consider this unavailable
numUnavailable++
case newPod != nil:
// this pod is up to date, check its availability
if !podutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
// an unavailable new pod is counted against maxUnavailable
numUnavailable++
}
default:
// this pod is old, it is an update candidate
switch {
case !podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}):
// the old pod isn't available, so it needs to be replaced
klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is out of date and not available, allowing replacement", ds.Namespace, ds.Name, oldPod.Name, nodeName)
// record the replacement
if allowedReplacementPods == nil {
allowedReplacementPods = make([]string, 0, len(nodeToDaemonPods))
}
allowedReplacementPods = append(allowedReplacementPods, oldPod.Name)
case numUnavailable >= maxUnavailable:
// no point considering any other candidates
continue
default:
klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is out of date, this is a candidate to replace", ds.Namespace, ds.Name, oldPod.Name, nodeName)
// record the candidate
if candidatePodsToDelete == nil {
candidatePodsToDelete = make([]string, 0, maxUnavailable)
}
candidatePodsToDelete = append(candidatePodsToDelete, oldPod.Name)
}
}
klog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name)
oldPodsToDelete = append(oldPodsToDelete, pod.Name)
numUnavailable++
}
// use any of the candidates we can, including the allowedReplacementPods
klog.V(5).Infof("DaemonSet %s/%s allowing %d replacements, up to %d unavailable, %d new are unavailable, %d candidates", ds.Namespace, ds.Name, len(allowedReplacementPods), maxUnavailable, numUnavailable, len(candidatePodsToDelete))
remainingUnavailable := maxUnavailable - numUnavailable
if remainingUnavailable < 0 {
remainingUnavailable = 0
}
if max := len(candidatePodsToDelete); remainingUnavailable > max {
remainingUnavailable = max
}
oldPodsToDelete := append(allowedReplacementPods, candidatePodsToDelete[:remainingUnavailable]...)
return dsc.syncNodes(ds, oldPodsToDelete, nil, hash)
}
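
The deletion budget computed at the end of this branch is just the unused portion of maxUnavailable, clamped to the number of candidates. The tiny helper below (illustrative, not part of the patch) restates that arithmetic on its own:

package main

import "fmt"

// remainingBudget returns how many available old pods may still be deleted in
// this sync: maxUnavailable minus the pods that are already unavailable,
// never negative and never larger than the number of candidate pods.
func remainingBudget(maxUnavailable, numUnavailable, numCandidates int) int {
	remaining := maxUnavailable - numUnavailable
	if remaining < 0 {
		remaining = 0
	}
	if remaining > numCandidates {
		remaining = numCandidates
	}
	return remaining
}

func main() {
	// One pod is already unavailable and five available old pods are candidates:
	// only two of them may be deleted on top of the already-unavailable ones.
	fmt.Println(remainingBudget(3, 1, 5)) // 2
}

Every already-unavailable old pod (allowedReplacementPods) is always deleted; the budget only limits how many available candidates join them.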
@ -100,7 +144,7 @@ func (dsc *DaemonSetsController) rollingUpdate(ds *apps.DaemonSet, nodeList []*v
var numSurge int
for nodeName, pods := range nodeToDaemonPods {
newPod, oldPod, ok := findSurgePodsOnNode(ds, pods, hash)
newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash)
if !ok {
// let the manage loop clean up this node, and treat it as a surge node
klog.V(3).Infof("DaemonSet %s/%s has excess pods on node %s, skipping to allow the core loop to process", ds.Namespace, ds.Name, nodeName)
@ -118,7 +162,7 @@ func (dsc *DaemonSetsController) rollingUpdate(ds *apps.DaemonSet, nodeList []*v
klog.V(5).Infof("Pod %s on node %s is out of date and not available, allowing replacement", ds.Namespace, ds.Name, oldPod.Name, nodeName)
// record the replacement
if allowedNewNodes == nil {
allowedNewNodes = make([]string, 0, len(nodeList))
allowedNewNodes = make([]string, 0, len(nodeToDaemonPods))
}
allowedNewNodes = append(allowedNewNodes, nodeName)
case numSurge >= maxSurge:
@ -159,11 +203,11 @@ func (dsc *DaemonSetsController) rollingUpdate(ds *apps.DaemonSet, nodeList []*v
return dsc.syncNodes(ds, oldPodsToDelete, newNodesToCreate, hash)
}
// findSurgePodsOnNode looks at non-deleted pods on a given node and returns true if there
// findUpdatedPodsOnNode looks at non-deleted pods on a given node and returns true if there
// is at most one of each old and new pods, or false if there are multiples. We can skip
// processing the particular node in those scenarios and let the manage loop prune the
// excess pods for our next time around.
func findSurgePodsOnNode(ds *apps.DaemonSet, podsOnNode []*v1.Pod, hash string) (newPod, oldPod *v1.Pod, ok bool) {
func findUpdatedPodsOnNode(ds *apps.DaemonSet, podsOnNode []*v1.Pod, hash string) (newPod, oldPod *v1.Pod, ok bool) {
for _, pod := range podsOnNode {
if pod.DeletionTimestamp != nil {
continue
@ -471,71 +515,34 @@ func (dsc *DaemonSetsController) snapshot(ds *apps.DaemonSet, revision int64) (*
return history, err
}
func (dsc *DaemonSetsController) getAllDaemonSetPods(ds *apps.DaemonSet, nodeToDaemonPods map[string][]*v1.Pod, hash string) ([]*v1.Pod, []*v1.Pod) {
var newPods []*v1.Pod
var oldPods []*v1.Pod
for _, pods := range nodeToDaemonPods {
for _, pod := range pods {
// If the returned error is not nil we have a parse error.
// The controller handles this via the hash.
generation, err := util.GetTemplateGeneration(ds)
if err != nil {
generation = nil
}
if util.IsPodUpdated(pod, hash, generation) {
newPods = append(newPods, pod)
} else {
oldPods = append(oldPods, pod)
}
}
}
return newPods, oldPods
}
// getUnavailableNumbers calculates the true number of allowed unavailable or surge pods.
// TODO: This method duplicates calculations in the main update loop and should be refactored
// to remove the need to calculate availability twice (once here, and once in the main loops)
func (dsc *DaemonSetsController) getUnavailableNumbers(ds *apps.DaemonSet, nodeList []*v1.Node, nodeToDaemonPods map[string][]*v1.Pod) (int, int, int, error) {
klog.V(4).Infof("Getting unavailable numbers")
now := dsc.failedPodsBackoff.Clock.Now()
var numUnavailable, desiredNumberScheduled int
// updatedDesiredNodeCounts calculates the true number of allowed unavailable or surge pods and
// updates the nodeToDaemonPods map to include an empty entry for every node that should run a daemon pod but has none scheduled.
func (dsc *DaemonSetsController) updatedDesiredNodeCounts(ds *apps.DaemonSet, nodeList []*v1.Node, nodeToDaemonPods map[string][]*v1.Pod) (int, int, error) {
var desiredNumberScheduled int
for i := range nodeList {
node := nodeList[i]
wantToRun, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
if err != nil {
return -1, -1, -1, err
return -1, -1, err
}
if !wantToRun {
continue
}
desiredNumberScheduled++
daemonPods, exists := nodeToDaemonPods[node.Name]
if !exists {
numUnavailable++
continue
}
available := false
for _, pod := range daemonPods {
// for the purposes of update we ensure that the Pod is both available and not terminating
if podutil.IsPodAvailable(pod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) && pod.DeletionTimestamp == nil {
available = true
break
}
}
if !available {
numUnavailable++
if _, exists := nodeToDaemonPods[node.Name]; !exists {
nodeToDaemonPods[node.Name] = nil
}
}
maxUnavailable, err := util.UnavailableCount(ds, desiredNumberScheduled)
if err != nil {
return -1, -1, -1, fmt.Errorf("invalid value for MaxUnavailable: %v", err)
return -1, -1, fmt.Errorf("invalid value for MaxUnavailable: %v", err)
}
maxSurge, err := util.SurgeCount(ds, desiredNumberScheduled)
if err != nil {
return -1, -1, -1, fmt.Errorf("invalid value for MaxSurge: %v", err)
return -1, -1, fmt.Errorf("invalid value for MaxSurge: %v", err)
}
// if the daemonset returned with an impossible configuration, obey the default of unavailable=1 (in the
@ -544,8 +551,8 @@ func (dsc *DaemonSetsController) getUnavailableNumbers(ds *apps.DaemonSet, nodeL
klog.Warningf("DaemonSet %s/%s is not configured for surge or unavailability, defaulting to accepting unavailability", ds.Namespace, ds.Name)
maxUnavailable = 1
}
klog.V(0).Infof("DaemonSet %s/%s, maxSurge: %d, maxUnavailable: %d, numUnavailable: %d", ds.Namespace, ds.Name, maxSurge, maxUnavailable, numUnavailable)
return maxSurge, maxUnavailable, numUnavailable, nil
klog.V(5).Infof("DaemonSet %s/%s, maxSurge: %d, maxUnavailable: %d", ds.Namespace, ds.Name, maxSurge, maxUnavailable)
return maxSurge, maxUnavailable, nil
}
type historiesByRevision []*apps.ControllerRevision
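
Because updatedDesiredNodeCounts now seeds nodeToDaemonPods with an empty entry for every node that should run a daemon pod but has none, the single loop in rollingUpdate also visits those nodes and counts them as unavailable. A small standalone illustration of that seeding pattern (names are made up for the example):

package main

import "fmt"

func main() {
	// node-0 already runs a daemon pod; node-1 should run one but has no entry.
	nodeToPods := map[string][]string{"node-0": {"ds-pod-a"}}
	for _, node := range []string{"node-0", "node-1"} {
		if _, exists := nodeToPods[node]; !exists {
			// seed the missing node so a single range over the map sees it too
			nodeToPods[node] = nil
		}
	}
	for node, pods := range nodeToPods {
		fmt.Printf("%s: %d pod(s)\n", node, len(pods))
	}
	// Prints (in some order): node-0: 1 pod(s), node-1: 0 pod(s).
}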

pkg/controller/daemon/update_test.go

@ -345,10 +345,7 @@ func TestDaemonSetUpdatesNoTemplateChanged(t *testing.T) {
ds.Spec.UpdateStrategy.Type = apps.RollingUpdateDaemonSetStrategyType
intStr := intstr.FromInt(maxUnavailable)
ds.Spec.UpdateStrategy.RollingUpdate = &apps.RollingUpdateDaemonSet{MaxUnavailable: &intStr}
err = manager.dsStore.Update(ds)
if err != nil {
t.Fatal(err)
}
manager.dsStore.Update(ds)
// template is not changed no pod should be removed
clearExpectations(t, manager, ds, podControl)
@ -385,7 +382,7 @@ func TestGetUnavailableNumbers(t *testing.T) {
enableSurge bool
maxSurge int
maxUnavailable int
numUnavailable int
emptyNodes int
Err error
}{
{
@ -404,7 +401,7 @@ func TestGetUnavailableNumbers(t *testing.T) {
}(),
nodeToPods: make(map[string][]*v1.Pod),
maxUnavailable: 0,
numUnavailable: 0,
emptyNodes: 0,
},
{
name: "Two nodes with ready pods",
@ -432,7 +429,7 @@ func TestGetUnavailableNumbers(t *testing.T) {
return mapping
}(),
maxUnavailable: 1,
numUnavailable: 0,
emptyNodes: 0,
},
{
name: "Two nodes, one node without pods",
@ -457,7 +454,7 @@ func TestGetUnavailableNumbers(t *testing.T) {
return mapping
}(),
maxUnavailable: 1,
numUnavailable: 1,
emptyNodes: 1,
},
{
name: "Two nodes, one node without pods, surge",
@ -482,7 +479,7 @@ func TestGetUnavailableNumbers(t *testing.T) {
return mapping
}(),
maxUnavailable: 1,
numUnavailable: 1,
emptyNodes: 1,
},
{
name: "Two nodes with pods, MaxUnavailable in percents",
@ -510,7 +507,7 @@ func TestGetUnavailableNumbers(t *testing.T) {
return mapping
}(),
maxUnavailable: 1,
numUnavailable: 0,
emptyNodes: 0,
},
{
name: "Two nodes with pods, MaxUnavailable in percents, surge",
@ -540,7 +537,7 @@ func TestGetUnavailableNumbers(t *testing.T) {
enableSurge: true,
maxSurge: 1,
maxUnavailable: 0,
numUnavailable: 0,
emptyNodes: 0,
},
{
name: "Two nodes with pods, MaxUnavailable is 100%, surge",
@ -570,7 +567,7 @@ func TestGetUnavailableNumbers(t *testing.T) {
enableSurge: true,
maxSurge: 2,
maxUnavailable: 0,
numUnavailable: 0,
emptyNodes: 0,
},
{
name: "Two nodes with pods, MaxUnavailable in percents, pod terminating",
@ -579,7 +576,7 @@ func TestGetUnavailableNumbers(t *testing.T) {
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
addNodes(manager.nodeStore, 0, 2, nil)
addNodes(manager.nodeStore, 0, 3, nil)
return manager
}(),
ds: func() *apps.DaemonSet {
@ -599,8 +596,8 @@ func TestGetUnavailableNumbers(t *testing.T) {
mapping["node-1"] = []*v1.Pod{pod1}
return mapping
}(),
maxUnavailable: 1,
numUnavailable: 1,
maxUnavailable: 2,
emptyNodes: 1,
},
}
@ -613,7 +610,7 @@ func TestGetUnavailableNumbers(t *testing.T) {
if err != nil {
t.Fatalf("error listing nodes: %v", err)
}
maxSurge, maxUnavailable, numUnavailable, err := c.Manager.getUnavailableNumbers(c.ds, nodeList, c.nodeToPods)
maxSurge, maxUnavailable, err := c.Manager.updatedDesiredNodeCounts(c.ds, nodeList, c.nodeToPods)
if err != nil && c.Err != nil {
if c.Err != err {
t.Fatalf("Expected error: %v but got: %v", c.Err, err)
@ -622,8 +619,17 @@ func TestGetUnavailableNumbers(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if maxSurge != c.maxSurge || maxUnavailable != c.maxUnavailable || numUnavailable != c.numUnavailable {
t.Fatalf("Wrong values. maxSurge: %d, expected %d, maxUnavailable: %d, expected: %d, numUnavailable: %d. expected: %d", maxSurge, c.maxSurge, maxUnavailable, c.maxUnavailable, numUnavailable, c.numUnavailable)
if maxSurge != c.maxSurge || maxUnavailable != c.maxUnavailable {
t.Errorf("Wrong values. maxSurge: %d, expected %d, maxUnavailable: %d, expected: %d", maxSurge, c.maxSurge, maxUnavailable, c.maxUnavailable)
}
var emptyNodes int
for _, pods := range c.nodeToPods {
if len(pods) == 0 {
emptyNodes++
}
}
if emptyNodes != c.emptyNodes {
t.Errorf("expected numEmpty to be %d, was %d", c.emptyNodes, emptyNodes)
}
})
}

pkg/controller/daemon/util/daemonset_util.go

@ -19,7 +19,6 @@ package util
import (
"fmt"
"strconv"
"time"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
@ -27,7 +26,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
intstrutil "k8s.io/apimachinery/pkg/util/intstr"
utilfeature "k8s.io/apiserver/pkg/util/feature"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
)
@ -172,20 +170,6 @@ func IsPodUpdated(pod *v1.Pod, hash string, dsTemplateGeneration *int64) bool {
return hashMatches || templateMatches
}
// SplitByAvailablePods splits provided daemon set pods by availability.
func SplitByAvailablePods(minReadySeconds int32, pods []*v1.Pod, now time.Time) ([]*v1.Pod, []*v1.Pod) {
availablePods := make([]*v1.Pod, 0, len(pods))
var unavailablePods []*v1.Pod
for _, pod := range pods {
if podutil.IsPodAvailable(pod, minReadySeconds, metav1.Time{Time: now}) {
availablePods = append(availablePods, pod)
} else {
unavailablePods = append(unavailablePods, pod)
}
}
return availablePods, unavailablePods
}
// ReplaceDaemonSetPodNodeNameNodeAffinity replaces the RequiredDuringSchedulingIgnoredDuringExecution
// NodeAffinity of the given affinity with a new NodeAffinity that selects the given nodeName.
// Note that this function assumes that no NodeAffinity conflicts with the selected nodeName.