change rolling update logic to exclude sunsetting nodes

mochizuki875 2023-08-09 12:27:18 +09:00
parent 6805632abb
commit 2a82776745
3 changed files with 341 additions and 38 deletions

View File

@@ -47,7 +47,7 @@ func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.Dae
     if err != nil {
         return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
     }
-    maxSurge, maxUnavailable, err := dsc.updatedDesiredNodeCounts(ctx, ds, nodeList, nodeToDaemonPods)
+    maxSurge, maxUnavailable, desiredNumberScheduled, err := dsc.updatedDesiredNodeCounts(ctx, ds, nodeList, nodeToDaemonPods)
     if err != nil {
         return fmt.Errorf("couldn't get unavailable numbers: %v", err)
     }
@@ -140,10 +140,12 @@ func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.Dae
     // * An old available pod is deleted if a new pod is available
     // * No more than maxSurge new pods are created for old available pods at any one time
     //
-    var oldPodsToDelete []string
+    var oldPodsToDelete []string          // these pods are already updated or unavailable on sunsetted node
+    var shouldNotRunPodsToDelete []string // candidate pods to be deleted on sunsetted nodes
     var candidateNewNodes []string
     var allowedNewNodes []string
     var numSurge int
+    var numAvailable int

     for nodeName, pods := range nodeToDaemonPods {
         newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash)
@@ -153,6 +155,18 @@ func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.Dae
             numSurge++
             continue
         }
+
+        // first count availability for all the nodes (even the ones that we are sunsetting due to scheduling constraints)
+        if oldPod != nil {
+            if podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
+                numAvailable++
+            }
+        } else if newPod != nil {
+            if podutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
+                numAvailable++
+            }
+        }
+
         switch {
         case oldPod == nil:
             // we don't need to do anything to this node, the manage loop will handle it
@@ -160,6 +174,15 @@ func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.Dae
             // this is a surge candidate
             switch {
             case !podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}):
+                node, err := dsc.nodeLister.Get(nodeName)
+                if err != nil {
+                    return fmt.Errorf("couldn't get node for nodeName %q: %v", nodeName, err)
+                }
+                if shouldRun, _ := NodeShouldRunDaemonPod(node, ds); !shouldRun {
+                    logger.V(5).Info("DaemonSet pod on node is not available and does not match scheduling constraints, remove old pod", "daemonset", klog.KObj(ds), "node", nodeName, "oldPod", klog.KObj(oldPod))
+                    oldPodsToDelete = append(oldPodsToDelete, oldPod.Name)
+                    continue
+                }
                 // the old pod isn't available, allow it to become a replacement
                 logger.V(5).Info("Pod on node is out of date and not available, allowing replacement", "daemonset", klog.KObj(ds), "pod", klog.KObj(oldPod), "node", klog.KRef("", nodeName))
                 // record the replacement
@@ -167,10 +190,19 @@ func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.Dae
                     allowedNewNodes = make([]string, 0, len(nodeToDaemonPods))
                 }
                 allowedNewNodes = append(allowedNewNodes, nodeName)
-            case numSurge >= maxSurge:
-                // no point considering any other candidates
-                continue
             default:
+                node, err := dsc.nodeLister.Get(nodeName)
+                if err != nil {
+                    return fmt.Errorf("couldn't get node for nodeName %q: %v", nodeName, err)
+                }
+                if shouldRun, _ := NodeShouldRunDaemonPod(node, ds); !shouldRun {
+                    shouldNotRunPodsToDelete = append(shouldNotRunPodsToDelete, oldPod.Name)
+                    continue
+                }
+                if numSurge >= maxSurge {
+                    // no point considering any other candidates
+                    continue
+                }
                 logger.V(5).Info("DaemonSet pod on node is out of date, this is a surge candidate", "daemonset", klog.KObj(ds), "pod", klog.KObj(oldPod), "node", klog.KRef("", nodeName))
                 // record the candidate
                 if candidateNewNodes == nil {
@@ -194,6 +226,27 @@ func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.Dae
     // use any of the candidates we can, including the allowedNewNodes
     logger.V(5).Info("DaemonSet allowing replacements", "daemonset", klog.KObj(ds), "replacements", len(allowedNewNodes), "maxSurge", maxSurge, "numSurge", numSurge, "candidates", len(candidateNewNodes))
     remainingSurge := maxSurge - numSurge
+
+    // With maxSurge, the application owner expects 100% availability.
+    // When the scheduling constraints change from node A to node B, we do not want the application to stay
+    // without any available pods. Only delete a pod on node A when a pod on node B becomes available.
+    if deletablePodsNumber := numAvailable - desiredNumberScheduled; deletablePodsNumber > 0 {
+        if shouldNotRunPodsToDeleteNumber := len(shouldNotRunPodsToDelete); deletablePodsNumber > shouldNotRunPodsToDeleteNumber {
+            deletablePodsNumber = shouldNotRunPodsToDeleteNumber
+        }
+        for _, podToDeleteName := range shouldNotRunPodsToDelete[:deletablePodsNumber] {
+            podToDelete, err := dsc.podLister.Pods(ds.Namespace).Get(podToDeleteName)
+            if err != nil {
+                if errors.IsNotFound(err) {
+                    continue
+                }
+                return fmt.Errorf("couldn't get pod which should be deleted due to scheduling constraints %q: %v", podToDeleteName, err)
+            }
+            logger.V(5).Info("DaemonSet pod on node should be deleted due to scheduling constraints", "daemonset", klog.KObj(ds), "pod", klog.KObj(podToDelete), "node", podToDelete.Spec.NodeName)
+            oldPodsToDelete = append(oldPodsToDelete, podToDeleteName)
+        }
+    }
+
     if remainingSurge < 0 {
         remainingSurge = 0
     }
@@ -525,9 +578,9 @@ func (dsc *DaemonSetsController) snapshot(ctx context.Context, ds *apps.DaemonSe
     return history, err
 }

-// updatedDesiredNodeCounts calculates the true number of allowed unavailable or surge pods and
+// updatedDesiredNodeCounts calculates the true number of allowed surge, unavailable or desired scheduled pods and
 // updates the nodeToDaemonPods array to include an empty array for every node that is not scheduled.
-func (dsc *DaemonSetsController) updatedDesiredNodeCounts(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, nodeToDaemonPods map[string][]*v1.Pod) (int, int, error) {
+func (dsc *DaemonSetsController) updatedDesiredNodeCounts(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, nodeToDaemonPods map[string][]*v1.Pod) (int, int, int, error) {
     var desiredNumberScheduled int
     logger := klog.FromContext(ctx)
     for i := range nodeList {
@@ -545,12 +598,12 @@ func (dsc *DaemonSetsController) updatedDesiredNodeCounts(ctx context.Context, d
     maxUnavailable, err := util.UnavailableCount(ds, desiredNumberScheduled)
     if err != nil {
-        return -1, -1, fmt.Errorf("invalid value for MaxUnavailable: %v", err)
+        return -1, -1, -1, fmt.Errorf("invalid value for MaxUnavailable: %v", err)
     }

     maxSurge, err := util.SurgeCount(ds, desiredNumberScheduled)
     if err != nil {
-        return -1, -1, fmt.Errorf("invalid value for MaxSurge: %v", err)
+        return -1, -1, -1, fmt.Errorf("invalid value for MaxSurge: %v", err)
     }

     // if the daemonset returned with an impossible configuration, obey the default of unavailable=1 (in the
@@ -560,7 +613,7 @@ func (dsc *DaemonSetsController) updatedDesiredNodeCounts(ctx context.Context, d
         maxUnavailable = 1
     }
     logger.V(5).Info("DaemonSet with maxSurge and maxUnavailable", "daemonset", klog.KObj(ds), "maxSurge", maxSurge, "maxUnavailable", maxUnavailable)
-    return maxSurge, maxUnavailable, nil
+    return maxSurge, maxUnavailable, desiredNumberScheduled, nil
 }

 type historiesByRevision []*apps.ControllerRevision
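The heart of the change above is the cap derived from numAvailable and desiredNumberScheduled: pods on nodes that should no longer run the DaemonSet are only removed once there is surplus availability to cover them, so the workload never drops below the desired number of available pods while the scheduling constraints shift. Below is a minimal, standalone sketch of that capping arithmetic; capSunsettedDeletions is an invented name for illustration and is not part of the controller.

package main

import "fmt"

// capSunsettedDeletions mirrors the capping logic added to rollingUpdate above:
// at most (numAvailable - desiredNumberScheduled) pods on sunsetted nodes are
// released for deletion in one sync, and never more than there are candidates.
func capSunsettedDeletions(numAvailable, desiredNumberScheduled int, shouldNotRunPodsToDelete []string) []string {
    deletable := numAvailable - desiredNumberScheduled
    if deletable <= 0 {
        return nil // no surplus availability yet; keep the old pods running
    }
    if deletable > len(shouldNotRunPodsToDelete) {
        deletable = len(shouldNotRunPodsToDelete)
    }
    return shouldNotRunPodsToDelete[:deletable]
}

func main() {
    // 4 pods desired, 5 currently available (a surge replacement became ready),
    // 2 old pods sit on nodes that no longer match the scheduling constraints.
    fmt.Println(capSunsettedDeletions(5, 4, []string{"old-a", "old-b"})) // [old-a]
    // No surplus yet: nothing is deleted this sync.
    fmt.Println(capSunsettedDeletions(4, 4, []string{"old-a", "old-b"})) // []
}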

View File

@@ -117,7 +117,79 @@ func TestDaemonSetUpdatesPodsWithMaxSurge(t *testing.T) {
     expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0)
 }

-func TestDaemonSetUpdatesWhenNewPosIsNotReady(t *testing.T) {
+func TestDaemonSetUpdatesPodsNotMatchTainstWithMaxSurge(t *testing.T) {
+    _, ctx := ktesting.NewTestContext(t)
+    ds := newDaemonSet("foo")
+    maxSurge := 1
+    ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(maxSurge))
+    tolerations := []v1.Toleration{
+        {Key: "node-role.kubernetes.io/control-plane", Operator: v1.TolerationOpExists},
+    }
+    setDaemonSetToleration(ds, tolerations)
+    manager, podControl, _, err := newTestController(ctx, ds)
+    if err != nil {
+        t.Fatalf("error creating DaemonSets controller: %v", err)
+    }
+    err = manager.dsStore.Add(ds)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    // Add five nodes and taint one of them
+    addNodes(manager.nodeStore, 0, 5, nil)
+    taints := []v1.Taint{
+        {Key: "node-role.kubernetes.io/control-plane", Effect: v1.TaintEffectNoSchedule},
+    }
+    node := newNode("node-0", nil)
+    setNodeTaint(node, taints)
+    err = manager.nodeStore.Update(node)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    // Create DaemonSet with toleration
+    expectSyncDaemonSets(t, manager, ds, podControl, 5, 0, 0)
+    markPodsReady(podControl.podStore)
+
+    // RollingUpdate DaemonSet without toleration
+    ds.Spec.Template.Spec.Tolerations = nil
+    err = manager.dsStore.Update(ds)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    clearExpectations(t, manager, ds, podControl)
+    expectSyncDaemonSets(t, manager, ds, podControl, maxSurge, 1, 0)
+    clearExpectations(t, manager, ds, podControl)
+    expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0)
+    markPodsReady(podControl.podStore)
+
+    clearExpectations(t, manager, ds, podControl)
+    expectSyncDaemonSets(t, manager, ds, podControl, maxSurge, maxSurge, 0)
+    clearExpectations(t, manager, ds, podControl)
+    expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0)
+    markPodsReady(podControl.podStore)
+
+    clearExpectations(t, manager, ds, podControl)
+    expectSyncDaemonSets(t, manager, ds, podControl, maxSurge, maxSurge, 0)
+    clearExpectations(t, manager, ds, podControl)
+    expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0)
+    markPodsReady(podControl.podStore)
+
+    clearExpectations(t, manager, ds, podControl)
+    expectSyncDaemonSets(t, manager, ds, podControl, maxSurge, maxSurge, 0)
+    clearExpectations(t, manager, ds, podControl)
+    expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0)
+    markPodsReady(podControl.podStore)
+
+    clearExpectations(t, manager, ds, podControl)
+    expectSyncDaemonSets(t, manager, ds, podControl, 0, maxSurge, 0)
+    clearExpectations(t, manager, ds, podControl)
+    expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0)
+}
+
+func TestDaemonSetUpdatesWhenNewPodIsNotReady(t *testing.T) {
     _, ctx := ktesting.NewTestContext(t)
     ds := newDaemonSet("foo")
     manager, podControl, _, err := newTestController(ctx, ds)
@@ -379,14 +451,15 @@ func newUpdateUnavailable(value intstr.IntOrString) apps.DaemonSetUpdateStrategy
 func TestGetUnavailableNumbers(t *testing.T) {
     cases := []struct {
         name string
         ManagerFunc func(ctx context.Context) *daemonSetsController
         ds *apps.DaemonSet
         nodeToPods map[string][]*v1.Pod
         maxSurge int
         maxUnavailable int
-        emptyNodes int
-        Err error
+        desiredNumberScheduled int
+        emptyNodes int
+        Err error
     }{
         {
             name: "No nodes",
@@ -431,8 +504,9 @@ func TestGetUnavailableNumbers(t *testing.T) {
                 mapping["node-1"] = []*v1.Pod{pod1}
                 return mapping
             }(),
             maxUnavailable: 1,
-            emptyNodes: 0,
+            desiredNumberScheduled: 2,
+            emptyNodes: 0,
         },
         {
             name: "Two nodes, one node without pods",
@@ -456,8 +530,9 @@ func TestGetUnavailableNumbers(t *testing.T) {
                 mapping["node-0"] = []*v1.Pod{pod0}
                 return mapping
             }(),
             maxUnavailable: 1,
-            emptyNodes: 1,
+            desiredNumberScheduled: 2,
+            emptyNodes: 1,
         },
         {
             name: "Two nodes, one node without pods, surge",
@@ -481,8 +556,9 @@ func TestGetUnavailableNumbers(t *testing.T) {
                 mapping["node-0"] = []*v1.Pod{pod0}
                 return mapping
             }(),
             maxUnavailable: 1,
-            emptyNodes: 1,
+            desiredNumberScheduled: 2,
+            emptyNodes: 1,
         },
         {
             name: "Two nodes with pods, MaxUnavailable in percents",
@@ -509,8 +585,9 @@ func TestGetUnavailableNumbers(t *testing.T) {
                 mapping["node-1"] = []*v1.Pod{pod1}
                 return mapping
             }(),
             maxUnavailable: 1,
-            emptyNodes: 0,
+            desiredNumberScheduled: 2,
+            emptyNodes: 0,
         },
         {
             name: "Two nodes with pods, MaxUnavailable in percents, surge",
@@ -537,9 +614,10 @@ func TestGetUnavailableNumbers(t *testing.T) {
                 mapping["node-1"] = []*v1.Pod{pod1}
                 return mapping
             }(),
             maxSurge: 1,
             maxUnavailable: 0,
-            emptyNodes: 0,
+            desiredNumberScheduled: 2,
+            emptyNodes: 0,
         },
         {
             name: "Two nodes with pods, MaxUnavailable is 100%, surge",
@@ -566,9 +644,10 @@ func TestGetUnavailableNumbers(t *testing.T) {
                 mapping["node-1"] = []*v1.Pod{pod1}
                 return mapping
             }(),
             maxSurge: 2,
             maxUnavailable: 0,
-            emptyNodes: 0,
+            desiredNumberScheduled: 2,
+            emptyNodes: 0,
         },
         {
             name: "Two nodes with pods, MaxUnavailable in percents, pod terminating",
@@ -597,8 +676,9 @@ func TestGetUnavailableNumbers(t *testing.T) {
                 mapping["node-1"] = []*v1.Pod{pod1}
                 return mapping
             }(),
             maxUnavailable: 2,
-            emptyNodes: 1,
+            desiredNumberScheduled: 3,
+            emptyNodes: 1,
         },
     }
@@ -611,7 +691,7 @@ func TestGetUnavailableNumbers(t *testing.T) {
             if err != nil {
                 t.Fatalf("error listing nodes: %v", err)
             }
-            maxSurge, maxUnavailable, err := manager.updatedDesiredNodeCounts(ctx, c.ds, nodeList, c.nodeToPods)
+            maxSurge, maxUnavailable, desiredNumberScheduled, err := manager.updatedDesiredNodeCounts(ctx, c.ds, nodeList, c.nodeToPods)
             if err != nil && c.Err != nil {
                 if c.Err != err {
                     t.Fatalf("Expected error: %v but got: %v", c.Err, err)
@@ -620,8 +700,8 @@ func TestGetUnavailableNumbers(t *testing.T) {
             if err != nil {
                 t.Fatalf("Unexpected error: %v", err)
             }
-            if maxSurge != c.maxSurge || maxUnavailable != c.maxUnavailable {
-                t.Errorf("Wrong values. maxSurge: %d, expected %d, maxUnavailable: %d, expected: %d", maxSurge, c.maxSurge, maxUnavailable, c.maxUnavailable)
+            if maxSurge != c.maxSurge || maxUnavailable != c.maxUnavailable || desiredNumberScheduled != c.desiredNumberScheduled {
+                t.Errorf("Wrong values. maxSurge: %d, expected %d, maxUnavailable: %d, expected: %d, desiredNumberScheduled: %d, expected: %d", maxSurge, c.maxSurge, maxUnavailable, c.maxUnavailable, desiredNumberScheduled, c.desiredNumberScheduled)
             }
             var emptyNodes int
             for _, pods := range c.nodeToPods {
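The unit test above exercises the new behavior with a control-plane taint that the updated pod template no longer tolerates. As a rough intuition for when a node counts as "sunsetting", here is a standalone sketch with simplified types; these are not the real corev1 types and this is not the controller's NodeShouldRunDaemonPod, which also considers other scheduling constraints.

package main

import "fmt"

// Taint and Toleration are simplified stand-ins: a node sunsets for the
// DaemonSet when one of its NoSchedule taints is no longer tolerated by the
// updated pod template (matching here by key only, i.e. Operator Exists).
type Taint struct{ Key string }
type Toleration struct{ Key string }

func tolerated(taint Taint, tolerations []Toleration) bool {
    for _, tol := range tolerations {
        if tol.Key == taint.Key {
            return true
        }
    }
    return false
}

func shouldRun(nodeTaints []Taint, podTolerations []Toleration) bool {
    for _, taint := range nodeTaints {
        if !tolerated(taint, podTolerations) {
            return false
        }
    }
    return true
}

func main() {
    zoneY := []Taint{{Key: "zone-y"}}
    oldTolerations := []Toleration{{Key: "zone-y"}, {Key: "zone-common"}}
    newTolerations := []Toleration{{Key: "zone-z"}, {Key: "zone-common"}}

    fmt.Println(shouldRun(zoneY, oldTolerations)) // true: the node still matches
    fmt.Println(shouldRun(zoneY, newTolerations)) // false: the node is now sunsetting
}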

View File

@@ -19,9 +19,11 @@ package daemonset
 import (
     "context"
     "fmt"
+    "strings"
     "testing"
     "time"

+    "github.com/google/go-cmp/cmp"
     apps "k8s.io/api/apps/v1"
     v1 "k8s.io/api/core/v1"
     apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -356,6 +358,39 @@ func validateDaemonSetPodsActive(
     }
 }

+func validateDaemonSetPodsTolerations(
+    podClient corev1client.PodInterface,
+    podInformer cache.SharedIndexInformer,
+    expectedTolerations []v1.Toleration,
+    prefix string,
+    t *testing.T,
+) {
+    objects := podInformer.GetIndexer().List()
+    for _, object := range objects {
+        var prefixedPodToleration []v1.Toleration
+        pod := object.(*v1.Pod)
+        ownerReferences := pod.ObjectMeta.OwnerReferences
+        if len(ownerReferences) != 1 {
+            t.Errorf("Pod %s has %d OwnerReferences, expected only 1", pod.Name, len(ownerReferences))
+        }
+        controllerRef := ownerReferences[0]
+        if got, want := controllerRef.Kind, "DaemonSet"; got != want {
+            t.Errorf("controllerRef.Kind = %q, want %q", got, want)
+        }
+        if controllerRef.Controller == nil || *controllerRef.Controller != true {
+            t.Errorf("controllerRef.Controller is not set to true")
+        }
+        for _, podToleration := range pod.Spec.Tolerations {
+            if strings.HasPrefix(podToleration.Key, prefix) {
+                prefixedPodToleration = append(prefixedPodToleration, podToleration)
+            }
+        }
+        if diff := cmp.Diff(expectedTolerations, prefixedPodToleration); diff != "" {
+            t.Fatalf("Unexpected tolerations (-want,+got):\n%s", diff)
+        }
+    }
+}
+
 // podUnschedulable returns a condition function that returns true if the given pod
 // gets unschedulable status.
 func podUnschedulable(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
@@ -456,6 +491,23 @@ func validateDaemonSetStatus(
     }
 }

+func validateUpdatedNumberScheduled(
+    ctx context.Context,
+    dsClient appstyped.DaemonSetInterface,
+    dsName string,
+    expectedUpdatedNumberScheduled int32,
+    t *testing.T) {
+    if err := wait.PollUntilContextTimeout(ctx, time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) {
+        ds, err := dsClient.Get(context.TODO(), dsName, metav1.GetOptions{})
+        if err != nil {
+            return false, err
+        }
+        return ds.Status.UpdatedNumberScheduled == expectedUpdatedNumberScheduled, nil
+    }); err != nil {
+        t.Fatal(err)
+    }
+}
+
 func updateDS(t *testing.T, dsClient appstyped.DaemonSetInterface, dsName string, updateFunc func(*apps.DaemonSet)) *apps.DaemonSet {
     var ds *apps.DaemonSet
     if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
@@ -1159,3 +1211,121 @@ func TestUpdateStatusDespitePodCreationFailure(t *testing.T) {
         validateDaemonSetStatus(dsClient, ds.Name, int32(limitedPodNumber), t)
     })
 }
+
+func TestDaemonSetRollingUpdateWithTolerations(t *testing.T) {
+    var taints []v1.Taint
+    var node *v1.Node
+    var tolerations []v1.Toleration
+
+    ctx, closeFn, dc, informers, clientset := setup(t)
+    defer closeFn()
+    ns := framework.CreateNamespaceOrDie(clientset, "daemonset-rollingupdate-with-tolerations-test", t)
+    defer framework.DeleteNamespaceOrDie(clientset, ns, t)
+
+    dsClient := clientset.AppsV1().DaemonSets(ns.Name)
+    podClient := clientset.CoreV1().Pods(ns.Name)
+    nodeClient := clientset.CoreV1().Nodes()
+    podInformer := informers.Core().V1().Pods().Informer()
+    informers.Start(ctx.Done())
+    go dc.Run(ctx, 2)
+
+    zero := intstr.FromInt32(0)
+    maxSurge := intstr.FromInt32(1)
+    ds := newDaemonSet("foo", ns.Name)
+    ds.Spec.UpdateStrategy = apps.DaemonSetUpdateStrategy{
+        Type: apps.RollingUpdateDaemonSetStrategyType,
+        RollingUpdate: &apps.RollingUpdateDaemonSet{
+            MaxUnavailable: &zero,
+            MaxSurge:       &maxSurge,
+        },
+    }
+
+    // Add six nodes with a zone-y, zone-z, or zone-common taint
+    for i := 0; i < 6; i++ {
+        if i < 2 {
+            taints = []v1.Taint{
+                {Key: "zone-y", Effect: v1.TaintEffectNoSchedule},
+            }
+        } else if i < 4 {
+            taints = []v1.Taint{
+                {Key: "zone-z", Effect: v1.TaintEffectNoSchedule},
+            }
+        } else {
+            taints = []v1.Taint{
+                {Key: "zone-common", Effect: v1.TaintEffectNoSchedule},
+            }
+        }
+        node = newNode(fmt.Sprintf("node-%d", i), nil)
+        node.Spec.Taints = taints
+        _, err := nodeClient.Create(context.TODO(), node, metav1.CreateOptions{})
+        if err != nil {
+            t.Fatalf("Failed to create node: %v", err)
+        }
+    }
+
+    // Create DaemonSet with zone-y toleration
+    tolerations = []v1.Toleration{
+        {Key: "zone-y", Operator: v1.TolerationOpExists},
+        {Key: "zone-common", Operator: v1.TolerationOpExists},
+    }
+    ds.Spec.Template.Spec.Tolerations = tolerations
+    _, err := dsClient.Create(ctx, ds, metav1.CreateOptions{})
+    if err != nil {
+        t.Fatalf("Failed to create DaemonSet: %v", err)
+    }
+    defer cleanupDaemonSets(t, clientset, ds)
+
+    validateDaemonSetPodsActive(podClient, podInformer, 4, t)
+    validateDaemonSetPodsAndMarkReady(podClient, podInformer, 4, t)
+    validateDaemonSetStatus(dsClient, ds.Name, 4, t)
+    validateUpdatedNumberScheduled(ctx, dsClient, ds.Name, 4, t)
+    validateDaemonSetPodsTolerations(podClient, podInformer, tolerations, "zone-", t)
+
+    // Update DaemonSet with zone-z toleration
+    tolerations = []v1.Toleration{
+        {Key: "zone-z", Operator: v1.TolerationOpExists},
+        {Key: "zone-common", Operator: v1.TolerationOpExists},
+    }
+    ds.Spec.Template.Spec.Tolerations = tolerations
+    _, err = dsClient.Update(ctx, ds, metav1.UpdateOptions{})
+    if err != nil {
+        t.Fatalf("Failed to update DaemonSet: %v", err)
+    }
+
+    // The expected numberPods for validateDaemonSetPodsActive is 7 right after the DaemonSet update
+    // and before the updated pods become ready because:
+    // - 2 new pods are created and Pending on the zone-z nodes
+    // - 1 new pod is created as surge and Pending on a zone-common node
+    // - 2 old pods that violate the scheduling constraints on the zone-y nodes remain Running
+    //   until the new pods become available
+    validateDaemonSetPodsActive(podClient, podInformer, 7, t)
+    validateDaemonSetPodsAndMarkReady(podClient, podInformer, 4, t)
+    validateDaemonSetStatus(dsClient, ds.Name, 4, t)
+    validateUpdatedNumberScheduled(ctx, dsClient, ds.Name, 4, t)
+    validateDaemonSetPodsTolerations(podClient, podInformer, tolerations, "zone-", t)
+
+    // Update DaemonSet with zone-y and zone-z toleration
+    tolerations = []v1.Toleration{
+        {Key: "zone-y", Operator: v1.TolerationOpExists},
+        {Key: "zone-z", Operator: v1.TolerationOpExists},
+        {Key: "zone-common", Operator: v1.TolerationOpExists},
+    }
+    ds.Spec.Template.Spec.Tolerations = tolerations
+    _, err = dsClient.Update(ctx, ds, metav1.UpdateOptions{})
+    if err != nil {
+        t.Fatalf("Failed to update DaemonSet: %v", err)
+    }
+
+    validateDaemonSetPodsActive(podClient, podInformer, 7, t)
+    validateDaemonSetPodsAndMarkReady(podClient, podInformer, 6, t)
+    validateDaemonSetStatus(dsClient, ds.Name, 6, t)
+    validateUpdatedNumberScheduled(ctx, dsClient, ds.Name, 6, t)
+    validateDaemonSetPodsTolerations(podClient, podInformer, tolerations, "zone-", t)
+
+    // Update DaemonSet with no toleration
+    ds.Spec.Template.Spec.Tolerations = nil
+    _, err = dsClient.Update(ctx, ds, metav1.UpdateOptions{})
+    if err != nil {
+        t.Fatalf("Failed to update DaemonSet: %v", err)
+    }
+
+    validateDaemonSetPodsActive(podClient, podInformer, 0, t)
+    validateDaemonSetStatus(dsClient, ds.Name, 0, t)
+    validateUpdatedNumberScheduled(ctx, dsClient, ds.Name, 0, t)
+}