mock tests: ResourceExhausted error handling in external-provisioner

The mock driver is instructed, via JavaScript hooks in the driver
deployment, to return a ResourceExhausted error for the first
CreateVolume invocation.

How this error should be handled depends on the situation: for normal
volumes with immediate binding, we just want the external-provisioner
to retry. For late binding, we want the pod to be rescheduled. The
expected behavior also depends on topology support.
Author: Patrick Ohly
Date:   2020-02-12 17:41:59 +01:00
parent 367a23e4d9
commit f117849582
3 changed files with 231 additions and 12 deletions

test/e2e/storage/csi_mock_volume.go

@@ -34,6 +34,7 @@ import (
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/e2e/framework"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
@@ -84,6 +85,8 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
disableAttach bool
attachLimit int
registerDriver bool
lateBinding bool
enableTopology bool
podInfo *bool
scName string
enableResizing bool // enable resizing for both CSI mock driver and storageClass.
@@ -120,6 +123,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
driverOpts := drivers.CSIMockDriverOpts{
RegisterDriver: tp.registerDriver,
PodInfo: tp.podInfo,
EnableTopology: tp.enableTopology,
AttachLimit: tp.attachLimit,
DisableAttach: tp.disableAttach,
EnableResizing: tp.enableResizing,
@@ -159,6 +163,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
Parameters: sc.Parameters,
ClaimSize: "1Gi",
ExpectedSize: "1Gi",
DelayBinding: m.tp.lateBinding,
}
if m.tp.scName != "" {
scTest.StorageClassName = m.tp.scName
@@ -687,7 +692,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
// Watch for all calls up to deletePod = true
for {
time.Sleep(1 * time.Second)
index, err := compareCSICalls(trackedCalls, test.expectedCalls, m.cs, f.Namespace.Name, driverPodName, driverContainerName)
_, index, err := compareCSICalls(trackedCalls, test.expectedCalls, m.cs, f.Namespace.Name, driverPodName, driverContainerName)
framework.ExpectNoError(err, "while waiting for initial CSI calls")
if index == 0 {
// No CSI call received yet
@@ -711,7 +716,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
ginkgo.By("Waiting for all remaining expected CSI calls")
err = wait.Poll(time.Second, csiUnstageWaitTimeout, func() (done bool, err error) {
index, err := compareCSICalls(trackedCalls, test.expectedCalls, m.cs, f.Namespace.Name, driverPodName, driverContainerName)
_, index, err := compareCSICalls(trackedCalls, test.expectedCalls, m.cs, f.Namespace.Name, driverPodName, driverContainerName)
if err != nil {
return true, fmt.Errorf("error waiting for expected CSI calls: %s", err)
}
@@ -730,6 +735,202 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
}
})
ginkgo.Context("storage capacity", func() {
tests := []struct {
name string
resourceExhausted bool
lateBinding bool
topology bool
}{
{
name: "unlimited",
},
{
name: "exhausted, immediate binding",
resourceExhausted: true,
},
{
name: "exhausted, late binding, no topology",
resourceExhausted: true,
lateBinding: true,
},
{
name: "exhausted, late binding, with topology",
resourceExhausted: true,
lateBinding: true,
topology: true,
},
}
createVolume := "CreateVolume"
deleteVolume := "DeleteVolume"
publishVolume := "NodePublishVolume"
unpublishVolume := "NodeUnpublishVolume"
stageVolume := "NodeStageVolume"
unstageVolume := "NodeUnstageVolume"
// These calls are assumed to occur in this order for
// each test run.
deterministicCalls := []string{
createVolume,
stageVolume,
publishVolume,
unpublishVolume,
unstageVolume,
deleteVolume,
}
for _, t := range tests {
test := t
ginkgo.It(test.name, func() {
var err error
params := testParameters{
lateBinding: test.lateBinding,
enableTopology: test.topology,
// Not strictly necessary, but without it, the external-attacher takes two minutes to detach
// the volume?! Looks like a bug.
disableAttach: true,
registerDriver: true,
}
if test.resourceExhausted {
params.javascriptHooks = map[string]string{
"globals": `counter=0; console.log("globals loaded", OK, INVALIDARGUMENT)`,
// Every second call returns RESOURCEEXHAUSTED, starting with the first one.
"createVolumeStart": `console.log("Counter:", ++counter); if (counter % 2) { RESOURCEEXHAUSTED; } else { OK; }`,
}
}
init(params)
defer cleanup()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pvcWatch, err := f.ClientSet.CoreV1().PersistentVolumeClaims(f.Namespace.Name).Watch(ctx, metav1.ListOptions{})
framework.ExpectNoError(err, "create PVC watch")
defer pvcWatch.Stop()
sc, claim, pod := createPod(false)
gomega.Expect(pod).NotTo(gomega.BeNil(), "while creating pod")
bindingMode := storagev1.VolumeBindingImmediate
if test.lateBinding {
bindingMode = storagev1.VolumeBindingWaitForFirstConsumer
}
framework.ExpectEqual(*sc.VolumeBindingMode, bindingMode, "volume binding mode")
err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace)
framework.ExpectNoError(err, "failed to start pod")
err = e2epod.DeletePodWithWait(m.cs, pod)
framework.ExpectNoError(err, "failed to delete pod")
err = m.cs.CoreV1().PersistentVolumeClaims(claim.Namespace).Delete(context.TODO(), claim.Name, metav1.DeleteOptions{})
framework.ExpectNoError(err, "failed to delete claim")
normal := []csiCall{}
for _, method := range deterministicCalls {
normal = append(normal, csiCall{expectedMethod: method})
}
expected := normal
// When simulating limited capacity,
// we expect exactly two CreateVolume
// calls because the first one should
// have failed.
if test.resourceExhausted {
expected = []csiCall{
{expectedMethod: createVolume, expectedError: codes.ResourceExhausted},
}
expected = append(expected, normal...)
}
var calls []mockCSICall
err = wait.Poll(time.Second, csiPodRunningTimeout, func() (done bool, err error) {
c, index, err := compareCSICalls(deterministicCalls, expected, m.cs, f.Namespace.Name, driverPodName, driverContainerName)
if err != nil {
return true, fmt.Errorf("error waiting for expected CSI calls: %s", err)
}
calls = c
if index == 0 {
// No CSI call received yet
return false, nil
}
if len(expected) == index {
// all calls received
return true, nil
}
return false, nil
})
framework.ExpectNoError(err, "while waiting for all CSI calls")
// The capacity error is dealt with in two different ways.
//
// For delayed binding, the external-provisioner should unset the node annotation
// to give the scheduler the opportunity to reschedule the pod onto a different
// node.
//
// For immediate binding, the external-provisioner must keep retrying.
//
// Unfortunately, the call log is the same in both cases. We have to collect
// additional evidence that rescheduling really happened. What we have observed
// above is how the PVC changed over time. Now we can analyze that.
ginkgo.By("Checking PVC events")
nodeAnnotationSet := false
nodeAnnotationReset := false
loop:
for {
select {
case event := <-pvcWatch.ResultChan():
framework.Logf("PVC event %s: %#v", event.Type, event.Object)
switch event.Type {
case watch.Modified:
pvc, ok := event.Object.(*v1.PersistentVolumeClaim)
if !ok {
framework.Failf("PVC watch sent %#v instead of a PVC", event.Object)
}
_, set := pvc.Annotations["volume.kubernetes.io/selected-node"]
if set {
nodeAnnotationSet = true
} else if nodeAnnotationSet {
nodeAnnotationReset = true
}
case watch.Deleted:
break loop
case watch.Error:
// Can this occur when the apiserver is under heavy load?
// If yes, then we should bail out of the test here early and
// skip further checks instead of treating it as a test failure.
framework.Failf("PVC watch failed prematurely: %v", event.Object)
}
case <-time.After(10 * time.Second):
framework.Failf("Timeout while waiting to observe PVC list")
}
}
// More tests when capacity is limited.
if test.resourceExhausted {
for _, call := range calls {
if call.Method == createVolume {
gomega.Expect(call.Error).To(gomega.ContainSubstring("code = ResourceExhausted"), "first CreateVolume error in\n%s", calls)
break
}
}
if test.lateBinding {
gomega.Expect(nodeAnnotationSet).To(gomega.BeTrue(), "selected-node should have been set")
// Whether it gets reset depends on whether we have topology enabled. Without
// it, rescheduling is unnecessary.
if test.topology {
gomega.Expect(nodeAnnotationReset).To(gomega.BeTrue(), "selected-node should have been reset")
} else {
gomega.Expect(nodeAnnotationReset).To(gomega.BeFalse(), "selected-node should not have been reset")
}
} else {
gomega.Expect(nodeAnnotationSet).To(gomega.BeFalse(), "selected-node should not have been set")
gomega.Expect(nodeAnnotationReset).To(gomega.BeFalse(), "selected-node should not have been reset")
}
}
})
}
})
})
func waitForMaxVolumeCondition(pod *v1.Pod, cs clientset.Interface) error {
@@ -790,9 +991,11 @@ func startPausePod(cs clientset.Interface, t testsuites.StorageClassTest, node e
claim, err = cs.CoreV1().PersistentVolumeClaims(ns).Create(context.TODO(), claim, metav1.CreateOptions{})
framework.ExpectNoError(err, "Failed to create claim: %v", err)
pvcClaims := []*v1.PersistentVolumeClaim{claim}
_, err = e2epv.WaitForPVClaimBoundPhase(cs, pvcClaims, framework.ClaimProvisionTimeout)
framework.ExpectNoError(err, "Failed waiting for PVC to be bound %v", err)
if !t.DelayBinding {
pvcClaims := []*v1.PersistentVolumeClaim{claim}
_, err = e2epv.WaitForPVClaimBoundPhase(cs, pvcClaims, framework.ClaimProvisionTimeout)
framework.ExpectNoError(err, "Failed waiting for PVC to be bound %v", err)
}
pod, err := startPausePodWithClaim(cs, claim, node, ns)
framework.ExpectNoError(err, "Failed to create pod: %v", err)
@@ -861,6 +1064,8 @@ func startPausePodWithVolumeSource(cs clientset.Interface, volumeSource v1.Volum
// Dummy structure that parses just volume_attributes and error code out of logged CSI call
type mockCSICall struct {
json string // full log entry
Method string
Request struct {
VolumeContext map[string]string `json:"volume_context"`
@@ -869,6 +1074,7 @@ type mockCSICall struct {
Code codes.Code `json:"code"`
Message string `json:"message"`
}
Error string
}
// checkPodLogs tests that NodePublish was called with expected volume_context and (for ephemeral inline volumes)
@@ -946,7 +1152,9 @@ func parseMockLogs(cs clientset.Interface, namespace, driverPodName, driverConta
continue
}
line = strings.TrimPrefix(line, "gRPCCall:")
var call mockCSICall
call := mockCSICall{
json: string(line),
}
err := json.Unmarshal([]byte(line), &call)
if err != nil {
framework.Logf("Could not parse CSI driver log line %q: %s", line, err)
@@ -970,11 +1178,11 @@ func parseMockLogs(cs clientset.Interface, namespace, driverPodName, driverConta
//
// Only permanent errors are returned. Other errors are logged and no
// calls are returned. The caller is expected to retry.
func compareCSICalls(trackedCalls []string, expectedCallSequence []csiCall, cs clientset.Interface, namespace, driverPodName, driverContainerName string) (int, error) {
func compareCSICalls(trackedCalls []string, expectedCallSequence []csiCall, cs clientset.Interface, namespace, driverPodName, driverContainerName string) ([]mockCSICall, int, error) {
allCalls, err := parseMockLogs(cs, namespace, driverPodName, driverContainerName)
if err != nil {
framework.Logf("intermittent (?) log retrieval error, proceeding without output: %v", err)
return 0, nil
return nil, 0, nil
}
// Remove all repeated and ignored calls
@@ -1002,14 +1210,14 @@ func compareCSICalls(trackedCalls []string, expectedCallSequence []csiCall, cs c
// Compare current call with expected call
expectedCall := expectedCallSequence[i]
if c.Method != expectedCall.expectedMethod || c.FullError.Code != expectedCall.expectedError {
return i, fmt.Errorf("Unexpected CSI call %d: expected %s (%d), got %s (%d)", i, expectedCall.expectedMethod, expectedCall.expectedError, c.Method, c.FullError.Code)
return allCalls, i, fmt.Errorf("Unexpected CSI call %d: expected %s (%d), got %s (%d)", i, expectedCall.expectedMethod, expectedCall.expectedError, c.Method, c.FullError.Code)
}
}
if len(calls) > len(expectedCallSequence) {
return len(expectedCallSequence), fmt.Errorf("Received %d unexpected CSI driver calls", len(calls)-len(expectedCallSequence))
return allCalls, len(expectedCallSequence), fmt.Errorf("Received %d unexpected CSI driver calls", len(calls)-len(expectedCallSequence))
}
// All calls were correct
return len(calls), nil
return allCalls, len(calls), nil
}

test/e2e/storage/drivers/csi.go

@@ -214,6 +214,7 @@ type mockCSIDriver struct {
podInfo *bool
attachable bool
attachLimit int
enableTopology bool
enableNodeExpansion bool
javascriptHooks map[string]string
}
@@ -224,6 +225,7 @@ type CSIMockDriverOpts struct {
DisableAttach bool
PodInfo *bool
AttachLimit int
EnableTopology bool
EnableResizing bool
EnableNodeExpansion bool
JavascriptHooks map[string]string
@@ -272,6 +274,7 @@ func InitMockCSIDriver(driverOpts CSIMockDriverOpts) testsuites.TestDriver {
},
manifests: driverManifests,
podInfo: driverOpts.PodInfo,
enableTopology: driverOpts.EnableTopology,
attachable: !driverOpts.DisableAttach,
attachLimit: driverOpts.AttachLimit,
enableNodeExpansion: driverOpts.EnableNodeExpansion,
@@ -315,6 +318,10 @@ func (m *mockCSIDriver) PrepareTest(f *framework.Framework) (*testsuites.PerTest
containerArgs = append(containerArgs, "--disable-attach")
}
if m.enableTopology {
containerArgs = append(containerArgs, "--enable-topology")
}
if m.attachLimit > 0 {
containerArgs = append(containerArgs, "--attach-limit", strconv.Itoa(m.attachLimit))
}

test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver.yaml

@@ -15,10 +15,14 @@ spec:
serviceAccountName: csi-mock
containers:
- name: csi-provisioner
image: quay.io/k8scsi/csi-provisioner:v1.5.0
image: quay.io/k8scsi/csi-provisioner:v1.6.0
args:
- "--csi-address=$(ADDRESS)"
- "--connection-timeout=15s"
# Topology support is needed for the pod rescheduling test
# ("storage capacity" in csi_mock_volume.go).
- "--feature-gates=Topology=true"
- "-v=5"
env:
- name: ADDRESS
value: /csi/csi.sock