diff --git a/test/e2e/storage/csi_mock_volume.go b/test/e2e/storage/csi_mock_volume.go
index 346438790d2..bd90d65e79e 100644
--- a/test/e2e/storage/csi_mock_volume.go
+++ b/test/e2e/storage/csi_mock_volume.go
@@ -34,6 +34,7 @@ import (
 	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/apimachinery/pkg/watch"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
@@ -84,6 +85,8 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
 		disableAttach  bool
 		attachLimit    int
 		registerDriver bool
+		lateBinding    bool
+		enableTopology bool
 		podInfo        *bool
 		scName         string
 		enableResizing bool // enable resizing for both CSI mock driver and storageClass.
@@ -120,6 +123,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
 		driverOpts := drivers.CSIMockDriverOpts{
 			RegisterDriver: tp.registerDriver,
 			PodInfo:        tp.podInfo,
+			EnableTopology: tp.enableTopology,
 			AttachLimit:    tp.attachLimit,
 			DisableAttach:  tp.disableAttach,
 			EnableResizing: tp.enableResizing,
@@ -159,6 +163,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
 			Parameters:   sc.Parameters,
 			ClaimSize:    "1Gi",
 			ExpectedSize: "1Gi",
+			DelayBinding: m.tp.lateBinding,
 		}
 		if m.tp.scName != "" {
 			scTest.StorageClassName = m.tp.scName
@@ -687,7 +692,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
 			// Watch for all calls up to deletePod = true
 			for {
 				time.Sleep(1 * time.Second)
-				index, err := compareCSICalls(trackedCalls, test.expectedCalls, m.cs, f.Namespace.Name, driverPodName, driverContainerName)
+				_, index, err := compareCSICalls(trackedCalls, test.expectedCalls, m.cs, f.Namespace.Name, driverPodName, driverContainerName)
 				framework.ExpectNoError(err, "while waiting for initial CSI calls")
 				if index == 0 {
 					// No CSI call received yet
@@ -711,7 +716,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
 			ginkgo.By("Waiting for all remaining expected CSI calls")
 			err = wait.Poll(time.Second, csiUnstageWaitTimeout, func() (done bool, err error) {
-				index, err := compareCSICalls(trackedCalls, test.expectedCalls, m.cs, f.Namespace.Name, driverPodName, driverContainerName)
+				_, index, err := compareCSICalls(trackedCalls, test.expectedCalls, m.cs, f.Namespace.Name, driverPodName, driverContainerName)
 				if err != nil {
 					return true, fmt.Errorf("error waiting for expected CSI calls: %s", err)
 				}
@@ -730,6 +735,202 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
 		}
 	})
 
+	ginkgo.Context("storage capacity", func() {
+		tests := []struct {
+			name              string
+			resourceExhausted bool
+			lateBinding       bool
+			topology          bool
+		}{
+			{
+				name: "unlimited",
+			},
+			{
+				name:              "exhausted, immediate binding",
+				resourceExhausted: true,
+			},
+			{
+				name:              "exhausted, late binding, no topology",
+				resourceExhausted: true,
+				lateBinding:       true,
+			},
+			{
+				name:              "exhausted, late binding, with topology",
+				resourceExhausted: true,
+				lateBinding:       true,
+				topology:          true,
+			},
+		}
+
+		createVolume := "CreateVolume"
+		deleteVolume := "DeleteVolume"
+		publishVolume := "NodePublishVolume"
+		unpublishVolume := "NodeUnpublishVolume"
+		stageVolume := "NodeStageVolume"
+		unstageVolume := "NodeUnstageVolume"
+
+		// These calls are assumed to occur in this order for
+		// each test run.
+		deterministicCalls := []string{
+			createVolume,
+			stageVolume,
+			publishVolume,
+			unpublishVolume,
+			unstageVolume,
+			deleteVolume,
+		}
+
+		for _, t := range tests {
+			test := t
+			ginkgo.It(test.name, func() {
+				var err error
+				params := testParameters{
+					lateBinding:    test.lateBinding,
+					enableTopology: test.topology,
+
+					// Not strictly necessary, but without it, the external-attacher takes
+					// two minutes to detach the volume?! Looks like a bug.
+					disableAttach:  true,
+					registerDriver: true,
+				}
+
+				if test.resourceExhausted {
+					params.javascriptHooks = map[string]string{
+						"globals": `counter=0; console.log("globals loaded", OK, INVALIDARGUMENT)`,
+						// Every second call returns RESOURCEEXHAUSTED, starting with the first one.
+						"createVolumeStart": `console.log("Counter:", ++counter); if (counter % 2) { RESOURCEEXHAUSTED; } else { OK; }`,
+					}
+				}
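+
+				// With the hook above, every odd CreateVolume attempt (the
+				// first, third, ...) fails with ResourceExhausted and every
+				// even attempt succeeds, so provisioning is expected to
+				// succeed on the first retry.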
+
+				init(params)
+				defer cleanup()
+
+				ctx, cancel := context.WithCancel(context.Background())
+				defer cancel()
+				pvcWatch, err := f.ClientSet.CoreV1().PersistentVolumeClaims(f.Namespace.Name).Watch(ctx, metav1.ListOptions{})
+				framework.ExpectNoError(err, "create PVC watch")
+				defer pvcWatch.Stop()
+
+				sc, claim, pod := createPod(false)
+				gomega.Expect(pod).NotTo(gomega.BeNil(), "while creating pod")
+				bindingMode := storagev1.VolumeBindingImmediate
+				if test.lateBinding {
+					bindingMode = storagev1.VolumeBindingWaitForFirstConsumer
+				}
+				framework.ExpectEqual(*sc.VolumeBindingMode, bindingMode, "volume binding mode")
+
+				err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace)
+				framework.ExpectNoError(err, "failed to start pod")
+				err = e2epod.DeletePodWithWait(m.cs, pod)
+				framework.ExpectNoError(err, "failed to delete pod")
+				err = m.cs.CoreV1().PersistentVolumeClaims(claim.Namespace).Delete(context.TODO(), claim.Name, metav1.DeleteOptions{})
+				framework.ExpectNoError(err, "failed to delete claim")
+
+				normal := []csiCall{}
+				for _, method := range deterministicCalls {
+					normal = append(normal, csiCall{expectedMethod: method})
+				}
+				expected := normal
+				// When simulating limited capacity, we expect exactly two
+				// CreateVolume calls because the first one fails with
+				// ResourceExhausted.
+				if test.resourceExhausted {
+					expected = []csiCall{
+						{expectedMethod: createVolume, expectedError: codes.ResourceExhausted},
+					}
+					expected = append(expected, normal...)
+				}
+
+				var calls []mockCSICall
+				err = wait.Poll(time.Second, csiPodRunningTimeout, func() (done bool, err error) {
+					c, index, err := compareCSICalls(deterministicCalls, expected, m.cs, f.Namespace.Name, driverPodName, driverContainerName)
+					if err != nil {
+						return true, fmt.Errorf("error waiting for expected CSI calls: %s", err)
+					}
+					calls = c
+					if index == 0 {
+						// No CSI call received yet
+						return false, nil
+					}
+					if len(expected) == index {
+						// all calls received
+						return true, nil
+					}
+					return false, nil
+				})
+				framework.ExpectNoError(err, "while waiting for all CSI calls")
+
+				// The capacity error is dealt with in two different ways.
+				//
+				// For delayed binding, the external-provisioner should unset the node
+				// annotation to give the scheduler the opportunity to reschedule the
+				// pod onto a different node.
+				//
+				// For immediate binding, the external-provisioner must keep retrying.
+				//
+				// Unfortunately, the call log is the same in both cases. We have to collect
+				// additional evidence that rescheduling really happened. What we have observed
+				// above is how the PVC changed over time. Now we can analyze that.
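+				//
+				// Roughly, the PVC watch events we expect to see in the late
+				// binding cases are (other modifications may be interleaved):
+				//   MODIFIED  volume.kubernetes.io/selected-node annotation set by the scheduler
+				//   MODIFIED  annotation removed again (only when rescheduling with topology)
+				//   DELETED   claim deleted at the end of the test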
+ ginkgo.By("Checking PVC events") + nodeAnnotationSet := false + nodeAnnotationReset := false + loop: + for { + select { + case event := <-pvcWatch.ResultChan(): + framework.Logf("PVC event %s: %#v", event.Type, event.Object) + switch event.Type { + case watch.Modified: + pvc, ok := event.Object.(*v1.PersistentVolumeClaim) + if !ok { + framework.Failf("PVC watch sent %#v instead of a PVC", event.Object) + } + _, set := pvc.Annotations["volume.kubernetes.io/selected-node"] + if set { + nodeAnnotationSet = true + } else if nodeAnnotationSet { + nodeAnnotationReset = true + } + case watch.Deleted: + break loop + case watch.Error: + // Can this occur when the apiserver is under heavy load? + // If yes, then we should bail out of the test here early and + // skip further checks instead of treating it as a test failure. + framework.Failf("PVC watch failed prematurely: %v", event.Object) + } + case <-time.After(10 * time.Second): + framework.Failf("Timeout while waiting to observe PVC list") + } + } + + // More tests when capacity is limited. + if test.resourceExhausted { + for _, call := range calls { + if call.Method == createVolume { + gomega.Expect(call.Error).To(gomega.ContainSubstring("code = ResourceExhausted"), "first CreateVolume error in\n%s", calls) + break + } + } + + if test.lateBinding { + gomega.Expect(nodeAnnotationSet).To(gomega.BeTrue(), "selected-node should have been set") + // Whether it gets reset depends on whether we have topology enabled. Without + // it, rescheduling is unnecessary. + if test.topology { + gomega.Expect(nodeAnnotationReset).To(gomega.BeTrue(), "selected-node should have been set") + } else { + gomega.Expect(nodeAnnotationReset).To(gomega.BeFalse(), "selected-node should not have been reset") + } + } else { + gomega.Expect(nodeAnnotationSet).To(gomega.BeFalse(), "selected-node should not have been set") + gomega.Expect(nodeAnnotationReset).To(gomega.BeFalse(), "selected-node should not have been reset") + } + } + }) + } + }) }) func waitForMaxVolumeCondition(pod *v1.Pod, cs clientset.Interface) error { @@ -790,9 +991,11 @@ func startPausePod(cs clientset.Interface, t testsuites.StorageClassTest, node e claim, err = cs.CoreV1().PersistentVolumeClaims(ns).Create(context.TODO(), claim, metav1.CreateOptions{}) framework.ExpectNoError(err, "Failed to create claim: %v", err) - pvcClaims := []*v1.PersistentVolumeClaim{claim} - _, err = e2epv.WaitForPVClaimBoundPhase(cs, pvcClaims, framework.ClaimProvisionTimeout) - framework.ExpectNoError(err, "Failed waiting for PVC to be bound %v", err) + if !t.DelayBinding { + pvcClaims := []*v1.PersistentVolumeClaim{claim} + _, err = e2epv.WaitForPVClaimBoundPhase(cs, pvcClaims, framework.ClaimProvisionTimeout) + framework.ExpectNoError(err, "Failed waiting for PVC to be bound %v", err) + } pod, err := startPausePodWithClaim(cs, claim, node, ns) framework.ExpectNoError(err, "Failed to create pod: %v", err) @@ -861,6 +1064,8 @@ func startPausePodWithVolumeSource(cs clientset.Interface, volumeSource v1.Volum // Dummy structure that parses just volume_attributes and error code out of logged CSI call type mockCSICall struct { + json string // full log entry + Method string Request struct { VolumeContext map[string]string `json:"volume_context"` @@ -869,6 +1074,7 @@ type mockCSICall struct { Code codes.Code `json:"code"` Message string `json:"message"` } + Error string } // checkPodLogs tests that NodePublish was called with expected volume_context and (for ephemeral inline volumes) @@ -946,7 +1152,9 @@ func parseMockLogs(cs 
 	Method  string
 	Request struct {
 		VolumeContext map[string]string `json:"volume_context"`
@@ -869,6 +1074,7 @@
 		Code    codes.Code `json:"code"`
 		Message string     `json:"message"`
 	}
+	Error string
 }
 
 // checkPodLogs tests that NodePublish was called with expected volume_context and (for ephemeral inline volumes)
@@ -946,7 +1152,9 @@ func parseMockLogs(cs clientset.Interface, namespace, driverPodName, driverConta
 			continue
 		}
 		line = strings.TrimPrefix(line, "gRPCCall:")
-		var call mockCSICall
+		call := mockCSICall{
+			json: string(line),
+		}
 		err := json.Unmarshal([]byte(line), &call)
 		if err != nil {
 			framework.Logf("Could not parse CSI driver log line %q: %s", line, err)
@@ -970,11 +1178,11 @@ func parseMockLogs(cs clientset.Interface, namespace, driverPodName, driverConta
 //
 // Only permanent errors are returned. Other errors are logged and no
 // calls are returned. The caller is expected to retry.
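+//
+// In addition to the index of the last matched call, the parsed log
+// entries themselves are returned so that the caller can run further
+// checks on them, like inspecting the Error string of individual calls.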
-func compareCSICalls(trackedCalls []string, expectedCallSequence []csiCall, cs clientset.Interface, namespace, driverPodName, driverContainerName string) (int, error) {
+func compareCSICalls(trackedCalls []string, expectedCallSequence []csiCall, cs clientset.Interface, namespace, driverPodName, driverContainerName string) ([]mockCSICall, int, error) {
 	allCalls, err := parseMockLogs(cs, namespace, driverPodName, driverContainerName)
 	if err != nil {
 		framework.Logf("intermittent (?) log retrieval error, proceeding without output: %v", err)
-		return 0, nil
+		return nil, 0, nil
 	}
 
 	// Remove all repeated and ignored calls
@@ -1002,14 +1210,14 @@ func compareCSICalls(trackedCalls []string, expectedCallSequence []csiCall, cs c
 		// Compare current call with expected call
 		expectedCall := expectedCallSequence[i]
 		if c.Method != expectedCall.expectedMethod || c.FullError.Code != expectedCall.expectedError {
-			return i, fmt.Errorf("Unexpected CSI call %d: expected %s (%d), got %s (%d)", i, expectedCall.expectedMethod, expectedCall.expectedError, c.Method, c.FullError.Code)
+			return allCalls, i, fmt.Errorf("Unexpected CSI call %d: expected %s (%d), got %s (%d)", i, expectedCall.expectedMethod, expectedCall.expectedError, c.Method, c.FullError.Code)
 		}
 	}
 	if len(calls) > len(expectedCallSequence) {
-		return len(expectedCallSequence), fmt.Errorf("Received %d unexpected CSI driver calls", len(calls)-len(expectedCallSequence))
+		return allCalls, len(expectedCallSequence), fmt.Errorf("Received %d unexpected CSI driver calls", len(calls)-len(expectedCallSequence))
 	}
 	// All calls were correct
-	return len(calls), nil
+	return allCalls, len(calls), nil
 }
diff --git a/test/e2e/storage/drivers/csi.go b/test/e2e/storage/drivers/csi.go
index 694b76dfaae..4bc78c2e9af 100644
--- a/test/e2e/storage/drivers/csi.go
+++ b/test/e2e/storage/drivers/csi.go
@@ -214,6 +214,7 @@ type mockCSIDriver struct {
 	podInfo             *bool
 	attachable          bool
 	attachLimit         int
+	enableTopology      bool
 	enableNodeExpansion bool
 	javascriptHooks     map[string]string
 }
@@ -224,6 +225,7 @@ type CSIMockDriverOpts struct {
 	DisableAttach       bool
 	PodInfo             *bool
 	AttachLimit         int
+	EnableTopology      bool
 	EnableResizing      bool
 	EnableNodeExpansion bool
 	JavascriptHooks     map[string]string
@@ -272,6 +274,7 @@ func InitMockCSIDriver(driverOpts CSIMockDriverOpts) testsuites.TestDriver {
 		},
 		manifests:           driverManifests,
 		podInfo:             driverOpts.PodInfo,
+		enableTopology:      driverOpts.EnableTopology,
 		attachable:          !driverOpts.DisableAttach,
 		attachLimit:         driverOpts.AttachLimit,
 		enableNodeExpansion: driverOpts.EnableNodeExpansion,
@@ -315,6 +318,10 @@ func (m *mockCSIDriver) PrepareTest(f *framework.Framework) (*testsuites.PerTest
 		containerArgs = append(containerArgs, "--disable-attach")
 	}
 
+	if m.enableTopology {
+		containerArgs = append(containerArgs, "--enable-topology")
+	}
+
 	if m.attachLimit > 0 {
 		containerArgs = append(containerArgs, "--attach-limit", strconv.Itoa(m.attachLimit))
 	}
diff --git a/test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver.yaml b/test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver.yaml
index 80c54a05190..18b30b99288 100644
--- a/test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver.yaml
+++ b/test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver.yaml
@@ -15,10 +15,14 @@ spec:
       serviceAccountName: csi-mock
       containers:
         - name: csi-provisioner
-          image: quay.io/k8scsi/csi-provisioner:v1.5.0
+          image: quay.io/k8scsi/csi-provisioner:v1.6.0
           args:
             - "--csi-address=$(ADDRESS)"
             - "--connection-timeout=15s"
+            # Topology support is needed for the pod rescheduling test
+            # ("storage capacity" in csi_mock_volume.go).
+            - "--feature-gates=Topology=true"
+            - "-v=5"
           env:
             - name: ADDRESS
               value: /csi/csi.sock