kubernetes/test/e2e/apps/daemon_set.go
Patrick Ohly 41f23f52d0 test: fix ginkgolinter issues
All of these issues were reported by https://github.com/nunnatsa/ginkgolinter.
Fixing these issues is useful (several expressions get simpler, using
framework.ExpectNoError is better because it has additional support for
failures) and a necessary step for enabling that linter in our golangci-lint
invocation.
2023-02-22 19:36:05 +01:00

1258 lines
53 KiB
Go

/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apps
import (
"bytes"
"context"
"encoding/json"
"fmt"
"math/rand"
"reflect"
"sort"
"strings"
"text/tabwriter"
"time"
"k8s.io/client-go/tools/cache"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
watch "k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/client-go/util/retry"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/controller/daemon"
"k8s.io/kubernetes/test/e2e/framework"
e2edaemonset "k8s.io/kubernetes/test/e2e/framework/daemonset"
e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2eresource "k8s.io/kubernetes/test/e2e/framework/resource"
admissionapi "k8s.io/pod-security-admission/api"
)
const (
// this should not be a multiple of 5, because node status updates
// every 5 seconds. See https://github.com/kubernetes/kubernetes/pull/14915.
dsRetryPeriod = 1 * time.Second
dsRetryTimeout = 5 * time.Minute
daemonsetLabelPrefix = "daemonset-"
daemonsetNameLabel = daemonsetLabelPrefix + "name"
daemonsetColorLabel = daemonsetLabelPrefix + "color"
)
// NamespaceNodeSelectors the annotation key scheduler.alpha.kubernetes.io/node-selector is for assigning
// node selectors labels to namespaces
var NamespaceNodeSelectors = []string{"scheduler.alpha.kubernetes.io/node-selector"}
type updateDSFunc func(*appsv1.DaemonSet)
// updateDaemonSetWithRetries updates daemonsets with the given applyUpdate func
// until it succeeds or a timeout expires.
func updateDaemonSetWithRetries(ctx context.Context, c clientset.Interface, namespace, name string, applyUpdate updateDSFunc) (ds *appsv1.DaemonSet, err error) {
daemonsets := c.AppsV1().DaemonSets(namespace)
var updateErr error
pollErr := wait.PollImmediateWithContext(ctx, 10*time.Millisecond, 1*time.Minute, func(ctx context.Context) (bool, error) {
if ds, err = daemonsets.Get(ctx, name, metav1.GetOptions{}); err != nil {
return false, err
}
// Apply the update, then attempt to push it to the apiserver.
applyUpdate(ds)
if ds, err = daemonsets.Update(ctx, ds, metav1.UpdateOptions{}); err == nil {
framework.Logf("Updating DaemonSet %s", name)
return true, nil
}
updateErr = err
return false, nil
})
if pollErr == wait.ErrWaitTimeout {
pollErr = fmt.Errorf("couldn't apply the provided updated to DaemonSet %q: %v", name, updateErr)
}
return ds, pollErr
}
// This test must be run in serial because it assumes the Daemon Set pods will
// always get scheduled. If we run other tests in parallel, this may not
// happen. In the future, running in parallel may work if we have an eviction
// model which lets the DS controller kick out other pods to make room.
// See https://issues.k8s.io/21767 for more details
var _ = SIGDescribe("Daemon set [Serial]", func() {
var f *framework.Framework
ginkgo.AfterEach(func(ctx context.Context) {
// Clean up
daemonsets, err := f.ClientSet.AppsV1().DaemonSets(f.Namespace.Name).List(ctx, metav1.ListOptions{})
framework.ExpectNoError(err, "unable to dump DaemonSets")
if daemonsets != nil && len(daemonsets.Items) > 0 {
for _, ds := range daemonsets.Items {
ginkgo.By(fmt.Sprintf("Deleting DaemonSet %q", ds.Name))
framework.ExpectNoError(e2eresource.DeleteResourceAndWaitForGC(ctx, f.ClientSet, extensionsinternal.Kind("DaemonSet"), f.Namespace.Name, ds.Name))
err = wait.PollImmediateWithContext(ctx, dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, &ds))
framework.ExpectNoError(err, "error waiting for daemon pod to be reaped")
}
}
if daemonsets, err := f.ClientSet.AppsV1().DaemonSets(f.Namespace.Name).List(ctx, metav1.ListOptions{}); err == nil {
framework.Logf("daemonset: %s", runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...), daemonsets))
} else {
framework.Logf("unable to dump daemonsets: %v", err)
}
if pods, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).List(ctx, metav1.ListOptions{}); err == nil {
framework.Logf("pods: %s", runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...), pods))
} else {
framework.Logf("unable to dump pods: %v", err)
}
err = clearDaemonSetNodeLabels(ctx, f.ClientSet)
framework.ExpectNoError(err)
})
f = framework.NewDefaultFramework("daemonsets")
f.NamespacePodSecurityEnforceLevel = admissionapi.LevelBaseline
image := WebserverImage
dsName := "daemon-set"
var ns string
var c clientset.Interface
ginkgo.BeforeEach(func(ctx context.Context) {
ns = f.Namespace.Name
c = f.ClientSet
updatedNS, err := patchNamespaceAnnotations(ctx, c, ns)
framework.ExpectNoError(err)
ns = updatedNS.Name
err = clearDaemonSetNodeLabels(ctx, c)
framework.ExpectNoError(err)
})
/*
Release: v1.10
Testname: DaemonSet-Creation
Description: A conformant Kubernetes distribution MUST support the creation of DaemonSets. When a DaemonSet
Pod is deleted, the DaemonSet controller MUST create a replacement Pod.
*/
framework.ConformanceIt("should run and stop simple daemon", func(ctx context.Context) {
label := map[string]string{daemonsetNameLabel: dsName}
ginkgo.By(fmt.Sprintf("Creating simple DaemonSet %q", dsName))
ds, err := c.AppsV1().DaemonSets(ns).Create(ctx, newDaemonSet(dsName, image, label), metav1.CreateOptions{})
framework.ExpectNoError(err)
ginkgo.By("Check that daemon pods launch on every node of the cluster.")
err = wait.PollImmediateWithContext(ctx, dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
framework.ExpectNoError(err, "error waiting for daemon pod to start")
err = e2edaemonset.CheckDaemonStatus(ctx, f, dsName)
framework.ExpectNoError(err)
ginkgo.By("Stop a daemon pod, check that the daemon pod is revived.")
podList := listDaemonPods(ctx, c, ns, label)
pod := podList.Items[0]
err = c.CoreV1().Pods(ns).Delete(ctx, pod.Name, metav1.DeleteOptions{})
framework.ExpectNoError(err)
err = wait.PollImmediateWithContext(ctx, dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
framework.ExpectNoError(err, "error waiting for daemon pod to revive")
})
/*
Release: v1.10
Testname: DaemonSet-NodeSelection
Description: A conformant Kubernetes distribution MUST support DaemonSet Pod node selection via label
selectors.
*/
framework.ConformanceIt("should run and stop complex daemon", func(ctx context.Context) {
complexLabel := map[string]string{daemonsetNameLabel: dsName}
nodeSelector := map[string]string{daemonsetColorLabel: "blue"}
framework.Logf("Creating daemon %q with a node selector", dsName)
ds := newDaemonSet(dsName, image, complexLabel)
ds.Spec.Template.Spec.NodeSelector = nodeSelector
ds, err := c.AppsV1().DaemonSets(ns).Create(ctx, ds, metav1.CreateOptions{})
framework.ExpectNoError(err)
ginkgo.By("Initially, daemon pods should not be running on any nodes.")
err = wait.PollImmediateWithContext(ctx, dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, ds))
framework.ExpectNoError(err, "error waiting for daemon pods to be running on no nodes")
ginkgo.By("Change node label to blue, check that daemon pod is launched.")
node, err := e2enode.GetRandomReadySchedulableNode(ctx, f.ClientSet)
framework.ExpectNoError(err)
newNode, err := setDaemonSetNodeLabels(ctx, c, node.Name, nodeSelector)
framework.ExpectNoError(err, "error setting labels on node")
daemonSetLabels, _ := separateDaemonSetNodeLabels(newNode.Labels)
framework.ExpectEqual(len(daemonSetLabels), 1)
err = wait.PollImmediateWithContext(ctx, dsRetryPeriod, dsRetryTimeout, e2edaemonset.CheckDaemonPodOnNodes(f, ds, []string{newNode.Name}))
framework.ExpectNoError(err, "error waiting for daemon pods to be running on new nodes")
err = e2edaemonset.CheckDaemonStatus(ctx, f, dsName)
framework.ExpectNoError(err)
ginkgo.By("Update the node label to green, and wait for daemons to be unscheduled")
nodeSelector[daemonsetColorLabel] = "green"
greenNode, err := setDaemonSetNodeLabels(ctx, c, node.Name, nodeSelector)
framework.ExpectNoError(err, "error removing labels on node")
err = wait.PollImmediateWithContext(ctx, dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, ds))
framework.ExpectNoError(err, "error waiting for daemon pod to not be running on nodes")
ginkgo.By("Update DaemonSet node selector to green, and change its update strategy to RollingUpdate")
patch := fmt.Sprintf(`{"spec":{"template":{"spec":{"nodeSelector":{"%s":"%s"}}},"updateStrategy":{"type":"RollingUpdate"}}}`,
daemonsetColorLabel, greenNode.Labels[daemonsetColorLabel])
ds, err = c.AppsV1().DaemonSets(ns).Patch(ctx, dsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
framework.ExpectNoError(err, "error patching daemon set")
daemonSetLabels, _ = separateDaemonSetNodeLabels(greenNode.Labels)
framework.ExpectEqual(len(daemonSetLabels), 1)
err = wait.PollImmediateWithContext(ctx, dsRetryPeriod, dsRetryTimeout, e2edaemonset.CheckDaemonPodOnNodes(f, ds, []string{greenNode.Name}))
framework.ExpectNoError(err, "error waiting for daemon pods to be running on new nodes")
err = e2edaemonset.CheckDaemonStatus(ctx, f, dsName)
framework.ExpectNoError(err)
})
// We defer adding this test to conformance pending the disposition of moving DaemonSet scheduling logic to the
// default scheduler.
ginkgo.It("should run and stop complex daemon with node affinity", func(ctx context.Context) {
complexLabel := map[string]string{daemonsetNameLabel: dsName}
nodeSelector := map[string]string{daemonsetColorLabel: "blue"}
framework.Logf("Creating daemon %q with a node affinity", dsName)
ds := newDaemonSet(dsName, image, complexLabel)
ds.Spec.Template.Spec.Affinity = &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: daemonsetColorLabel,
Operator: v1.NodeSelectorOpIn,
Values: []string{nodeSelector[daemonsetColorLabel]},
},
},
},
},
},
},
}
ds, err := c.AppsV1().DaemonSets(ns).Create(ctx, ds, metav1.CreateOptions{})
framework.ExpectNoError(err)
ginkgo.By("Initially, daemon pods should not be running on any nodes.")
err = wait.PollImmediateWithContext(ctx, dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, ds))
framework.ExpectNoError(err, "error waiting for daemon pods to be running on no nodes")
ginkgo.By("Change node label to blue, check that daemon pod is launched.")
node, err := e2enode.GetRandomReadySchedulableNode(ctx, f.ClientSet)
framework.ExpectNoError(err)
newNode, err := setDaemonSetNodeLabels(ctx, c, node.Name, nodeSelector)
framework.ExpectNoError(err, "error setting labels on node")
daemonSetLabels, _ := separateDaemonSetNodeLabels(newNode.Labels)
framework.ExpectEqual(len(daemonSetLabels), 1)
err = wait.PollImmediateWithContext(ctx, dsRetryPeriod, dsRetryTimeout, e2edaemonset.CheckDaemonPodOnNodes(f, ds, []string{newNode.Name}))
framework.ExpectNoError(err, "error waiting for daemon pods to be running on new nodes")
err = e2edaemonset.CheckDaemonStatus(ctx, f, dsName)
framework.ExpectNoError(err)
ginkgo.By("Remove the node label and wait for daemons to be unscheduled")
_, err = setDaemonSetNodeLabels(ctx, c, node.Name, map[string]string{})
framework.ExpectNoError(err, "error removing labels on node")
err = wait.PollImmediateWithContext(ctx, dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, ds))
framework.ExpectNoError(err, "error waiting for daemon pod to not be running on nodes")
})
/*
Release: v1.10
Testname: DaemonSet-FailedPodCreation
Description: A conformant Kubernetes distribution MUST create new DaemonSet Pods when they fail.
*/
framework.ConformanceIt("should retry creating failed daemon pods", func(ctx context.Context) {
label := map[string]string{daemonsetNameLabel: dsName}
ginkgo.By(fmt.Sprintf("Creating a simple DaemonSet %q", dsName))
ds, err := c.AppsV1().DaemonSets(ns).Create(ctx, newDaemonSet(dsName, image, label), metav1.CreateOptions{})
framework.ExpectNoError(err)
ginkgo.By("Check that daemon pods launch on every node of the cluster.")
err = wait.PollImmediateWithContext(ctx, dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
framework.ExpectNoError(err, "error waiting for daemon pod to start")
err = e2edaemonset.CheckDaemonStatus(ctx, f, dsName)
framework.ExpectNoError(err)
ginkgo.By("Set a daemon pod's phase to 'Failed', check that the daemon pod is revived.")
podList := listDaemonPods(ctx, c, ns, label)
pod := podList.Items[0]
pod.ResourceVersion = ""
pod.Status.Phase = v1.PodFailed
_, err = c.CoreV1().Pods(ns).UpdateStatus(ctx, &pod, metav1.UpdateOptions{})
framework.ExpectNoError(err, "error failing a daemon pod")
err = wait.PollImmediateWithContext(ctx, dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
framework.ExpectNoError(err, "error waiting for daemon pod to revive")
ginkgo.By("Wait for the failed daemon pod to be completely deleted.")
err = wait.PollImmediateWithContext(ctx, dsRetryPeriod, dsRetryTimeout, waitFailedDaemonPodDeleted(c, &pod))
framework.ExpectNoError(err, "error waiting for the failed daemon pod to be completely deleted")
})
// This test should not be added to conformance. We will consider deprecating OnDelete when the
// extensions/v1beta1 and apps/v1beta1 are removed.
ginkgo.It("should not update pod when spec was updated and update strategy is OnDelete", func(ctx context.Context) {
label := map[string]string{daemonsetNameLabel: dsName}
framework.Logf("Creating simple daemon set %s", dsName)
ds := newDaemonSet(dsName, image, label)
ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{Type: appsv1.OnDeleteDaemonSetStrategyType}
ds, err := c.AppsV1().DaemonSets(ns).Create(ctx, ds, metav1.CreateOptions{})
framework.ExpectNoError(err)
ginkgo.By("Check that daemon pods launch on every node of the cluster.")
err = wait.PollImmediateWithContext(ctx, dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
framework.ExpectNoError(err, "error waiting for daemon pod to start")
// Check history and labels
ds, err = c.AppsV1().DaemonSets(ns).Get(ctx, ds.Name, metav1.GetOptions{})
framework.ExpectNoError(err)
waitForHistoryCreated(ctx, c, ns, label, 1)
first := curHistory(listDaemonHistories(ctx, c, ns, label), ds)
firstHash := first.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
framework.ExpectEqual(first.Revision, int64(1))
checkDaemonSetPodsLabels(listDaemonPods(ctx, c, ns, label), firstHash)
ginkgo.By("Update daemon pods image.")
patch := getDaemonSetImagePatch(ds.Spec.Template.Spec.Containers[0].Name, AgnhostImage)
ds, err = c.AppsV1().DaemonSets(ns).Patch(ctx, dsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
framework.ExpectNoError(err)
ginkgo.By("Check that daemon pods images aren't updated.")
err = wait.PollImmediateWithContext(ctx, dsRetryPeriod, dsRetryTimeout, checkDaemonPodsImageAndAvailability(c, ds, image, 0))
framework.ExpectNoError(err)
ginkgo.By("Check that daemon pods are still running on every node of the cluster.")
err = wait.PollImmediateWithContext(ctx, dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
framework.ExpectNoError(err, "error waiting for daemon pod to start")
// Check history and labels
ds, err = c.AppsV1().DaemonSets(ns).Get(ctx, ds.Name, metav1.GetOptions{})
framework.ExpectNoError(err)
waitForHistoryCreated(ctx, c, ns, label, 2)
cur := curHistory(listDaemonHistories(ctx, c, ns, label), ds)
framework.ExpectEqual(cur.Revision, int64(2))
framework.ExpectNotEqual(cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey], firstHash)
checkDaemonSetPodsLabels(listDaemonPods(ctx, c, ns, label), firstHash)
})
/*
Release: v1.10
Testname: DaemonSet-RollingUpdate
Description: A conformant Kubernetes distribution MUST support DaemonSet RollingUpdates.
*/
framework.ConformanceIt("should update pod when spec was updated and update strategy is RollingUpdate", func(ctx context.Context) {
label := map[string]string{daemonsetNameLabel: dsName}
framework.Logf("Creating simple daemon set %s", dsName)
ds := newDaemonSet(dsName, image, label)
ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{Type: appsv1.RollingUpdateDaemonSetStrategyType}
ds, err := c.AppsV1().DaemonSets(ns).Create(ctx, ds, metav1.CreateOptions{})
framework.ExpectNoError(err)
ginkgo.By("Check that daemon pods launch on every node of the cluster.")
err = wait.PollImmediateWithContext(ctx, dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
framework.ExpectNoError(err, "error waiting for daemon pod to start")
// Check history and labels
ds, err = c.AppsV1().DaemonSets(ns).Get(ctx, ds.Name, metav1.GetOptions{})
framework.ExpectNoError(err)
waitForHistoryCreated(ctx, c, ns, label, 1)
cur := curHistory(listDaemonHistories(ctx, c, ns, label), ds)
hash := cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
framework.ExpectEqual(cur.Revision, int64(1))
checkDaemonSetPodsLabels(listDaemonPods(ctx, c, ns, label), hash)
ginkgo.By("Update daemon pods image.")
patch := getDaemonSetImagePatch(ds.Spec.Template.Spec.Containers[0].Name, AgnhostImage)
ds, err = c.AppsV1().DaemonSets(ns).Patch(ctx, dsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
framework.ExpectNoError(err)
// Time to complete the rolling upgrade is proportional to the number of nodes in the cluster.
// Get the number of nodes, and set the timeout appropriately.
nodes, err := c.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
framework.ExpectNoError(err)
nodeCount := len(nodes.Items)
retryTimeout := dsRetryTimeout + time.Duration(nodeCount*30)*time.Second
ginkgo.By("Check that daemon pods images are updated.")
err = wait.PollImmediateWithContext(ctx, dsRetryPeriod, retryTimeout, checkDaemonPodsImageAndAvailability(c, ds, AgnhostImage, 1))
framework.ExpectNoError(err)
ginkgo.By("Check that daemon pods are still running on every node of the cluster.")
err = wait.PollImmediateWithContext(ctx, dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
framework.ExpectNoError(err, "error waiting for daemon pod to start")
// Check history and labels
ds, err = c.AppsV1().DaemonSets(ns).Get(ctx, ds.Name, metav1.GetOptions{})
framework.ExpectNoError(err)
waitForHistoryCreated(ctx, c, ns, label, 2)
cur = curHistory(listDaemonHistories(ctx, c, ns, label), ds)
hash = cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
framework.ExpectEqual(cur.Revision, int64(2))
checkDaemonSetPodsLabels(listDaemonPods(ctx, c, ns, label), hash)
})
/*
Release: v1.10
Testname: DaemonSet-Rollback
Description: A conformant Kubernetes distribution MUST support automated, minimally disruptive
rollback of updates to a DaemonSet.
*/
framework.ConformanceIt("should rollback without unnecessary restarts", func(ctx context.Context) {
schedulableNodes, err := e2enode.GetReadySchedulableNodes(ctx, c)
framework.ExpectNoError(err)
gomega.Expect(len(schedulableNodes.Items)).To(gomega.BeNumerically(">", 1), "Conformance test suite needs a cluster with at least 2 nodes.")
framework.Logf("Create a RollingUpdate DaemonSet")
label := map[string]string{daemonsetNameLabel: dsName}
ds := newDaemonSet(dsName, image, label)
ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{Type: appsv1.RollingUpdateDaemonSetStrategyType}
ds, err = c.AppsV1().DaemonSets(ns).Create(ctx, ds, metav1.CreateOptions{})
framework.ExpectNoError(err)
framework.Logf("Check that daemon pods launch on every node of the cluster")
err = wait.PollImmediateWithContext(ctx, dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
framework.ExpectNoError(err, "error waiting for daemon pod to start")
framework.Logf("Update the DaemonSet to trigger a rollout")
// We use a nonexistent image here, so that we make sure it won't finish
newImage := "foo:non-existent"
newDS, err := updateDaemonSetWithRetries(ctx, c, ns, ds.Name, func(update *appsv1.DaemonSet) {
update.Spec.Template.Spec.Containers[0].Image = newImage
})
framework.ExpectNoError(err)
// Make sure we're in the middle of a rollout
err = wait.PollImmediateWithContext(ctx, dsRetryPeriod, dsRetryTimeout, checkAtLeastOneNewPod(c, ns, label, newImage))
framework.ExpectNoError(err)
pods := listDaemonPods(ctx, c, ns, label)
var existingPods, newPods []*v1.Pod
for i := range pods.Items {
pod := pods.Items[i]
image := pod.Spec.Containers[0].Image
switch image {
case ds.Spec.Template.Spec.Containers[0].Image:
existingPods = append(existingPods, &pod)
case newDS.Spec.Template.Spec.Containers[0].Image:
newPods = append(newPods, &pod)
default:
framework.Failf("unexpected pod found, image = %s", image)
}
}
schedulableNodes, err = e2enode.GetReadySchedulableNodes(ctx, c)
framework.ExpectNoError(err)
if len(schedulableNodes.Items) < 2 {
framework.ExpectEqual(len(existingPods), 0)
} else {
framework.ExpectNotEqual(len(existingPods), 0)
}
framework.ExpectNotEqual(len(newPods), 0)
framework.Logf("Roll back the DaemonSet before rollout is complete")
rollbackDS, err := updateDaemonSetWithRetries(ctx, c, ns, ds.Name, func(update *appsv1.DaemonSet) {
update.Spec.Template.Spec.Containers[0].Image = image
})
framework.ExpectNoError(err)
framework.Logf("Make sure DaemonSet rollback is complete")
err = wait.PollImmediateWithContext(ctx, dsRetryPeriod, dsRetryTimeout, checkDaemonPodsImageAndAvailability(c, rollbackDS, image, 1))
framework.ExpectNoError(err)
// After rollback is done, compare current pods with previous old pods during rollout, to make sure they're not restarted
pods = listDaemonPods(ctx, c, ns, label)
rollbackPods := map[string]bool{}
for _, pod := range pods.Items {
rollbackPods[pod.Name] = true
}
for _, pod := range existingPods {
if !rollbackPods[pod.Name] {
framework.Failf("unexpected pod %s be restarted", pod.Name)
}
}
})
// TODO: This test is expected to be promoted to conformance after the feature is promoted
ginkgo.It("should surge pods onto nodes when spec was updated and update strategy is RollingUpdate", func(ctx context.Context) {
label := map[string]string{daemonsetNameLabel: dsName}
framework.Logf("Creating surge daemon set %s", dsName)
maxSurgeOverlap := 60 * time.Second
maxSurge := 1
surgePercent := intstr.FromString("20%")
zero := intstr.FromInt(0)
oldVersion := "1"
ds := newDaemonSet(dsName, image, label)
ds.Spec.Template.Spec.Containers[0].Env = []v1.EnvVar{
{Name: "VERSION", Value: oldVersion},
}
// delay shutdown by 15s to allow containers to overlap in time
ds.Spec.Template.Spec.Containers[0].Lifecycle = &v1.Lifecycle{
PreStop: &v1.LifecycleHandler{
Exec: &v1.ExecAction{
Command: []string{"/bin/sh", "-c", "sleep 15"},
},
},
}
// use a readiness probe that can be forced to fail (by changing the contents of /var/tmp/ready)
ds.Spec.Template.Spec.Containers[0].ReadinessProbe = &v1.Probe{
ProbeHandler: v1.ProbeHandler{
Exec: &v1.ExecAction{
Command: []string{"/bin/sh", "-ec", `touch /var/tmp/ready; [[ "$( cat /var/tmp/ready )" == "" ]]`},
},
},
InitialDelaySeconds: 7,
PeriodSeconds: 3,
SuccessThreshold: 1,
FailureThreshold: 1,
}
// use a simple surge strategy
ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{
Type: appsv1.RollingUpdateDaemonSetStrategyType,
RollingUpdate: &appsv1.RollingUpdateDaemonSet{
MaxUnavailable: &zero,
MaxSurge: &surgePercent,
},
}
// The pod must be ready for at least 10s before we delete the old pod
ds.Spec.MinReadySeconds = 10
ds, err := c.AppsV1().DaemonSets(ns).Create(ctx, ds, metav1.CreateOptions{})
framework.ExpectNoError(err)
ginkgo.By("Check that daemon pods launch on every node of the cluster.")
err = wait.PollImmediateWithContext(ctx, dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
framework.ExpectNoError(err, "error waiting for daemon pod to start")
// Check history and labels
ds, err = c.AppsV1().DaemonSets(ns).Get(ctx, ds.Name, metav1.GetOptions{})
framework.ExpectNoError(err)
waitForHistoryCreated(ctx, c, ns, label, 1)
cur := curHistory(listDaemonHistories(ctx, c, ns, label), ds)
hash := cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
framework.ExpectEqual(cur.Revision, int64(1))
checkDaemonSetPodsLabels(listDaemonPods(ctx, c, ns, label), hash)
newVersion := "2"
ginkgo.By("Update daemon pods environment var")
patch := fmt.Sprintf(`{"spec":{"template":{"spec":{"containers":[{"name":"%s","env":[{"name":"VERSION","value":"%s"}]}]}}}}`, ds.Spec.Template.Spec.Containers[0].Name, newVersion)
ds, err = c.AppsV1().DaemonSets(ns).Patch(ctx, dsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
framework.ExpectNoError(err)
// Time to complete the rolling upgrade is proportional to the number of nodes in the cluster.
// Get the number of nodes, and set the timeout appropriately.
nodes, err := c.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
framework.ExpectNoError(err)
nodeCount := len(nodes.Items)
retryTimeout := dsRetryTimeout + time.Duration(nodeCount*30)*time.Second
ginkgo.By("Check that daemon pods surge and invariants are preserved during that rollout")
ageOfOldPod := make(map[string]time.Time)
deliberatelyDeletedPods := sets.NewString()
err = wait.PollImmediateWithContext(ctx, dsRetryPeriod, retryTimeout, func(ctx context.Context) (bool, error) {
podList, err := c.CoreV1().Pods(ds.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return false, err
}
pods := podList.Items
var buf bytes.Buffer
pw := tabwriter.NewWriter(&buf, 1, 1, 1, ' ', 0)
fmt.Fprint(pw, "Node\tVersion\tName\tUID\tDeleted\tReady\n")
now := time.Now()
podUIDs := sets.NewString()
deletedPodUIDs := sets.NewString()
nodes := sets.NewString()
versions := sets.NewString()
nodesToVersions := make(map[string]map[string]int)
nodesToDeletedVersions := make(map[string]map[string]int)
var surgeCount, newUnavailableCount, newDeliberatelyDeletedCount, oldUnavailableCount, nodesWithoutOldVersion int
for _, pod := range pods {
if !metav1.IsControlledBy(&pod, ds) {
continue
}
nodeName := pod.Spec.NodeName
nodes.Insert(nodeName)
podVersion := pod.Spec.Containers[0].Env[0].Value
if pod.DeletionTimestamp != nil {
if !deliberatelyDeletedPods.Has(string(pod.UID)) {
versions := nodesToDeletedVersions[nodeName]
if versions == nil {
versions = make(map[string]int)
nodesToDeletedVersions[nodeName] = versions
}
versions[podVersion]++
}
} else {
versions := nodesToVersions[nodeName]
if versions == nil {
versions = make(map[string]int)
nodesToVersions[nodeName] = versions
}
versions[podVersion]++
}
ready := podutil.IsPodAvailable(&pod, ds.Spec.MinReadySeconds, metav1.Now())
if podVersion == newVersion {
surgeCount++
if !ready || pod.DeletionTimestamp != nil {
if deliberatelyDeletedPods.Has(string(pod.UID)) {
newDeliberatelyDeletedCount++
}
newUnavailableCount++
}
} else {
if !ready || pod.DeletionTimestamp != nil {
oldUnavailableCount++
}
}
fmt.Fprintf(pw, "%s\t%s\t%s\t%s\t%t\t%t\n", pod.Spec.NodeName, podVersion, pod.Name, pod.UID, pod.DeletionTimestamp != nil, ready)
}
// print a stable sorted list of pods by node for debugging
pw.Flush()
lines := strings.Split(buf.String(), "\n")
lines = lines[:len(lines)-1]
sort.Strings(lines[1:])
for _, line := range lines {
framework.Logf("%s", line)
}
// if there is an old and new pod at the same time, record a timestamp
deletedPerNode := make(map[string]int)
for _, pod := range pods {
if !metav1.IsControlledBy(&pod, ds) {
continue
}
// ignore deleted pods
if pod.DeletionTimestamp != nil {
deletedPodUIDs.Insert(string(pod.UID))
if !deliberatelyDeletedPods.Has(string(pod.UID)) {
deletedPerNode[pod.Spec.NodeName]++
}
continue
}
podUIDs.Insert(string(pod.UID))
podVersion := pod.Spec.Containers[0].Env[0].Value
if podVersion == newVersion {
continue
}
// if this is a pod in an older version AND there is a new version of this pod, record when
// we started seeing this, otherwise delete the record (perhaps the node was drained)
if nodesToVersions[pod.Spec.NodeName][newVersion] > 0 {
if _, ok := ageOfOldPod[string(pod.UID)]; !ok {
ageOfOldPod[string(pod.UID)] = now
}
} else {
delete(ageOfOldPod, string(pod.UID))
}
}
// purge the old pods list of any deleted pods
for uid := range ageOfOldPod {
if !podUIDs.Has(uid) {
delete(ageOfOldPod, uid)
}
}
deliberatelyDeletedPods = deliberatelyDeletedPods.Intersection(deletedPodUIDs)
for _, versions := range nodesToVersions {
if versions[oldVersion] == 0 {
nodesWithoutOldVersion++
}
}
var errs []string
// invariant: we should not see more than 1 deleted pod per node unless a severe node problem is occurring or the controller is misbehaving
for node, count := range deletedPerNode {
if count > 1 {
errs = append(errs, fmt.Sprintf("Node %s has %d deleted pods, which may indicate a problem on the node or a controller race condition", node, count))
}
}
// invariant: the controller must react to the new pod becoming ready within a reasonable timeframe (2x grace period)
for uid, firstSeen := range ageOfOldPod {
if now.Sub(firstSeen) > maxSurgeOverlap {
errs = append(errs, fmt.Sprintf("An old pod with UID %s has been running alongside a newer version for longer than %s", uid, maxSurgeOverlap))
}
}
// invariant: we should never have more than maxSurge + oldUnavailableCount instances of the new version unready unless a flake in the infrastructure happens, or
// if we deliberately deleted one of the new pods
if newUnavailableCount > (maxSurge + oldUnavailableCount + newDeliberatelyDeletedCount + nodesWithoutOldVersion) {
errs = append(errs, fmt.Sprintf("observed %d new unavailable pods greater than (surge count %d + old unavailable count %d + deliberately deleted new count %d + nodes without old version %d), may be infrastructure flake", newUnavailableCount, maxSurge, oldUnavailableCount, newDeliberatelyDeletedCount, nodesWithoutOldVersion))
}
// invariant: the total number of versions created should be 2
if versions.Len() > 2 {
errs = append(errs, fmt.Sprintf("observed %d versions running simultaneously, must have max 2", versions.Len()))
}
for _, node := range nodes.List() {
// ignore pods that haven't been scheduled yet
if len(node) == 0 {
continue
}
versionCount := make(map[string]int)
// invariant: surge should never have more than one instance of a pod per node running
for version, count := range nodesToVersions[node] {
if count > 1 {
errs = append(errs, fmt.Sprintf("node %s has %d instances of version %s running simultaneously, must have max 1", node, count, version))
}
versionCount[version] += count
}
// invariant: when surging, the most number of pods we should allow to be deleted is 2 (if we are getting evicted)
for version, count := range nodesToDeletedVersions[node] {
if count > 2 {
errs = append(errs, fmt.Sprintf("node %s has %d deleted instances of version %s running simultaneously, must have max 1", node, count, version))
}
versionCount[version] += count
}
// invariant: on any node, we should never have more than two instances of a version (if we are getting evicted)
for version, count := range versionCount {
if count > 2 {
errs = append(errs, fmt.Sprintf("node %s has %d total instances of version %s running simultaneously, must have max 2 (one deleted and one running)", node, count, version))
}
}
}
if len(errs) > 0 {
sort.Strings(errs)
return false, fmt.Errorf("invariants were violated during daemonset update:\n%s", strings.Join(errs, "\n"))
}
// Make sure every daemon pod on the node has been updated
nodeNames := e2edaemonset.SchedulableNodes(ctx, c, ds)
for _, node := range nodeNames {
switch {
case
// if we don't have the new version yet
nodesToVersions[node][newVersion] == 0,
// if there are more than one version on a node
len(nodesToVersions[node]) > 1,
// if there are still any deleted pods
len(nodesToDeletedVersions[node]) > 0,
// if any of the new pods are unavailable
newUnavailableCount > 0:
// inject a failure randomly to ensure the controller recovers
switch rand.Intn(25) {
// cause a random old pod to go unready
case 0:
// select a not-deleted pod of the old version
if pod := randomPod(pods, func(pod *v1.Pod) bool {
return pod.DeletionTimestamp == nil && oldVersion == pod.Spec.Containers[0].Env[0].Value
}); pod != nil {
// make the /tmp/ready file read only, which will cause readiness to fail
if _, err := e2ekubectl.RunKubectl(pod.Namespace, "exec", "-c", pod.Spec.Containers[0].Name, pod.Name, "--", "/bin/sh", "-ec", "echo 0 > /var/tmp/ready"); err != nil {
framework.Logf("Failed to mark pod %s as unready via exec: %v", pod.Name, err)
} else {
framework.Logf("Marked old pod %s as unready", pod.Name)
}
}
case 1:
// delete a random pod
if pod := randomPod(pods, func(pod *v1.Pod) bool {
return pod.DeletionTimestamp == nil
}); pod != nil {
if err := c.CoreV1().Pods(ds.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil {
framework.Logf("Failed to delete pod %s early: %v", pod.Name, err)
} else {
framework.Logf("Deleted pod %s prematurely", pod.Name)
deliberatelyDeletedPods.Insert(string(pod.UID))
}
}
}
// then wait
return false, nil
}
}
return true, nil
})
framework.ExpectNoError(err)
ginkgo.By("Check that daemon pods are still running on every node of the cluster.")
err = wait.PollImmediateWithContext(ctx, dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
framework.ExpectNoError(err, "error waiting for daemon pod to start")
// Check history and labels
ds, err = c.AppsV1().DaemonSets(ns).Get(ctx, ds.Name, metav1.GetOptions{})
framework.ExpectNoError(err)
waitForHistoryCreated(ctx, c, ns, label, 2)
cur = curHistory(listDaemonHistories(ctx, c, ns, label), ds)
hash = cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
framework.ExpectEqual(cur.Revision, int64(2))
checkDaemonSetPodsLabels(listDaemonPods(ctx, c, ns, label), hash)
})
/*
Release: v1.22
Testname: DaemonSet, list and delete a collection of DaemonSets
Description: When a DaemonSet is created it MUST succeed. It
MUST succeed when listing DaemonSets via a label selector. It
MUST succeed when deleting the DaemonSet via deleteCollection.
*/
framework.ConformanceIt("should list and delete a collection of DaemonSets", func(ctx context.Context) {
label := map[string]string{daemonsetNameLabel: dsName}
labelSelector := labels.SelectorFromSet(label).String()
dsClient := f.ClientSet.AppsV1().DaemonSets(ns)
cs := f.ClientSet
one := int64(1)
ginkgo.By(fmt.Sprintf("Creating simple DaemonSet %q", dsName))
testDaemonset, err := c.AppsV1().DaemonSets(ns).Create(ctx, newDaemonSetWithLabel(dsName, image, label), metav1.CreateOptions{})
framework.ExpectNoError(err)
ginkgo.By("Check that daemon pods launch on every node of the cluster.")
err = wait.PollImmediateWithContext(ctx, dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, testDaemonset))
framework.ExpectNoError(err, "error waiting for daemon pod to start")
err = e2edaemonset.CheckDaemonStatus(ctx, f, dsName)
framework.ExpectNoError(err)
ginkgo.By("listing all DaemonSets")
dsList, err := cs.AppsV1().DaemonSets("").List(ctx, metav1.ListOptions{LabelSelector: labelSelector})
framework.ExpectNoError(err, "failed to list Daemon Sets")
framework.ExpectEqual(len(dsList.Items), 1, "filtered list wasn't found")
ginkgo.By("DeleteCollection of the DaemonSets")
err = dsClient.DeleteCollection(ctx, metav1.DeleteOptions{GracePeriodSeconds: &one}, metav1.ListOptions{LabelSelector: labelSelector})
framework.ExpectNoError(err, "failed to delete DaemonSets")
ginkgo.By("Verify that ReplicaSets have been deleted")
dsList, err = c.AppsV1().DaemonSets("").List(ctx, metav1.ListOptions{LabelSelector: labelSelector})
framework.ExpectNoError(err, "failed to list DaemonSets")
framework.ExpectEqual(len(dsList.Items), 0, "filtered list should have no daemonset")
})
/* Release: v1.22
Testname: DaemonSet, status sub-resource
Description: When a DaemonSet is created it MUST succeed.
Attempt to read, update and patch its status sub-resource; all
mutating sub-resource operations MUST be visible to subsequent reads.
*/
framework.ConformanceIt("should verify changes to a daemon set status", func(ctx context.Context) {
label := map[string]string{daemonsetNameLabel: dsName}
labelSelector := labels.SelectorFromSet(label).String()
dsClient := f.ClientSet.AppsV1().DaemonSets(ns)
cs := f.ClientSet
w := &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = labelSelector
return dsClient.Watch(ctx, options)
},
}
dsList, err := cs.AppsV1().DaemonSets("").List(ctx, metav1.ListOptions{LabelSelector: labelSelector})
framework.ExpectNoError(err, "failed to list Daemon Sets")
ginkgo.By(fmt.Sprintf("Creating simple DaemonSet %q", dsName))
testDaemonset, err := c.AppsV1().DaemonSets(ns).Create(ctx, newDaemonSetWithLabel(dsName, image, label), metav1.CreateOptions{})
framework.ExpectNoError(err)
ginkgo.By("Check that daemon pods launch on every node of the cluster.")
err = wait.PollImmediateWithContext(ctx, dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, testDaemonset))
framework.ExpectNoError(err, "error waiting for daemon pod to start")
err = e2edaemonset.CheckDaemonStatus(ctx, f, dsName)
framework.ExpectNoError(err)
ginkgo.By("Getting /status")
dsResource := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "daemonsets"}
dsStatusUnstructured, err := f.DynamicClient.Resource(dsResource).Namespace(ns).Get(ctx, dsName, metav1.GetOptions{}, "status")
framework.ExpectNoError(err, "Failed to fetch the status of daemon set %s in namespace %s", dsName, ns)
dsStatusBytes, err := json.Marshal(dsStatusUnstructured)
framework.ExpectNoError(err, "Failed to marshal unstructured response. %v", err)
var dsStatus appsv1.DaemonSet
err = json.Unmarshal(dsStatusBytes, &dsStatus)
framework.ExpectNoError(err, "Failed to unmarshal JSON bytes to a daemon set object type")
framework.Logf("Daemon Set %s has Conditions: %v", dsName, dsStatus.Status.Conditions)
ginkgo.By("updating the DaemonSet Status")
var statusToUpdate, updatedStatus *appsv1.DaemonSet
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
statusToUpdate, err = dsClient.Get(ctx, dsName, metav1.GetOptions{})
framework.ExpectNoError(err, "Unable to retrieve daemon set %s", dsName)
statusToUpdate.Status.Conditions = append(statusToUpdate.Status.Conditions, appsv1.DaemonSetCondition{
Type: "StatusUpdate",
Status: "True",
Reason: "E2E",
Message: "Set from e2e test",
})
updatedStatus, err = dsClient.UpdateStatus(ctx, statusToUpdate, metav1.UpdateOptions{})
return err
})
framework.ExpectNoError(err, "Failed to update status. %v", err)
framework.Logf("updatedStatus.Conditions: %#v", updatedStatus.Status.Conditions)
ginkgo.By("watching for the daemon set status to be updated")
ctxUntil, cancel := context.WithTimeout(ctx, dsRetryTimeout)
defer cancel()
_, err = watchtools.Until(ctxUntil, dsList.ResourceVersion, w, func(event watch.Event) (bool, error) {
if ds, ok := event.Object.(*appsv1.DaemonSet); ok {
found := ds.ObjectMeta.Name == testDaemonset.ObjectMeta.Name &&
ds.ObjectMeta.Namespace == testDaemonset.ObjectMeta.Namespace &&
ds.Labels[daemonsetNameLabel] == dsName
if !found {
framework.Logf("Observed daemon set %v in namespace %v with annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.Annotations, ds.Status.Conditions)
return false, nil
}
for _, cond := range ds.Status.Conditions {
if cond.Type == "StatusUpdate" &&
cond.Reason == "E2E" &&
cond.Message == "Set from e2e test" {
framework.Logf("Found daemon set %v in namespace %v with labels: %v annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Labels, ds.Annotations, ds.Status.Conditions)
return found, nil
}
framework.Logf("Observed daemon set %v in namespace %v with annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.Annotations, ds.Status.Conditions)
}
}
object := strings.Split(fmt.Sprintf("%v", event.Object), "{")[0]
framework.Logf("Observed %v event: %+v", object, event.Type)
return false, nil
})
framework.ExpectNoError(err, "failed to locate daemon set %v in namespace %v", testDaemonset.ObjectMeta.Name, ns)
framework.Logf("Daemon set %s has an updated status", dsName)
ginkgo.By("patching the DaemonSet Status")
daemonSetStatusPatch := appsv1.DaemonSet{
Status: appsv1.DaemonSetStatus{
Conditions: []appsv1.DaemonSetCondition{
{
Type: "StatusPatched",
Status: "True",
},
},
},
}
payload, err := json.Marshal(daemonSetStatusPatch)
framework.ExpectNoError(err, "Failed to marshal JSON. %v", err)
_, err = dsClient.Patch(ctx, dsName, types.MergePatchType, payload, metav1.PatchOptions{}, "status")
framework.ExpectNoError(err, "Failed to patch daemon set status", err)
ginkgo.By("watching for the daemon set status to be patched")
ctxUntil, cancel = context.WithTimeout(ctx, dsRetryTimeout)
defer cancel()
_, err = watchtools.Until(ctxUntil, dsList.ResourceVersion, w, func(event watch.Event) (bool, error) {
if ds, ok := event.Object.(*appsv1.DaemonSet); ok {
found := ds.ObjectMeta.Name == testDaemonset.ObjectMeta.Name &&
ds.ObjectMeta.Namespace == testDaemonset.ObjectMeta.Namespace &&
ds.Labels[daemonsetNameLabel] == dsName
if !found {
framework.Logf("Observed daemon set %v in namespace %v with annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.Annotations, ds.Status.Conditions)
return false, nil
}
for _, cond := range ds.Status.Conditions {
if cond.Type == "StatusPatched" {
framework.Logf("Found daemon set %v in namespace %v with labels: %v annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Labels, ds.Annotations, ds.Status.Conditions)
return found, nil
}
framework.Logf("Observed daemon set %v in namespace %v with annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.Annotations, ds.Status.Conditions)
}
}
object := strings.Split(fmt.Sprintf("%v", event.Object), "{")[0]
framework.Logf("Observed %v event: %v", object, event.Type)
return false, nil
})
framework.ExpectNoError(err, "failed to locate daemon set %v in namespace %v", testDaemonset.ObjectMeta.Name, ns)
framework.Logf("Daemon set %s has a patched status", dsName)
})
})
// randomPod selects a random pod within pods that causes fn to return true, or nil
// if no pod can be found matching the criteria.
func randomPod(pods []v1.Pod, fn func(p *v1.Pod) bool) *v1.Pod {
podCount := len(pods)
for offset, i := rand.Intn(podCount), 0; i < (podCount - 1); i++ {
pod := &pods[(offset+i)%podCount]
if fn(pod) {
return pod
}
}
return nil
}
// getDaemonSetImagePatch generates a patch for updating a DaemonSet's container image
func getDaemonSetImagePatch(containerName, containerImage string) string {
return fmt.Sprintf(`{"spec":{"template":{"spec":{"containers":[{"name":"%s","image":"%s"}]}}}}`, containerName, containerImage)
}
func newDaemonSet(dsName, image string, label map[string]string) *appsv1.DaemonSet {
ds := newDaemonSetWithLabel(dsName, image, label)
ds.ObjectMeta.Labels = nil
return ds
}
func newDaemonSetWithLabel(dsName, image string, label map[string]string) *appsv1.DaemonSet {
return e2edaemonset.NewDaemonSet(dsName, image, label, nil, nil, []v1.ContainerPort{{ContainerPort: 9376}})
}
func listDaemonPods(ctx context.Context, c clientset.Interface, ns string, label map[string]string) *v1.PodList {
selector := labels.Set(label).AsSelector()
options := metav1.ListOptions{LabelSelector: selector.String()}
podList, err := c.CoreV1().Pods(ns).List(ctx, options)
framework.ExpectNoError(err)
gomega.Expect(podList.Items).ToNot(gomega.BeEmpty())
return podList
}
func separateDaemonSetNodeLabels(labels map[string]string) (map[string]string, map[string]string) {
daemonSetLabels := map[string]string{}
otherLabels := map[string]string{}
for k, v := range labels {
if strings.HasPrefix(k, daemonsetLabelPrefix) {
daemonSetLabels[k] = v
} else {
otherLabels[k] = v
}
}
return daemonSetLabels, otherLabels
}
func clearDaemonSetNodeLabels(ctx context.Context, c clientset.Interface) error {
nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c)
if err != nil {
return err
}
for _, node := range nodeList.Items {
_, err := setDaemonSetNodeLabels(ctx, c, node.Name, map[string]string{})
if err != nil {
return err
}
}
return nil
}
// patchNamespaceAnnotations sets node selectors related annotations on tests namespaces to empty
func patchNamespaceAnnotations(ctx context.Context, c clientset.Interface, nsName string) (*v1.Namespace, error) {
nsClient := c.CoreV1().Namespaces()
annotations := make(map[string]string)
for _, n := range NamespaceNodeSelectors {
annotations[n] = ""
}
nsPatch, err := json.Marshal(map[string]interface{}{
"metadata": map[string]interface{}{
"annotations": annotations,
},
})
if err != nil {
return nil, err
}
return nsClient.Patch(ctx, nsName, types.StrategicMergePatchType, nsPatch, metav1.PatchOptions{})
}
func setDaemonSetNodeLabels(ctx context.Context, c clientset.Interface, nodeName string, labels map[string]string) (*v1.Node, error) {
nodeClient := c.CoreV1().Nodes()
var newNode *v1.Node
var newLabels map[string]string
err := wait.PollImmediateWithContext(ctx, dsRetryPeriod, dsRetryTimeout, func(ctx context.Context) (bool, error) {
node, err := nodeClient.Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
return false, err
}
// remove all labels this test is creating
daemonSetLabels, otherLabels := separateDaemonSetNodeLabels(node.Labels)
if reflect.DeepEqual(daemonSetLabels, labels) {
newNode = node
return true, nil
}
node.Labels = otherLabels
for k, v := range labels {
node.Labels[k] = v
}
newNode, err = nodeClient.Update(ctx, node, metav1.UpdateOptions{})
if err == nil {
newLabels, _ = separateDaemonSetNodeLabels(newNode.Labels)
return true, err
}
if se, ok := err.(*apierrors.StatusError); ok && se.ErrStatus.Reason == metav1.StatusReasonConflict {
framework.Logf("failed to update node due to resource version conflict")
return false, nil
}
return false, err
})
if err != nil {
return nil, err
} else if len(newLabels) != len(labels) {
return nil, fmt.Errorf("could not set daemon set test labels as expected")
}
return newNode, nil
}
func checkRunningOnAllNodes(f *framework.Framework, ds *appsv1.DaemonSet) func(ctx context.Context) (bool, error) {
return func(ctx context.Context) (bool, error) {
return e2edaemonset.CheckRunningOnAllNodes(ctx, f, ds)
}
}
func checkAtLeastOneNewPod(c clientset.Interface, ns string, label map[string]string, newImage string) func(ctx context.Context) (bool, error) {
return func(ctx context.Context) (bool, error) {
pods := listDaemonPods(ctx, c, ns, label)
for _, pod := range pods.Items {
if pod.Spec.Containers[0].Image == newImage {
return true, nil
}
}
return false, nil
}
}
func checkRunningOnNoNodes(f *framework.Framework, ds *appsv1.DaemonSet) func(ctx context.Context) (bool, error) {
return e2edaemonset.CheckDaemonPodOnNodes(f, ds, make([]string, 0))
}
func checkDaemonPodsImageAndAvailability(c clientset.Interface, ds *appsv1.DaemonSet, image string, maxUnavailable int) func(ctx context.Context) (bool, error) {
return func(ctx context.Context) (bool, error) {
podList, err := c.CoreV1().Pods(ds.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return false, err
}
pods := podList.Items
unavailablePods := 0
nodesToUpdatedPodCount := make(map[string]int)
for _, pod := range pods {
// Ignore the pod on the node that is supposed to be deleted
if pod.DeletionTimestamp != nil {
continue
}
if !metav1.IsControlledBy(&pod, ds) {
continue
}
podImage := pod.Spec.Containers[0].Image
if podImage != image {
framework.Logf("Wrong image for pod: %s. Expected: %s, got: %s.", pod.Name, image, podImage)
} else {
nodesToUpdatedPodCount[pod.Spec.NodeName]++
}
if !podutil.IsPodAvailable(&pod, ds.Spec.MinReadySeconds, metav1.Now()) {
framework.Logf("Pod %s is not available", pod.Name)
unavailablePods++
}
}
if unavailablePods > maxUnavailable {
return false, fmt.Errorf("number of unavailable pods: %d is greater than maxUnavailable: %d", unavailablePods, maxUnavailable)
}
// Make sure every daemon pod on the node has been updated
nodeNames := e2edaemonset.SchedulableNodes(ctx, c, ds)
for _, node := range nodeNames {
if nodesToUpdatedPodCount[node] == 0 {
return false, nil
}
}
return true, nil
}
}
func checkDaemonSetPodsLabels(podList *v1.PodList, hash string) {
for _, pod := range podList.Items {
// Ignore all the DS pods that will be deleted
if pod.DeletionTimestamp != nil {
continue
}
podHash := pod.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
gomega.Expect(podHash).ToNot(gomega.BeEmpty())
if len(hash) > 0 {
framework.ExpectEqual(podHash, hash, "unexpected hash for pod %s", pod.Name)
}
}
}
func waitForHistoryCreated(ctx context.Context, c clientset.Interface, ns string, label map[string]string, numHistory int) {
listHistoryFn := func(ctx context.Context) (bool, error) {
selector := labels.Set(label).AsSelector()
options := metav1.ListOptions{LabelSelector: selector.String()}
historyList, err := c.AppsV1().ControllerRevisions(ns).List(ctx, options)
if err != nil {
return false, err
}
if len(historyList.Items) == numHistory {
return true, nil
}
framework.Logf("%d/%d controllerrevisions created.", len(historyList.Items), numHistory)
return false, nil
}
err := wait.PollImmediateWithContext(ctx, dsRetryPeriod, dsRetryTimeout, listHistoryFn)
framework.ExpectNoError(err, "error waiting for controllerrevisions to be created")
}
func listDaemonHistories(ctx context.Context, c clientset.Interface, ns string, label map[string]string) *appsv1.ControllerRevisionList {
selector := labels.Set(label).AsSelector()
options := metav1.ListOptions{LabelSelector: selector.String()}
historyList, err := c.AppsV1().ControllerRevisions(ns).List(ctx, options)
framework.ExpectNoError(err)
gomega.Expect(historyList.Items).ToNot(gomega.BeEmpty())
return historyList
}
func curHistory(historyList *appsv1.ControllerRevisionList, ds *appsv1.DaemonSet) *appsv1.ControllerRevision {
var curHistory *appsv1.ControllerRevision
foundCurHistories := 0
for i := range historyList.Items {
history := &historyList.Items[i]
// Every history should have the hash label
gomega.Expect(history.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]).ToNot(gomega.BeEmpty())
match, err := daemon.Match(ds, history)
framework.ExpectNoError(err)
if match {
curHistory = history
foundCurHistories++
}
}
framework.ExpectEqual(foundCurHistories, 1)
gomega.Expect(curHistory).NotTo(gomega.BeNil())
return curHistory
}
func waitFailedDaemonPodDeleted(c clientset.Interface, pod *v1.Pod) func(ctx context.Context) (bool, error) {
return func(ctx context.Context) (bool, error) {
if _, err := c.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}); err != nil {
if apierrors.IsNotFound(err) {
return true, nil
}
return false, fmt.Errorf("failed to get failed daemon pod %q: %w", pod.Name, err)
}
return false, nil
}
}