diff --git a/test/e2e/storage/BUILD b/test/e2e/storage/BUILD
index adc8f943bea..56e4bd1f96b 100644
--- a/test/e2e/storage/BUILD
+++ b/test/e2e/storage/BUILD
@@ -36,6 +36,7 @@ go_library(
         "//pkg/apis/core/v1/helper:go_default_library",
         "//pkg/apis/storage/v1/util:go_default_library",
         "//pkg/client/conditions:go_default_library",
+        "//pkg/controller/volume/scheduling:go_default_library",
         "//pkg/kubelet/events:go_default_library",
         "//pkg/kubelet/metrics:go_default_library",
         "//pkg/util/slice:go_default_library",
diff --git a/test/e2e/storage/csi_mock_volume.go b/test/e2e/storage/csi_mock_volume.go
index c69ee6879e9..e8643e192ea 100644
--- a/test/e2e/storage/csi_mock_volume.go
+++ b/test/e2e/storage/csi_mock_volume.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"crypto/sha256"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"strconv"
 	"strings"
@@ -32,11 +33,13 @@ import (
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
 	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	clientset "k8s.io/client-go/kubernetes"
+	"k8s.io/kubernetes/pkg/controller/volume/scheduling"
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
 	e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
@@ -91,9 +94,9 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
 		enableTopology      bool
 		podInfo             *bool
 		storageCapacity     *bool
-		scName              string
-		enableResizing      bool // enable resizing for both CSI mock driver and storageClass.
-		enableNodeExpansion bool // enable node expansion for CSI mock driver
+		scName              string // pre-selected storage class name; must be unique in the cluster
+		enableResizing      bool   // enable resizing for both CSI mock driver and storageClass.
+		enableNodeExpansion bool   // enable node expansion for CSI mock driver
 		// just disable resizing on driver it overrides enableResizing flag for CSI mock driver
 		disableResizingOnDriver bool
 		javascriptHooks         map[string]string
@@ -179,7 +182,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
 				m.pods = append(m.pods, pod)
 			}
 		} else {
-			class, claim, pod = startPausePod(f.ClientSet, scTest, nodeSelection, f.Namespace.Name)
+			class, claim, pod = startPausePod(f.ClientSet, scTest, nodeSelection, m.tp.scName, f.Namespace.Name)
 			if class != nil {
 				m.sc[class.Name] = class
 			}
@@ -952,7 +955,6 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
 		)
 		// Tests that expect a failure are slow because we have to wait for a while
 		// to be sure that the volume isn't getting created.
-		// TODO: stop waiting as soon as we see the "node(s) did not have enough free storage" pod event?
 		tests := []struct {
 			name            string
 			storageCapacity *bool
@@ -967,10 +969,16 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
 				storageCapacity: &no,
 			},
 			{
-				name:            "CSIStorageCapacity used, no capacity [Slow]",
+				name:            "CSIStorageCapacity used, no capacity",
 				storageCapacity: &yes,
 				expectFailure:   true,
 			},
+			{
+				name:            "CSIStorageCapacity used, insufficient capacity",
+				storageCapacity: &yes,
+				expectFailure:   true,
+				capacities:      []string{"1Mi"},
+			},
 			{
 				name:            "CSIStorageCapacity used, have capacity",
 				storageCapacity: &yes,
@@ -983,22 +991,17 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
 		for _, t := range tests {
 			test := t
 			ginkgo.It(t.name, func() {
+				scName := "mock-csi-storage-capacity-" + f.UniqueName
 				init(testParameters{
 					registerDriver:  true,
-					scName:          "csi-mock-sc-" + f.UniqueName,
+					scName:          scName,
 					storageCapacity: test.storageCapacity,
 					lateBinding:     true,
 				})
-
 				defer cleanup()
 
-				// kube-scheduler may need some time before it gets the CSIDriver object.
-				// Without it, scheduling will happen without considering capacity, which
-				// is not what we want to test.
-				time.Sleep(5 * time.Second)
-
-				sc, _, pod := createPod(false /* persistent volume, late binding as specified above */)
-
+				// The storage class uses a random name, therefore we have to create it first
+				// before adding CSIStorageCapacity objects for it.
 				for _, capacityStr := range test.capacities {
 					capacityQuantity := resource.MustParse(capacityStr)
 					capacity := &storagev1alpha1.CSIStorageCapacity{
@@ -1006,7 +1009,7 @@
 							GenerateName: "fake-capacity-",
 						},
 						// Empty topology, usable by any node.
-						StorageClassName: sc.Name,
+						StorageClassName: scName,
 						NodeTopology:     &metav1.LabelSelector{},
 						Capacity:         &capacityQuantity,
 					}
@@ -1017,9 +1020,35 @@
 					})
 				}
 
-				err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace)
+				// kube-scheduler may need some time before it gets the CSIDriver and CSIStorageCapacity objects.
+				// Without them, scheduling doesn't run as expected by the test.
+				syncDelay := 5 * time.Second
+				time.Sleep(syncDelay)
+
+				sc, _, pod := createPod(false /* persistent volume, late binding as specified above */)
+				framework.ExpectEqual(sc.Name, scName, "pre-selected storage class name not used")
+
+				waitCtx, cancel := context.WithTimeout(context.Background(), podStartTimeout)
+				defer cancel()
+				condition := anyOf(
+					podRunning(waitCtx, f.ClientSet, pod.Name, pod.Namespace),
+					// We only just created the CSIStorageCapacity objects, therefore
+					// we have to ignore all older events, plus the syncDelay as our
+					// safety margin.
+					podHasStorage(waitCtx, f.ClientSet, pod.Name, pod.Namespace, time.Now().Add(syncDelay)),
+				)
+				err = wait.PollImmediateUntil(poll, condition, waitCtx.Done())
 				if test.expectFailure {
-					framework.ExpectError(err, "pod unexpectedly started to run")
+					switch {
+					case errors.Is(err, context.DeadlineExceeded),
+						errors.Is(err, wait.ErrWaitTimeout),
+						errors.Is(err, errNotEnoughSpace):
+						// Okay, we expected that.
+					case err == nil:
+						framework.Fail("pod unexpectedly started to run")
+					default:
+						framework.Failf("unexpected error while waiting for pod: %v", err)
+					}
 				} else {
 					framework.ExpectNoError(err, "failed to start pod")
 				}
@@ -1032,6 +1061,78 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
 	})
 })
 
+// A lot of this code was copied from e2e/framework. It would be nicer
+// if it could be reused - see https://github.com/kubernetes/kubernetes/issues/92754
+func podRunning(ctx context.Context, c clientset.Interface, podName, namespace string) wait.ConditionFunc {
+	return func() (bool, error) {
+		pod, err := c.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
+		if err != nil {
+			return false, err
+		}
+		switch pod.Status.Phase {
+		case v1.PodRunning:
+			return true, nil
+		case v1.PodFailed, v1.PodSucceeded:
+			return false, errPodCompleted
+		}
+		return false, nil
+	}
+}
+
+const (
+	podStartTimeout = 5 * time.Minute
+	poll            = 2 * time.Second
+)
+
+var (
+	errPodCompleted   = fmt.Errorf("pod ran to completion")
+	errNotEnoughSpace = errors.New(scheduling.ErrReasonNotEnoughSpace)
+)
+
+func podHasStorage(ctx context.Context, c clientset.Interface, podName, namespace string, when time.Time) wait.ConditionFunc {
+	// Check for events of this pod. Copied from test/e2e/common/container_probe.go.
+	expectedEvent := fields.Set{
+		"involvedObject.kind":      "Pod",
+		"involvedObject.name":      podName,
+		"involvedObject.namespace": namespace,
+		"reason":                   "FailedScheduling",
+	}.AsSelector().String()
+	options := metav1.ListOptions{
+		FieldSelector: expectedEvent,
+	}
+	// copied from test/e2e/framework/events/events.go
+	return func() (bool, error) {
+		// We cannot be sure here whether it has enough storage, only when
+		// it hasn't. In that case we abort waiting with a special error.
+		events, err := c.CoreV1().Events(namespace).List(ctx, options)
+		if err != nil {
+			return false, fmt.Errorf("got error while getting events: %w", err)
+		}
+		for _, event := range events.Items {
+			if /* event.CreationTimestamp.After(when) &&
+			*/ strings.Contains(event.Message, scheduling.ErrReasonNotEnoughSpace) {
+				return false, errNotEnoughSpace
+			}
+		}
+		return false, nil
+	}
+}
+
+func anyOf(conditions ...wait.ConditionFunc) wait.ConditionFunc {
+	return func() (bool, error) {
+		for _, condition := range conditions {
+			done, err := condition()
+			if err != nil {
+				return false, err
+			}
+			if done {
+				return true, nil
+			}
+		}
+		return false, nil
+	}
+}
+
 func waitForMaxVolumeCondition(pod *v1.Pod, cs clientset.Interface) error {
 	waitErr := wait.PollImmediate(10*time.Second, csiPodUnschedulableTimeout, func() (bool, error) {
 		pod, err := cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
@@ -1073,8 +1174,11 @@ func checkCSINodeForLimits(nodeName string, driverName string, cs clientset.Inte
 	return attachLimit, nil
 }
 
-func startPausePod(cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, ns string) (*storagev1.StorageClass, *v1.PersistentVolumeClaim, *v1.Pod) {
+func startPausePod(cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string) (*storagev1.StorageClass, *v1.PersistentVolumeClaim, *v1.Pod) {
 	class := newStorageClass(t, ns, "")
+	if scName != "" {
+		class.Name = scName
+	}
 	var err error
 	_, err = cs.StorageV1().StorageClasses().Get(context.TODO(), class.Name, metav1.GetOptions{})
 	if err != nil {
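
Side note on the waiting pattern introduced in this patch: a wait.ConditionFunc can abort wait.PollImmediateUntil early by returning a sentinel error, and the caller then separates the expected "not enough space" scheduling failure from a plain timeout with errors.Is. The following standalone Go sketch (not part of the patch; errNoSpace, anyOfConditions, and the literal error message are illustrative stand-ins for errNotEnoughSpace, anyOf, and scheduling.ErrReasonNotEnoughSpace) shows the same mechanism in isolation:

package main

import (
	"errors"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// Sentinel error, standing in for errNotEnoughSpace in the patch.
var errNoSpace = errors.New("node(s) did not have enough free storage")

// anyOfConditions mirrors the anyOf helper: the combined condition is done as
// soon as any condition is done, and the wait aborts as soon as any condition
// returns an error.
func anyOfConditions(conditions ...wait.ConditionFunc) wait.ConditionFunc {
	return func() (bool, error) {
		for _, condition := range conditions {
			done, err := condition()
			if err != nil {
				return false, err
			}
			if done {
				return true, nil
			}
		}
		return false, nil
	}
}

func main() {
	// The stop channel plays the role of waitCtx.Done() in the test.
	stop := make(chan struct{})
	time.AfterFunc(10*time.Second, func() { close(stop) })

	// Never succeeds, mimicking podRunning for a pod that cannot be scheduled.
	running := func() (bool, error) { return false, nil }
	// Aborts the wait with the sentinel error, mimicking podHasStorage when a
	// FailedScheduling event with the "not enough free storage" reason is seen.
	failedScheduling := func() (bool, error) { return false, errNoSpace }

	err := wait.PollImmediateUntil(time.Second, anyOfConditions(running, failedScheduling), stop)
	switch {
	case errors.Is(err, errNoSpace):
		fmt.Println("expected: scheduling failed due to insufficient capacity")
	case errors.Is(err, wait.ErrWaitTimeout):
		fmt.Println("expected: pod did not start before the timeout")
	case err == nil:
		fmt.Println("pod started")
	default:
		fmt.Printf("unexpected error: %v\n", err)
	}
}

Because the sentinel error is returned directly from the condition, the test no longer has to wait out the full timeout in the expected-failure cases, which is why the [Slow] tag could be dropped from the "no capacity" test case.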