Merge pull request #89041 from jsafrane/stage-error-tests
Add NodeStage error tests
Commit 15bb54c2d2
@@ -89,6 +89,7 @@ go_library(
"//vendor/github.com/onsi/ginkgo:go_default_library",
"//vendor/github.com/onsi/gomega:go_default_library",
"//vendor/google.golang.org/api/googleapi:go_default_library",
"//vendor/google.golang.org/grpc/codes:go_default_library",
],
)

@@ -25,6 +25,7 @@ import (
"strings"
"time"

"google.golang.org/grpc/codes"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"

@@ -54,8 +55,30 @@ const (
csiResizeWaitPeriod = 5 * time.Minute
// how long to wait for Resizing Condition on PVC to appear
csiResizingConditionWait = 2 * time.Minute

// How long to wait for kubelet to unstage a volume after a pod is deleted
csiUnstageWaitTimeout = 1 * time.Minute

// Name of the CSI driver pod (it runs in a StatefulSet with a stable name)
driverPodName = "csi-mockplugin-0"
// Name of the CSI driver container
driverContainerName = "mock"
)

// csiCall represents an expected call from Kubernetes to the CSI mock driver and
// its expected return value.
// When matching an expected csiCall against real CSI mock driver output, one csiCall
// matches *one or more* calls with the same method and error code.
// This is due to exponential backoff in Kubernetes, where the test cannot expect an
// exact number of call repetitions.
type csiCall struct {
expectedMethod string
expectedError codes.Code
// This is a mark for the test itself to delete the tested pod *after*
// this csiCall is received.
deletePod bool
}

var _ = utils.SIGDescribe("CSI mock volume", func() {
type testParameters struct {
disableAttach bool

@@ -67,6 +90,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
enableNodeExpansion bool // enable node expansion for CSI mock driver
// disable resizing on the driver; it overrides the enableResizing flag for the CSI mock driver
disableResizingOnDriver bool
javascriptHooks map[string]string
}

type mockDriverSetup struct {

@@ -100,6 +124,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
DisableAttach: tp.disableAttach,
EnableResizing: tp.enableResizing,
EnableNodeExpansion: tp.enableNodeExpansion,
JavascriptHooks: tp.javascriptHooks,
}

// this just disables resizing on the driver, keeping resizing on the SC enabled.
@@ -344,9 +369,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
framework.ExpectNoError(err, "while deleting")

ginkgo.By("Checking CSI driver logs")
// The driver is deployed as a statefulset with stable pod names
driverPodName := "csi-mockplugin-0"
err = checkPodLogs(m.cs, f.Namespace.Name, driverPodName, "mock", pod, test.expectPodInfo, test.expectEphemeral, csiInlineVolumesEnabled)
err = checkPodLogs(m.cs, f.Namespace.Name, driverPodName, driverContainerName, pod, test.expectPodInfo, test.expectEphemeral, csiInlineVolumesEnabled)
framework.ExpectNoError(err)
})
}

@@ -558,6 +581,155 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
}
})

ginkgo.Context("CSI NodeStage error cases [Slow]", func() {
|
||||
// Global variable in all scripts (called before each test)
|
||||
globalScript := `counter=0; console.log("globals loaded", OK, INVALIDARGUMENT)`
|
||||
trackedCalls := []string{
|
||||
"NodeStageVolume",
|
||||
"NodeUnstageVolume",
|
||||
}
|
||||
|
||||
tests := []struct {
name string
expectPodRunning bool
expectedCalls []csiCall
nodeStageScript string
nodeUnstageScript string
}{
{
// This is already tested elsewhere, adding a simple happy-path case here to exercise the test framework.
name: "should call NodeUnstage after NodeStage success",
expectPodRunning: true,
expectedCalls: []csiCall{
{expectedMethod: "NodeStageVolume", expectedError: codes.OK, deletePod: true},
{expectedMethod: "NodeUnstageVolume", expectedError: codes.OK},
},
nodeStageScript: `OK;`,
},
{
// Kubelet should repeat NodeStage as long as the pod exists
name: "should retry NodeStage after NodeStage final error",
expectPodRunning: true,
expectedCalls: []csiCall{
// This matches all 3 NodeStage calls with InvalidArgument error
{expectedMethod: "NodeStageVolume", expectedError: codes.InvalidArgument},
{expectedMethod: "NodeStageVolume", expectedError: codes.OK, deletePod: true},
{expectedMethod: "NodeUnstageVolume", expectedError: codes.OK},
},
// Fail the first 3 NodeStage requests, the 4th succeeds
nodeStageScript: `console.log("Counter:", ++counter); if (counter < 4) { INVALIDARGUMENT; } else { OK; }`,
},
{
// Kubelet should repeat NodeStage as long as the pod exists
name: "should retry NodeStage after NodeStage ephemeral error",
expectPodRunning: true,
expectedCalls: []csiCall{
// This matches all 3 NodeStage calls with DeadlineExceeded error
{expectedMethod: "NodeStageVolume", expectedError: codes.DeadlineExceeded},
{expectedMethod: "NodeStageVolume", expectedError: codes.OK, deletePod: true},
{expectedMethod: "NodeUnstageVolume", expectedError: codes.OK},
},
// Fail the first 3 NodeStage requests, the 4th succeeds
nodeStageScript: `console.log("Counter:", ++counter); if (counter < 4) { DEADLINEEXCEEDED; } else { OK; }`,
},
{
// After NodeStage fails with an ephemeral error, the driver may still be staging the volume.
// Kubelet should call NodeUnstage to make sure the volume is really unstaged after
// the pod is deleted.
name: "should call NodeUnstage after NodeStage ephemeral error",
expectPodRunning: false,
expectedCalls: []csiCall{
// Delete the pod before NodeStage succeeds - the volume should get "uncertain" because of the ephemeral error.
// This matches all repeated NodeStage calls with DeadlineExceeded error (due to exp. backoff).
{expectedMethod: "NodeStageVolume", expectedError: codes.DeadlineExceeded, deletePod: true},
{expectedMethod: "NodeUnstageVolume", expectedError: codes.OK},
},
nodeStageScript: `DEADLINEEXCEEDED;`,
},
{
// After NodeStage fails with a final error, kubelet can be sure the volume is not staged.
// The test checks that NodeUnstage is *not* called.
name: "should not call NodeUnstage after NodeStage final error",
expectPodRunning: false,
expectedCalls: []csiCall{
// Delete the pod before NodeStage succeeds - the volume should get "globally unmounted" because of the final error.
// This matches all repeated NodeStage calls with InvalidArgument error (due to exp. backoff).
{expectedMethod: "NodeStageVolume", expectedError: codes.InvalidArgument, deletePod: true},
},
nodeStageScript: `INVALIDARGUMENT;`,
},
}

for _, t := range tests {
test := t
ginkgo.It(test.name, func() {
scripts := map[string]string{
"globals": globalScript,
"nodeStageVolumeStart": test.nodeStageScript,
"nodeUnstageVolumeStart": test.nodeUnstageScript,
}
init(testParameters{
disableAttach: true,
registerDriver: true,
scName: "csi-mock-sc-" + f.UniqueName,
javascriptHooks: scripts,
})
defer cleanup()

_, claim, pod := createPod(false)
if pod == nil {
return
}
// Wait for the PVC to get bound to make sure the CSI driver is fully started.
err := e2epv.WaitForPersistentVolumeClaimPhase(v1.ClaimBound, f.ClientSet, f.Namespace.Name, claim.Name, time.Second, framework.ClaimProvisionTimeout)
framework.ExpectNoError(err, "while waiting for PVC to get provisioned")

ginkgo.By("Waiting for expected CSI calls")
|
||||
// Watch for all calls up to deletePod = true
|
||||
for {
|
||||
time.Sleep(1 * time.Second)
|
||||
index, err := compareCSICalls(trackedCalls, test.expectedCalls, m.cs, f.Namespace.Name, driverPodName, driverContainerName)
|
||||
framework.ExpectNoError(err, "while waiting for initial CSI calls")
|
||||
if index == 0 {
|
||||
// No CSI call received yet
|
||||
continue
|
||||
}
|
||||
// Check the last *received* call wanted the pod to be deleted
|
||||
if test.expectedCalls[index-1].deletePod {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if test.expectPodRunning {
|
||||
ginkgo.By("Waiting for pod to be running")
|
||||
err := e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace)
|
||||
framework.ExpectNoError(err, "Failed to start pod: %v", err)
|
||||
}
|
||||
|
||||
ginkgo.By("Deleting the previously created pod")
|
||||
err = e2epod.DeletePodWithWait(m.cs, pod)
|
||||
framework.ExpectNoError(err, "while deleting")
|
||||
|
||||
ginkgo.By("Waiting for all remaining expected CSI calls")
|
||||
err = wait.Poll(time.Second, csiUnstageWaitTimeout, func() (done bool, err error) {
|
||||
index, err := compareCSICalls(trackedCalls, test.expectedCalls, m.cs, f.Namespace.Name, driverPodName, driverContainerName)
|
||||
if err != nil {
|
||||
return true, fmt.Errorf("error waiting for expected CSI calls: %s", err)
|
||||
}
|
||||
if index == 0 {
|
||||
// No CSI call received yet
|
||||
return false, nil
|
||||
}
|
||||
if len(test.expectedCalls) == index {
|
||||
// all calls received
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
framework.ExpectNoError(err, "while waiting for all CSI calls")
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
})
|
||||
|
||||
func waitForMaxVolumeCondition(pod *v1.Pod, cs clientset.Interface) error {

@@ -687,6 +859,18 @@ func startPausePodWithVolumeSource(cs clientset.Interface, volumeSource v1.Volum
return cs.CoreV1().Pods(ns).Create(context.TODO(), pod, metav1.CreateOptions{})
}

// Dummy structure that parses just volume_context and the error code out of a logged CSI call
type mockCSICall struct {
Method string
Request struct {
VolumeContext map[string]string `json:"volume_context"`
}
FullError struct {
Code codes.Code `json:"code"`
Message string `json:"message"`
}
}

// checkPodLogs tests that NodePublish was called with the expected volume_context and (for ephemeral inline volumes)
// has the matching NodeUnpublish
func checkPodLogs(cs clientset.Interface, namespace, driverPodName, driverContainerName string, pod *v1.Pod, expectPodInfo, ephemeralVolume, csiInlineVolumesEnabled bool) error {
@@ -709,29 +893,15 @@ func checkPodLogs(cs clientset.Interface, namespace, driverPodName, driverContai
framework.Logf("CSI driver logs:\n%s", log)
// Find NodePublish in the logs
foundAttributes := sets.NewString()
logLines := strings.Split(log, "\n")
numNodePublishVolume := 0
numNodeUnpublishVolume := 0
for _, line := range logLines {
if !strings.HasPrefix(line, "gRPCCall:") {
continue
}
line = strings.TrimPrefix(line, "gRPCCall:")
// Dummy structure that parses just volume_attributes out of logged CSI call
type MockCSICall struct {
Method string
Request struct {
VolumeContext map[string]string `json:"volume_context"`
}
}
var call MockCSICall
err := json.Unmarshal([]byte(line), &call)
if err != nil {
framework.Logf("Could not parse CSI driver log line %q: %s", line, err)
continue
}
calls, err := parseMockLogs(cs, namespace, driverPodName, driverContainerName)
if err != nil {
return err
}
for _, call := range calls {
switch call.Method {
case "/csi.v1.Node/NodePublishVolume":
case "NodePublishVolume":
numNodePublishVolume++
if numNodePublishVolume == 1 {
// Check that NodePublish had expected attributes for first volume

@@ -743,7 +913,7 @@ func checkPodLogs(cs clientset.Interface, namespace, driverPodName, driverContai
}
}
}
case "/csi.v1.Node/NodeUnpublishVolume":
case "NodeUnpublishVolume":
framework.Logf("Found NodeUnpublishVolume: %+v", call)
numNodeUnpublishVolume++
}

@@ -768,6 +938,88 @@ func checkPodLogs(cs clientset.Interface, namespace, driverPodName, driverContai
return nil
}

func parseMockLogs(cs clientset.Interface, namespace, driverPodName, driverContainerName string) ([]mockCSICall, error) {
// Load logs of driver pod
log, err := e2epod.GetPodLogs(cs, namespace, driverPodName, driverContainerName)
if err != nil {
return nil, fmt.Errorf("could not load CSI driver logs: %s", err)
}
framework.Logf("CSI driver logs:\n%s", log)

logLines := strings.Split(log, "\n")
var calls []mockCSICall
for _, line := range logLines {
if !strings.HasPrefix(line, "gRPCCall:") {
continue
}
line = strings.TrimPrefix(line, "gRPCCall:")
var call mockCSICall
err := json.Unmarshal([]byte(line), &call)
if err != nil {
framework.Logf("Could not parse CSI driver log line %q: %s", line, err)
continue
}

// Trim the gRPC service name, i.e. "/csi.v1.Identity/Probe" -> "Probe"
methodParts := strings.Split(call.Method, "/")
call.Method = methodParts[len(methodParts)-1]

calls = append(calls, call)
}
return calls, nil
}
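
For reference, a minimal, self-contained sketch of the parsing step above: it feeds one synthetic "gRPCCall:"-prefixed JSON line through the same prefix-trim / json.Unmarshal / method-trim sequence that parseMockLogs applies to every log line. The log line is made up for illustration (the real mock driver output contains more fields, which json.Unmarshal simply ignores), and the error code is declared as a plain int32 instead of codes.Code to keep the sketch dependency-free.

package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// Same shape as the test's mockCSICall, with the error code as a plain int32
// so the sketch needs no gRPC dependency.
type mockCSICall struct {
	Method  string
	Request struct {
		VolumeContext map[string]string `json:"volume_context"`
	}
	FullError struct {
		Code    int32  `json:"code"`
		Message string `json:"message"`
	}
}

func main() {
	// Synthetic log line in the "gRPCCall:"-prefixed JSON format parseMockLogs expects.
	line := `gRPCCall: {"Method":"/csi.v1.Node/NodeStageVolume","FullError":{"code":3,"message":"boom"}}`

	if !strings.HasPrefix(line, "gRPCCall:") {
		return
	}
	line = strings.TrimPrefix(line, "gRPCCall:")

	var call mockCSICall
	if err := json.Unmarshal([]byte(line), &call); err != nil {
		fmt.Println("could not parse line:", err)
		return
	}

	// Trim the gRPC service name, "/csi.v1.Node/NodeStageVolume" -> "NodeStageVolume".
	parts := strings.Split(call.Method, "/")
	call.Method = parts[len(parts)-1]

	fmt.Println(call.Method, call.FullError.Code) // NodeStageVolume 3 (codes.InvalidArgument)
}
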

// compareCSICalls compares expectedCalls with the logs of the mock driver.
// It returns the index of the first expectedCall that was *not* received
// yet, or an error when the calls do not match.
// All repeated calls to the CSI mock driver (e.g. due to exponential backoff)
// are squashed and checked against a single expectedCallSequence item.
//
// Only permanent errors are returned. Other errors are logged and no
// calls are returned. The caller is expected to retry.
func compareCSICalls(trackedCalls []string, expectedCallSequence []csiCall, cs clientset.Interface, namespace, driverPodName, driverContainerName string) (int, error) {
allCalls, err := parseMockLogs(cs, namespace, driverPodName, driverContainerName)
if err != nil {
framework.Logf("intermittent (?) log retrieval error, proceeding without output: %v", err)
return 0, nil
}

// Remove all repeated and ignored calls
tracked := sets.NewString(trackedCalls...)
var calls []mockCSICall
var last mockCSICall
for _, c := range allCalls {
if !tracked.Has(c.Method) {
continue
}
if c.Method != last.Method || c.FullError.Code != last.FullError.Code {
last = c
calls = append(calls, c)
}
// This call is the same as the last one, ignore it.
}

for i, c := range calls {
if i >= len(expectedCallSequence) {
// Log all unexpected calls first; the error is returned below, outside the loop.
framework.Logf("Unexpected CSI driver call: %s (%d)", c.Method, c.FullError)
continue
}

// Compare the current call with the expected call
expectedCall := expectedCallSequence[i]
if c.Method != expectedCall.expectedMethod || c.FullError.Code != expectedCall.expectedError {
return i, fmt.Errorf("Unexpected CSI call %d: expected %s (%d), got %s (%d)", i, expectedCall.expectedMethod, expectedCall.expectedError, c.Method, c.FullError.Code)
}
}
if len(calls) > len(expectedCallSequence) {
return len(expectedCallSequence), fmt.Errorf("Received %d unexpected CSI driver calls", len(calls)-len(expectedCallSequence))
}
// All calls were correct
return len(calls), nil
}
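
To make the squashing rule above concrete, here is a small standalone sketch (synthetic types and data, not part of the test itself) that collapses consecutive calls with the same method and error code the way compareCSICalls does before matching them against the expected sequence: three backoff retries of NodeStageVolume with DeadlineExceeded shrink to a single entry.

package main

import "fmt"

// call is a stripped-down stand-in for mockCSICall: just a method name and
// a numeric gRPC error code (0 = OK, 4 = DeadlineExceeded).
type call struct {
	method string
	code   int32
}

// squash collapses consecutive calls that repeat the same method and code,
// mirroring the de-duplication loop in compareCSICalls.
func squash(in []call) []call {
	var out []call
	var last call
	for _, c := range in {
		if c.method != last.method || c.code != last.code {
			last = c
			out = append(out, c)
		}
	}
	return out
}

func main() {
	observed := []call{
		{"NodeStageVolume", 4}, // kubelet retries with exponential backoff...
		{"NodeStageVolume", 4},
		{"NodeStageVolume", 4},
		{"NodeStageVolume", 0}, // ...until the hook finally returns OK
		{"NodeUnstageVolume", 0},
	}
	for i, c := range squash(observed) {
		fmt.Printf("%d: %s (%d)\n", i, c.method, c.code)
	}
	// Three entries remain, matching an expectedCalls sequence of length 3.
}
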

func waitForCSIDriver(cs clientset.Interface, driverName string) error {
timeout := 4 * time.Minute

@@ -35,6 +35,7 @@ go_library(
"//test/e2e/storage/vsphere:go_default_library",
"//test/utils/image:go_default_library",
"//vendor/github.com/onsi/ginkgo:go_default_library",
"//vendor/gopkg.in/yaml.v2:go_default_library",
],
)

@@ -41,6 +41,8 @@ import (
"strconv"
"time"

"gopkg.in/yaml.v2"

"github.com/onsi/ginkgo"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"

@@ -213,6 +215,7 @@ type mockCSIDriver struct {
attachable bool
attachLimit int
enableNodeExpansion bool
javascriptHooks map[string]string
}

// CSIMockDriverOpts defines options used for the CSI driver

@@ -223,6 +226,7 @@ type CSIMockDriverOpts struct {
AttachLimit int
EnableResizing bool
EnableNodeExpansion bool
JavascriptHooks map[string]string
}

var _ testsuites.TestDriver = &mockCSIDriver{}

@@ -271,6 +275,7 @@ func InitMockCSIDriver(driverOpts CSIMockDriverOpts) testsuites.TestDriver {
attachable: !driverOpts.DisableAttach,
attachLimit: driverOpts.AttachLimit,
enableNodeExpansion: driverOpts.EnableNodeExpansion,
javascriptHooks: driverOpts.JavascriptHooks,
}
}

@@ -318,6 +323,26 @@ func (m *mockCSIDriver) PrepareTest(f *framework.Framework) (*testsuites.PerTest
containerArgs = append(containerArgs, "--node-expand-required=true")
}

// Create a config map with the JavaScript hooks. Create it even when javascriptHooks
// is empty, so we can unconditionally add it to the mock pod.
const hooksConfigMapName = "mock-driver-hooks"
hooksYaml, err := yaml.Marshal(m.javascriptHooks)
framework.ExpectNoError(err)
hooks := &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: hooksConfigMapName,
},
Data: map[string]string{
"hooks.yaml": string(hooksYaml),
},
}
_, err = f.ClientSet.CoreV1().ConfigMaps(f.Namespace.Name).Create(context.TODO(), hooks, metav1.CreateOptions{})
framework.ExpectNoError(err)

if len(m.javascriptHooks) > 0 {
containerArgs = append(containerArgs, "--hooks-file=/etc/hooks/hooks.yaml")
}

o := utils.PatchCSIOptions{
OldDriverName: "csi-mock",
NewDriverName: "csi-mock-" + f.UniqueName,

@@ -342,6 +367,10 @@ func (m *mockCSIDriver) PrepareTest(f *framework.Framework) (*testsuites.PerTest

return config, func() {
ginkgo.By("uninstalling csi mock driver")
err := f.ClientSet.CoreV1().ConfigMaps(f.Namespace.Name).Delete(context.TODO(), hooksConfigMapName, metav1.DeleteOptions{})
if err != nil {
framework.Logf("deleting failed: %s", err)
}
cleanup()
cancelLogging()
}

@@ -65,6 +65,8 @@ spec:
- mountPath: /var/lib/kubelet/pods
mountPropagation: Bidirectional
name: mountpoint-dir
- name: hooks
mountPath: /etc/hooks
volumes:
- hostPath:
path: /var/lib/kubelet/plugins/csi-mock

@@ -78,3 +80,6 @@ spec:
path: /var/lib/kubelet/plugins_registry
type: Directory
name: registration-dir
- name: hooks
configMap:
name: mock-driver-hooks
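
The hooks end up in the driver pod as a plain YAML file: PrepareTest marshals the javascriptHooks map and stores it under the "hooks.yaml" key of the mock-driver-hooks ConfigMap, which the manifest above mounts at /etc/hooks. Below is a minimal sketch of that step, reusing the hook keys from the NodeStage tests; the script values shown are illustrative only.

package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

func main() {
	// Hook names used by the NodeStage error tests; the values are JavaScript
	// snippets the mock driver evaluates at the corresponding call sites.
	scripts := map[string]string{
		"globals":                `counter=0; console.log("globals loaded", OK, INVALIDARGUMENT)`,
		"nodeStageVolumeStart":   `INVALIDARGUMENT;`,
		"nodeUnstageVolumeStart": ``,
	}

	// This is essentially what PrepareTest stores under "hooks.yaml" in the
	// mock-driver-hooks ConfigMap mounted at /etc/hooks.
	out, err := yaml.Marshal(scripts)
	if err != nil {
		panic(err)
	}
	fmt.Print(string(out))
}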