diff --git a/test/e2e/storage/csi_mock_volume.go b/test/e2e/storage/csi_mock_volume.go index 36f673f6fd4..7f38af408ec 100644 --- a/test/e2e/storage/csi_mock_volume.go +++ b/test/e2e/storage/csi_mock_volume.go @@ -19,15 +19,16 @@ package storage import ( "context" "crypto/sha256" - "encoding/json" "errors" "fmt" "math/rand" "strconv" "strings" + "sync/atomic" "time" "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" v1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" storagev1alpha1 "k8s.io/api/storage/v1alpha1" @@ -75,13 +76,6 @@ const ( // How log to wait for kubelet to unstage a volume after a pod is deleted csiUnstageWaitTimeout = 1 * time.Minute - - // Name of CSI driver pod name (it's in a StatefulSet with a stable name) - driverPodName = "csi-mockplugin-0" - // Name of CSI driver container name - driverContainerName = "mock" - // Prefix of the mock driver grpc log - grpcCallPrefix = "gRPCCall:" ) // csiCall represents an expected call from Kubernetes to CSI mock driver and @@ -113,7 +107,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { // just disable resizing on driver it overrides enableResizing flag for CSI mock driver disableResizingOnDriver bool enableSnapshot bool - javascriptHooks map[string]string + hooks *drivers.Hooks tokenRequests []storagev1.TokenRequest requiresRepublish *bool fsGroupPolicy *storagev1.FSGroupPolicy @@ -127,7 +121,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { pvcs []*v1.PersistentVolumeClaim sc map[string]*storagev1.StorageClass vsc map[string]*unstructured.Unstructured - driver storageframework.TestDriver + driver drivers.MockCSITestDriver provisioner string tp testParameters } @@ -155,12 +149,29 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { EnableResizing: tp.enableResizing, EnableNodeExpansion: tp.enableNodeExpansion, EnableSnapshot: tp.enableSnapshot, - JavascriptHooks: tp.javascriptHooks, TokenRequests: tp.tokenRequests, RequiresRepublish: tp.requiresRepublish, FSGroupPolicy: tp.fsGroupPolicy, } + // At the moment, only tests which need hooks are + // using the embedded CSI mock driver. The rest run + // the driver inside the cluster although they could + // changed to use embedding merely by setting + // driverOpts.embedded to true. + // + // Not enabling it for all tests minimizes + // the risk that the introduction of embedded breaks + // some existings tests and avoids a dependency + // on port forwarding, which is important if some of + // these tests are supposed to become part of + // conformance testing (port forwarding isn't + // currently required). + if tp.hooks != nil { + driverOpts.Embedded = true + driverOpts.Hooks = *tp.hooks + } + // this just disable resizing on driver, keeping resizing on SC enabled. if tp.disableResizingOnDriver { driverOpts.EnableResizing = false @@ -188,10 +199,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { createPod := func(ephemeral bool) (class *storagev1.StorageClass, claim *v1.PersistentVolumeClaim, pod *v1.Pod) { ginkgo.By("Creating pod") - var sc *storagev1.StorageClass - if dDriver, ok := m.driver.(storageframework.DynamicPVTestDriver); ok { - sc = dDriver.GetDynamicProvisionStorageClass(m.config, "") - } + sc := m.driver.GetDynamicProvisionStorageClass(m.config, "") scTest := testsuites.StorageClassTest{ Name: m.driver.GetDriverInfo().Name, Timeouts: f.Timeouts, @@ -237,10 +245,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { createPodWithFSGroup := func(fsGroup *int64) (*storagev1.StorageClass, *v1.PersistentVolumeClaim, *v1.Pod) { ginkgo.By("Creating pod with fsGroup") nodeSelection := m.config.ClientNodeSelection - var sc *storagev1.StorageClass - if dDriver, ok := m.driver.(storageframework.DynamicPVTestDriver); ok { - sc = dDriver.GetDynamicProvisionStorageClass(m.config, "") - } + sc := m.driver.GetDynamicProvisionStorageClass(m.config, "") scTest := testsuites.StorageClassTest{ Name: m.driver.GetDriverInfo().Name, Provisioner: sc.Provisioner, @@ -514,7 +519,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { framework.ExpectNoError(err, "while deleting") ginkgo.By("Checking CSI driver logs") - err = checkPodLogs(m.cs, m.config.DriverNamespace.Name, driverPodName, driverContainerName, pod, test.expectPodInfo, test.expectEphemeral, csiInlineVolumesEnabled, false, 1) + err = checkPodLogs(m.driver.GetCalls, pod, test.expectPodInfo, test.expectEphemeral, csiInlineVolumesEnabled, false, 1) framework.ExpectNoError(err) }) } @@ -727,19 +732,19 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { }) ginkgo.Context("CSI NodeStage error cases [Slow]", func() { - // Global variable in all scripts (called before each test) - globalScript := `counter=0; console.log("globals loaded", OK, INVALIDARGUMENT)` trackedCalls := []string{ "NodeStageVolume", "NodeUnstageVolume", } tests := []struct { - name string - expectPodRunning bool - expectedCalls []csiCall - nodeStageScript string - nodeUnstageScript string + name string + expectPodRunning bool + expectedCalls []csiCall + + // Called for each NodeStateVolume calls, with counter incremented atomically before + // the invocation (i.e. first value will be 1). + nodeStageHook func(counter int64) error }{ { // This is already tested elsewhere, adding simple good case here to test the test framework. @@ -749,7 +754,6 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { {expectedMethod: "NodeStageVolume", expectedError: codes.OK, deletePod: true}, {expectedMethod: "NodeUnstageVolume", expectedError: codes.OK}, }, - nodeStageScript: `OK;`, }, { // Kubelet should repeat NodeStage as long as the pod exists @@ -762,7 +766,12 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { {expectedMethod: "NodeUnstageVolume", expectedError: codes.OK}, }, // Fail first 3 NodeStage requests, 4th succeeds - nodeStageScript: `console.log("Counter:", ++counter); if (counter < 4) { INVALIDARGUMENT; } else { OK; }`, + nodeStageHook: func(counter int64) error { + if counter < 4 { + return status.Error(codes.InvalidArgument, "fake error") + } + return nil + }, }, { // Kubelet should repeat NodeStage as long as the pod exists @@ -775,7 +784,12 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { {expectedMethod: "NodeUnstageVolume", expectedError: codes.OK}, }, // Fail first 3 NodeStage requests, 4th succeeds - nodeStageScript: `console.log("Counter:", ++counter); if (counter < 4) { DEADLINEEXCEEDED; } else { OK; }`, + nodeStageHook: func(counter int64) error { + if counter < 4 { + return status.Error(codes.DeadlineExceeded, "fake error") + } + return nil + }, }, { // After NodeUnstage with ephemeral error, the driver may continue staging the volume. @@ -789,7 +803,9 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { {expectedMethod: "NodeStageVolume", expectedError: codes.DeadlineExceeded, deletePod: true}, {expectedMethod: "NodeUnstageVolume", expectedError: codes.OK}, }, - nodeStageScript: `DEADLINEEXCEEDED;`, + nodeStageHook: func(counter int64) error { + return status.Error(codes.DeadlineExceeded, "fake error") + }, }, { // After NodeUnstage with final error, kubelet can be sure the volume is not staged. @@ -801,21 +817,23 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { // This matches all repeated NodeStage calls with InvalidArgument error (due to exp. backoff). {expectedMethod: "NodeStageVolume", expectedError: codes.InvalidArgument, deletePod: true}, }, - nodeStageScript: `INVALIDARGUMENT;`, + // nodeStageScript: `INVALIDARGUMENT;`, + nodeStageHook: func(counter int64) error { + return status.Error(codes.InvalidArgument, "fake error") + }, }, } for _, t := range tests { test := t ginkgo.It(test.name, func() { - scripts := map[string]string{ - "globals": globalScript, - "nodeStageVolumeStart": test.nodeStageScript, - "nodeUnstageVolumeStart": test.nodeUnstageScript, + var hooks *drivers.Hooks + if test.nodeStageHook != nil { + hooks = createPreHook("NodeStageVolume", test.nodeStageHook) } init(testParameters{ - disableAttach: true, - registerDriver: true, - javascriptHooks: scripts, + disableAttach: true, + registerDriver: true, + hooks: hooks, }) defer cleanup() @@ -836,7 +854,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { framework.Failf("timed out waiting for the CSI call that indicates that the pod can be deleted: %v", test.expectedCalls) } time.Sleep(1 * time.Second) - _, index, err := compareCSICalls(trackedCalls, test.expectedCalls, m.cs, m.config.DriverNamespace.Name, driverPodName, driverContainerName) + _, index, err := compareCSICalls(trackedCalls, test.expectedCalls, m.driver.GetCalls) framework.ExpectNoError(err, "while waiting for initial CSI calls") if index == 0 { // No CSI call received yet @@ -860,7 +878,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { ginkgo.By("Waiting for all remaining expected CSI calls") err = wait.Poll(time.Second, csiUnstageWaitTimeout, func() (done bool, err error) { - _, index, err := compareCSICalls(trackedCalls, test.expectedCalls, m.cs, m.config.DriverNamespace.Name, driverPodName, driverContainerName) + _, index, err := compareCSICalls(trackedCalls, test.expectedCalls, m.driver.GetCalls) if err != nil { return true, err } @@ -946,11 +964,12 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { } if test.resourceExhausted { - params.javascriptHooks = map[string]string{ - "globals": `counter=0; console.log("globals loaded", OK, INVALIDARGUMENT)`, - // Every second call returns RESOURCEEXHAUSTED, starting with the first one. - "createVolumeStart": `console.log("Counter:", ++counter); if (counter % 2) { RESOURCEEXHAUSTED; } else { OK; }`, - } + params.hooks = createPreHook("CreateVolume", func(counter int64) error { + if counter%2 != 0 { + return status.Error(codes.ResourceExhausted, "fake error") + } + return nil + }) } init(params) @@ -1006,9 +1025,9 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { expected = append(expected, normal...) } - var calls []mockCSICall + var calls []drivers.MockCSICall err = wait.PollImmediateUntil(time.Second, func() (done bool, err error) { - c, index, err := compareCSICalls(deterministicCalls, expected, m.cs, m.config.DriverNamespace.Name, driverPodName, driverContainerName) + c, index, err := compareCSICalls(deterministicCalls, expected, m.driver.GetCalls) if err != nil { return true, fmt.Errorf("error waiting for expected CSI calls: %s", err) } @@ -1221,31 +1240,32 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { }) ginkgo.Context("CSI Volume Snapshots [Feature:VolumeSnapshotDataSource]", func() { - // Global variable in all scripts (called before each test) - globalScript := `counter=0; console.log("globals loaded", OK, DEADLINEEXCEEDED)` tests := []struct { - name string - createVolumeScript string - createSnapshotScript string + name string + createSnapshotHook func(counter int64) error }{ { - name: "volumesnapshotcontent and pvc in Bound state with deletion timestamp set should not get deleted while snapshot finalizer exists", - createVolumeScript: `OK`, - createSnapshotScript: `console.log("Counter:", ++counter); if (counter < 8) { DEADLINEEXCEEDED; } else { OK; }`, + name: "volumesnapshotcontent and pvc in Bound state with deletion timestamp set should not get deleted while snapshot finalizer exists", + createSnapshotHook: func(counter int64) error { + if counter < 8 { + return status.Error(codes.DeadlineExceeded, "fake error") + } + return nil + }, }, } for _, test := range tests { + test := test ginkgo.It(test.name, func() { - scripts := map[string]string{ - "globals": globalScript, - "createVolumeStart": test.createVolumeScript, - "createSnapshotStart": test.createSnapshotScript, + var hooks *drivers.Hooks + if test.createSnapshotHook != nil { + hooks = createPreHook("CreateSnapshot", test.createSnapshotHook) } init(testParameters{ - disableAttach: true, - registerDriver: true, - enableSnapshot: true, - javascriptHooks: scripts, + disableAttach: true, + registerDriver: true, + enableSnapshot: true, + hooks: hooks, }) sDriver, ok := m.driver.(storageframework.SnapshottableTestDriver) if !ok { @@ -1256,10 +1276,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { defer cancel() defer cleanup() - var sc *storagev1.StorageClass - if dDriver, ok := m.driver.(storageframework.DynamicPVTestDriver); ok { - sc = dDriver.GetDynamicProvisionStorageClass(m.config, "") - } + sc := m.driver.GetDynamicProvisionStorageClass(m.config, "") ginkgo.By("Creating storage class") class, err := m.cs.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{}) framework.ExpectNoError(err, "Failed to create class: %v", err) @@ -1402,7 +1419,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { framework.ExpectNoError(err, "while deleting") ginkgo.By("Checking CSI driver logs") - err = checkPodLogs(m.cs, m.config.DriverNamespace.Name, driverPodName, driverContainerName, pod, false, false, false, test.deployCSIDriverObject && csiServiceAccountTokenEnabled, numNodePublishVolume) + err = checkPodLogs(m.driver.GetCalls, pod, false, false, false, test.deployCSIDriverObject && csiServiceAccountTokenEnabled, numNodePublishVolume) framework.ExpectNoError(err) }) } @@ -1507,33 +1524,30 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { annotations interface{} ) - // Global variable in all scripts (called before each test) - globalScript := `counter=0; console.log("globals loaded", OK, DEADLINEEXCEEDED)` tests := []struct { - name string - createVolumeScript string - createSnapshotScript string + name string + createSnapshotHook func(counter int64) error }{ { // volume snapshot should be created using secrets successfully even if there is a failure in the first few attempts, - name: "volume snapshot create/delete with secrets", - createVolumeScript: `OK`, + name: "volume snapshot create/delete with secrets", // Fail the first 8 calls to create snapshot and succeed the 9th call. - createSnapshotScript: `console.log("Counter:", ++counter); if (counter < 8) { DEADLINEEXCEEDED; } else { OK; }`, + createSnapshotHook: func(counter int64) error { + if counter < 8 { + return status.Error(codes.DeadlineExceeded, "fake error") + } + return nil + }, }, } for _, test := range tests { ginkgo.It(test.name, func() { - scripts := map[string]string{ - "globals": globalScript, - "createVolumeStart": test.createVolumeScript, - "createSnapshotStart": test.createSnapshotScript, - } + hooks := createPreHook("CreateSnapshot", test.createSnapshotHook) init(testParameters{ - disableAttach: true, - registerDriver: true, - enableSnapshot: true, - javascriptHooks: scripts, + disableAttach: true, + registerDriver: true, + enableSnapshot: true, + hooks: hooks, }) sDriver, ok := m.driver.(storageframework.SnapshottableTestDriver) @@ -1895,24 +1909,9 @@ func startBusyBoxPodWithVolumeSource(cs clientset.Interface, volumeSource v1.Vol return cs.CoreV1().Pods(ns).Create(context.TODO(), pod, metav1.CreateOptions{}) } -// Dummy structure that parses just volume_attributes and error code out of logged CSI call -type mockCSICall struct { - json string // full log entry - - Method string - Request struct { - VolumeContext map[string]string `json:"volume_context"` - } - FullError struct { - Code codes.Code `json:"code"` - Message string `json:"message"` - } - Error string -} - // checkPodLogs tests that NodePublish was called with expected volume_context and (for ephemeral inline volumes) // has the matching NodeUnpublish -func checkPodLogs(cs clientset.Interface, namespace, driverPodName, driverContainerName string, pod *v1.Pod, expectPodInfo, ephemeralVolume, csiInlineVolumesEnabled, csiServiceAccountTokenEnabled bool, expectedNumNodePublish int) error { +func checkPodLogs(getCalls func() ([]drivers.MockCSICall, error), pod *v1.Pod, expectPodInfo, ephemeralVolume, csiInlineVolumesEnabled, csiServiceAccountTokenEnabled bool, expectedNumNodePublish int) error { expectedAttributes := map[string]string{} if expectPodInfo { expectedAttributes["csi.storage.k8s.io/pod.name"] = pod.Name @@ -1934,10 +1933,11 @@ func checkPodLogs(cs clientset.Interface, namespace, driverPodName, driverContai foundAttributes := sets.NewString() numNodePublishVolume := 0 numNodeUnpublishVolume := 0 - calls, err := parseMockLogs(cs, namespace, driverPodName, driverContainerName) + calls, err := getCalls() if err != nil { return err } + for _, call := range calls { switch call.Method { case "NodePublishVolume": @@ -1970,39 +1970,6 @@ func checkPodLogs(cs clientset.Interface, namespace, driverPodName, driverContai return nil } -func parseMockLogs(cs clientset.Interface, namespace, driverPodName, driverContainerName string) ([]mockCSICall, error) { - // Load logs of driver pod - log, err := e2epod.GetPodLogs(cs, namespace, driverPodName, driverContainerName) - if err != nil { - return nil, fmt.Errorf("could not load CSI driver logs: %s", err) - } - - logLines := strings.Split(log, "\n") - var calls []mockCSICall - for _, line := range logLines { - index := strings.Index(line, grpcCallPrefix) - if index == -1 { - continue - } - line = line[index+len(grpcCallPrefix):] - call := mockCSICall{ - json: string(line), - } - err := json.Unmarshal([]byte(line), &call) - if err != nil { - framework.Logf("Could not parse CSI driver log line %q: %s", line, err) - continue - } - - // Trim gRPC service name, i.e. "/csi.v1.Identity/Probe" -> "Probe" - methodParts := strings.Split(call.Method, "/") - call.Method = methodParts[len(methodParts)-1] - - calls = append(calls, call) - } - return calls, nil -} - // compareCSICalls compares expectedCalls with logs of the mock driver. // It returns index of the first expectedCall that was *not* received // yet or error when calls do not match. @@ -2011,8 +1978,8 @@ func parseMockLogs(cs clientset.Interface, namespace, driverPodName, driverConta // // Only permanent errors are returned. Other errors are logged and no // calls are returned. The caller is expected to retry. -func compareCSICalls(trackedCalls []string, expectedCallSequence []csiCall, cs clientset.Interface, namespace, driverPodName, driverContainerName string) ([]mockCSICall, int, error) { - allCalls, err := parseMockLogs(cs, namespace, driverPodName, driverContainerName) +func compareCSICalls(trackedCalls []string, expectedCallSequence []csiCall, getCalls func() ([]drivers.MockCSICall, error)) ([]drivers.MockCSICall, int, error) { + allCalls, err := getCalls() if err != nil { framework.Logf("intermittent (?) log retrieval error, proceeding without output: %v", err) return nil, 0, nil @@ -2020,8 +1987,8 @@ func compareCSICalls(trackedCalls []string, expectedCallSequence []csiCall, cs c // Remove all repeated and ignored calls tracked := sets.NewString(trackedCalls...) - var calls []mockCSICall - var last mockCSICall + var calls []drivers.MockCSICall + var last drivers.MockCSICall for _, c := range allCalls { if !tracked.Has(c.Method) { continue @@ -2145,3 +2112,20 @@ func checkDeleteSnapshotSecrets(cs clientset.Interface, annotations interface{}) return err } + +// createPreHook counts invocations of a certain method (identified by a substring in the full gRPC method name). +func createPreHook(method string, callback func(counter int64) error) *drivers.Hooks { + var counter int64 + + return &drivers.Hooks{ + Pre: func() func(ctx context.Context, fullMethod string, request interface{}) (reply interface{}, err error) { + return func(ctx context.Context, fullMethod string, request interface{}) (reply interface{}, err error) { + if strings.Contains(fullMethod, method) { + counter := atomic.AddInt64(&counter, 1) + return nil, callback(counter) + } + return nil, nil + } + }(), + } +} diff --git a/test/e2e/storage/drivers/csi-test/driver/driver-controller.go b/test/e2e/storage/drivers/csi-test/driver/driver-controller.go deleted file mode 100644 index 1d8d2bd771e..00000000000 --- a/test/e2e/storage/drivers/csi-test/driver/driver-controller.go +++ /dev/null @@ -1,110 +0,0 @@ -/* -Copyright 2019 Kubernetes Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package driver - -import ( - "context" - "net" - "sync" - - "google.golang.org/grpc/reflection" - - csi "github.com/container-storage-interface/spec/lib/go/csi" - "google.golang.org/grpc" -) - -// CSIDriverControllerServer is the Controller service component of the driver. -type CSIDriverControllerServer struct { - Controller csi.ControllerServer - Identity csi.IdentityServer -} - -// CSIDriverController is the CSI Driver Controller backend. -type CSIDriverController struct { - listener net.Listener - server *grpc.Server - controllerServer *CSIDriverControllerServer - wg sync.WaitGroup - running bool - lock sync.Mutex - creds *CSICreds -} - -func NewCSIDriverController(controllerServer *CSIDriverControllerServer) *CSIDriverController { - return &CSIDriverController{ - controllerServer: controllerServer, - } -} - -func (c *CSIDriverController) goServe(started chan<- bool) { - goServe(c.server, &c.wg, c.listener, started) -} - -func (c *CSIDriverController) Address() string { - return c.listener.Addr().String() -} - -func (c *CSIDriverController) Start(l net.Listener) error { - c.lock.Lock() - defer c.lock.Unlock() - - // Set listener. - c.listener = l - - // Create a new grpc server. - c.server = grpc.NewServer( - grpc.UnaryInterceptor(c.callInterceptor), - ) - - if c.controllerServer.Controller != nil { - csi.RegisterControllerServer(c.server, c.controllerServer.Controller) - } - if c.controllerServer.Identity != nil { - csi.RegisterIdentityServer(c.server, c.controllerServer.Identity) - } - - reflection.Register(c.server) - - waitForServer := make(chan bool) - c.goServe(waitForServer) - <-waitForServer - c.running = true - return nil -} - -func (c *CSIDriverController) Stop() { - stop(&c.lock, &c.wg, c.server, c.running) -} - -func (c *CSIDriverController) Close() { - c.server.Stop() -} - -func (c *CSIDriverController) IsRunning() bool { - c.lock.Lock() - defer c.lock.Unlock() - - return c.running -} - -func (c *CSIDriverController) SetDefaultCreds() { - setDefaultCreds(c.creds) -} - -func (c *CSIDriverController) callInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - return callInterceptor(ctx, c.creds, req, info, handler) -} diff --git a/test/e2e/storage/drivers/csi-test/driver/driver-node.go b/test/e2e/storage/drivers/csi-test/driver/driver-node.go deleted file mode 100644 index 7720bfc493a..00000000000 --- a/test/e2e/storage/drivers/csi-test/driver/driver-node.go +++ /dev/null @@ -1,109 +0,0 @@ -/* -Copyright 2019 Kubernetes Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package driver - -import ( - context "context" - "net" - "sync" - - csi "github.com/container-storage-interface/spec/lib/go/csi" - "google.golang.org/grpc" - "google.golang.org/grpc/reflection" -) - -// CSIDriverNodeServer is the Node service component of the driver. -type CSIDriverNodeServer struct { - Node csi.NodeServer - Identity csi.IdentityServer -} - -// CSIDriverNode is the CSI Driver Node backend. -type CSIDriverNode struct { - listener net.Listener - server *grpc.Server - nodeServer *CSIDriverNodeServer - wg sync.WaitGroup - running bool - lock sync.Mutex - creds *CSICreds -} - -func NewCSIDriverNode(nodeServer *CSIDriverNodeServer) *CSIDriverNode { - return &CSIDriverNode{ - nodeServer: nodeServer, - } -} - -func (c *CSIDriverNode) goServe(started chan<- bool) { - goServe(c.server, &c.wg, c.listener, started) -} - -func (c *CSIDriverNode) Address() string { - return c.listener.Addr().String() -} - -func (c *CSIDriverNode) Start(l net.Listener) error { - c.lock.Lock() - defer c.lock.Unlock() - - // Set listener. - c.listener = l - - // Create a new grpc server. - c.server = grpc.NewServer( - grpc.UnaryInterceptor(c.callInterceptor), - ) - - if c.nodeServer.Node != nil { - csi.RegisterNodeServer(c.server, c.nodeServer.Node) - } - if c.nodeServer.Identity != nil { - csi.RegisterIdentityServer(c.server, c.nodeServer.Identity) - } - - reflection.Register(c.server) - - waitForServer := make(chan bool) - c.goServe(waitForServer) - <-waitForServer - c.running = true - return nil -} - -func (c *CSIDriverNode) Stop() { - stop(&c.lock, &c.wg, c.server, c.running) -} - -func (c *CSIDriverNode) Close() { - c.server.Stop() -} - -func (c *CSIDriverNode) IsRunning() bool { - c.lock.Lock() - defer c.lock.Unlock() - - return c.running -} - -func (c *CSIDriverNode) SetDefaultCreds() { - setDefaultCreds(c.creds) -} - -func (c *CSIDriverNode) callInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - return callInterceptor(ctx, c.creds, req, info, handler) -} diff --git a/test/e2e/storage/drivers/csi-test/driver/driver.go b/test/e2e/storage/drivers/csi-test/driver/driver.go index 0a61ae7c48e..ceddd7174ec 100644 --- a/test/e2e/storage/drivers/csi-test/driver/driver.go +++ b/test/e2e/storage/drivers/csi-test/driver/driver.go @@ -27,11 +27,10 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "k8s.io/klog" + "k8s.io/klog/v2" "github.com/container-storage-interface/spec/lib/go/csi" "google.golang.org/grpc" - "google.golang.org/grpc/reflection" ) var ( @@ -75,8 +74,11 @@ type CSIDriver struct { running bool lock sync.Mutex creds *CSICreds + logGRPC LogGRPC } +type LogGRPC func(method string, request, reply interface{}, err error) + func NewCSIDriver(servers *CSIDriverServers) *CSIDriver { return &CSIDriver{ servers: servers, @@ -90,7 +92,12 @@ func (c *CSIDriver) goServe(started chan<- bool) { func (c *CSIDriver) Address() string { return c.listener.Addr().String() } -func (c *CSIDriver) Start(l net.Listener) error { + +// Start runs a gRPC server with all enabled services. If an interceptor +// is give, then it will be used. Otherwise, an interceptor which +// handles simple credential checks and logs gRPC calls in JSON format +// will be used. +func (c *CSIDriver) Start(l net.Listener, interceptor grpc.UnaryServerInterceptor) error { c.lock.Lock() defer c.lock.Unlock() @@ -98,9 +105,10 @@ func (c *CSIDriver) Start(l net.Listener) error { c.listener = l // Create a new grpc server - c.server = grpc.NewServer( - grpc.UnaryInterceptor(c.callInterceptor), - ) + if interceptor == nil { + interceptor = c.callInterceptor + } + c.server = grpc.NewServer(grpc.UnaryInterceptor(interceptor)) // Register Mock servers if c.servers.Controller != nil { @@ -112,7 +120,6 @@ func (c *CSIDriver) Start(l net.Listener) error { if c.servers.Node != nil { csi.RegisterNodeServer(c.server, c.servers.Node) } - reflection.Register(c.server) // Start listening for requests waitForServer := make(chan bool) @@ -142,10 +149,6 @@ func (c *CSIDriver) SetDefaultCreds() { setDefaultCreds(c.creds) } -func (c *CSIDriver) callInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - return callInterceptor(ctx, c.creds, req, info, handler) -} - // goServe starts a grpc server. func goServe(server *grpc.Server, wg *sync.WaitGroup, listener net.Listener, started chan<- bool) { wg.Add(1) @@ -187,14 +190,17 @@ func setDefaultCreds(creds *CSICreds) { } } -func callInterceptor(ctx context.Context, creds *CSICreds, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - err := authInterceptor(creds, req) +func (c *CSIDriver) callInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + err := authInterceptor(c.creds, req) if err != nil { logGRPC(info.FullMethod, req, nil, err) return nil, err } rsp, err := handler(ctx, req) logGRPC(info.FullMethod, req, rsp, err) + if c.logGRPC != nil { + c.logGRPC(info.FullMethod, req, rsp, err) + } return rsp, err } diff --git a/test/e2e/storage/drivers/csi-test/driver/mock.go b/test/e2e/storage/drivers/csi-test/driver/mock.go index 6b8a08c26f7..c6560f99ed3 100644 --- a/test/e2e/storage/drivers/csi-test/driver/mock.go +++ b/test/e2e/storage/drivers/csi-test/driver/mock.go @@ -19,7 +19,6 @@ package driver import ( "net" - "github.com/kubernetes-csi/csi-test/v4/utils" "google.golang.org/grpc" ) @@ -31,10 +30,11 @@ type MockCSIDriverServers struct { type MockCSIDriver struct { CSIDriver - conn *grpc.ClientConn + conn *grpc.ClientConn + interceptor grpc.UnaryServerInterceptor } -func NewMockCSIDriver(servers *MockCSIDriverServers) *MockCSIDriver { +func NewMockCSIDriver(servers *MockCSIDriverServers, interceptor grpc.UnaryServerInterceptor) *MockCSIDriver { return &MockCSIDriver{ CSIDriver: CSIDriver{ servers: &CSIDriverServers{ @@ -43,6 +43,7 @@ func NewMockCSIDriver(servers *MockCSIDriverServers) *MockCSIDriver { Identity: servers.Identity, }, }, + interceptor: interceptor, } } @@ -53,7 +54,7 @@ func (m *MockCSIDriver) StartOnAddress(network, address string) error { return err } - if err := m.CSIDriver.Start(l); err != nil { + if err := m.CSIDriver.Start(l, m.interceptor); err != nil { l.Close() return err } @@ -67,22 +68,6 @@ func (m *MockCSIDriver) Start() error { return m.StartOnAddress("tcp", "127.0.0.1:0") } -func (m *MockCSIDriver) Nexus() (*grpc.ClientConn, error) { - // Start server - err := m.Start() - if err != nil { - return nil, err - } - - // Create a client connection - m.conn, err = utils.Connect(m.Address(), grpc.WithInsecure()) - if err != nil { - return nil, err - } - - return m.conn, nil -} - func (m *MockCSIDriver) Close() { m.conn.Close() m.server.Stop() diff --git a/test/e2e/storage/drivers/csi-test/mock/service/controller.go b/test/e2e/storage/drivers/csi-test/mock/service/controller.go index fb38d8d02e9..7e8c1abad1f 100644 --- a/test/e2e/storage/drivers/csi-test/mock/service/controller.go +++ b/test/e2e/storage/drivers/csi-test/mock/service/controller.go @@ -46,9 +46,7 @@ func (s *service) CreateVolume( if req.VolumeCapabilities == nil { return nil, status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty") } - if hookVal, hookMsg := s.execHook("CreateVolumeStart"); hookVal != codes.OK { - return nil, status.Errorf(hookVal, hookMsg) - } + // Check to see if the volume already exists. if i, v := s.findVolByName(ctx, req.Name); i >= 0 { // Requested volume name already exists, need to check if the existing volume's @@ -610,10 +608,6 @@ func (s *service) CreateSnapshot(ctx context.Context, return nil, status.Error(codes.InvalidArgument, "Snapshot SourceVolumeId cannot be empty") } - if hookVal, hookMsg := s.execHook("CreateSnapshotStart"); hookVal != codes.OK { - return nil, status.Errorf(hookVal, hookMsg) - } - // Check to see if the snapshot already exists. if i, v := s.snapshots.FindSnapshot("name", req.GetName()); i >= 0 { // Requested snapshot name already exists diff --git a/test/e2e/storage/drivers/csi-test/mock/service/hooks-const.go b/test/e2e/storage/drivers/csi-test/mock/service/hooks-const.go deleted file mode 100644 index 46eed6af7ca..00000000000 --- a/test/e2e/storage/drivers/csi-test/mock/service/hooks-const.go +++ /dev/null @@ -1,24 +0,0 @@ -package service - -// Predefinded constants for the JavaScript hooks, they must correspond to the -// error codes used by gRPC, see: -// https://github.com/grpc/grpc-go/blob/master/codes/codes.go -const ( - grpcJSCodes string = `OK = 0; - CANCELED = 1; - UNKNOWN = 2; - INVALIDARGUMENT = 3; - DEADLINEEXCEEDED = 4; - NOTFOUND = 5; - ALREADYEXISTS = 6; - PERMISSIONDENIED = 7; - RESOURCEEXHAUSTED = 8; - FAILEDPRECONDITION = 9; - ABORTED = 10; - OUTOFRANGE = 11; - UNIMPLEMENTED = 12; - INTERNAL = 13; - UNAVAILABLE = 14; - DATALOSS = 15; - UNAUTHENTICATED = 16` -) diff --git a/test/e2e/storage/drivers/csi-test/mock/service/node.go b/test/e2e/storage/drivers/csi-test/mock/service/node.go index e79d2561a34..ddffea048af 100644 --- a/test/e2e/storage/drivers/csi-test/mock/service/node.go +++ b/test/e2e/storage/drivers/csi-test/mock/service/node.go @@ -18,7 +18,6 @@ package service import ( "fmt" - "os" "path" "strconv" @@ -35,10 +34,6 @@ func (s *service) NodeStageVolume( req *csi.NodeStageVolumeRequest) ( *csi.NodeStageVolumeResponse, error) { - if hookVal, hookMsg := s.execHook("NodeStageVolumeStart"); hookVal != codes.OK { - return nil, status.Errorf(hookVal, hookMsg) - } - device, ok := req.PublishContext["device"] if !ok { if s.config.DisableAttach { @@ -62,7 +57,7 @@ func (s *service) NodeStageVolume( return nil, status.Error(codes.InvalidArgument, "Volume Capability cannot be empty") } - exists, err := checkTargetExists(req.StagingTargetPath) + exists, err := s.config.IO.DirExists(req.StagingTargetPath) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -113,10 +108,6 @@ func (s *service) NodeUnstageVolume( return nil, status.Error(codes.InvalidArgument, "Staging Target Path cannot be empty") } - if hookVal, hookMsg := s.execHook("NodeUnstageVolumeStart"); hookVal != codes.OK { - return nil, status.Errorf(hookVal, hookMsg) - } - s.volsRWL.Lock() defer s.volsRWL.Unlock() @@ -178,7 +169,7 @@ func (s *service) NodePublishVolume( // May happen with old (or, at this time, even the current) Kubernetes // although it shouldn't (https://github.com/kubernetes/kubernetes/issues/75535). - exists, err := checkTargetExists(req.TargetPath) + exists, err := s.config.IO.DirExists(req.TargetPath) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -220,13 +211,13 @@ func (s *service) NodePublishVolume( } } else { if req.GetTargetPath() != "" { - exists, err := checkTargetExists(req.GetTargetPath()) + exists, err := s.config.IO.DirExists(req.GetTargetPath()) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } if !exists { // If target path does not exist we need to create the directory where volume will be staged - if err = os.Mkdir(req.TargetPath, os.FileMode(0755)); err != nil { + if err = s.config.IO.Mkdir(req.TargetPath); err != nil { msg := fmt.Sprintf("NodePublishVolume: could not create target dir %q: %v", req.TargetPath, err) return nil, status.Error(codes.Internal, msg) } @@ -281,7 +272,7 @@ func (s *service) NodeUnpublishVolume( } // Delete any created paths - err := os.RemoveAll(v.VolumeContext[nodeMntPathKey]) + err := s.config.IO.RemoveAll(v.VolumeContext[nodeMntPathKey]) if err != nil { return nil, status.Errorf(codes.Internal, "Unable to delete previously created target directory") } @@ -415,10 +406,6 @@ func (s *service) NodeGetInfo(ctx context.Context, func (s *service) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) { - if hookVal, hookMsg := s.execHook("NodeGetVolumeStatsStart"); hookVal != codes.OK { - return nil, status.Errorf(hookVal, hookMsg) - } - resp := &csi.NodeGetVolumeStatsResponse{ VolumeCondition: &csi.VolumeCondition{}, } @@ -461,16 +448,3 @@ func (s *service) NodeGetVolumeStats(ctx context.Context, return resp, nil } - -// checkTargetExists checks if a given path exists. -func checkTargetExists(targetPath string) (bool, error) { - _, err := os.Stat(targetPath) - switch { - case err == nil: - return true, nil - case os.IsNotExist(err): - return false, nil - default: - return false, err - } -} diff --git a/test/e2e/storage/drivers/csi-test/mock/service/service.go b/test/e2e/storage/drivers/csi-test/mock/service/service.go index 246bb374364..93edbf300ed 100644 --- a/test/e2e/storage/drivers/csi-test/mock/service/service.go +++ b/test/e2e/storage/drivers/csi-test/mock/service/service.go @@ -18,21 +18,17 @@ package service import ( "fmt" - "reflect" + "os" "strings" "sync" "sync/atomic" - "k8s.io/klog" - "github.com/container-storage-interface/spec/lib/go/csi" - "github.com/kubernetes-csi/csi-test/v4/mock/cache" "golang.org/x/net/context" "google.golang.org/grpc/codes" + "k8s.io/kubernetes/test/e2e/storage/drivers/csi-test/mock/cache" "github.com/golang/protobuf/ptypes" - - "github.com/robertkrimen/otto" ) const ( @@ -51,47 +47,7 @@ const ( // Manifest is the SP's manifest. var Manifest = map[string]string{ - "url": "https://github.com/kubernetes-csi/csi-test/mock", -} - -// JavaScript hooks to be run to perform various tests -type Hooks struct { - Globals string `yaml:"globals"` // will be executed once before all other scripts - CreateVolumeStart string `yaml:"createVolumeStart"` - CreateVolumeEnd string `yaml:"createVolumeEnd"` - DeleteVolumeStart string `yaml:"deleteVolumeStart"` - DeleteVolumeEnd string `yaml:"deleteVolumeEnd"` - ControllerPublishVolumeStart string `yaml:"controllerPublishVolumeStart"` - ControllerPublishVolumeEnd string `yaml:"controllerPublishVolumeEnd"` - ControllerUnpublishVolumeStart string `yaml:"controllerUnpublishVolumeStart"` - ControllerUnpublishVolumeEnd string `yaml:"controllerUnpublishVolumeEnd"` - ValidateVolumeCapabilities string `yaml:"validateVolumeCapabilities"` - ListVolumesStart string `yaml:"listVolumesStart"` - ListVolumesEnd string `yaml:"listVolumesEnd"` - GetCapacity string `yaml:"getCapacity"` - ControllerGetCapabilitiesStart string `yaml:"controllerGetCapabilitiesStart"` - ControllerGetCapabilitiesEnd string `yaml:"controllerGetCapabilitiesEnd"` - CreateSnapshotStart string `yaml:"createSnapshotStart"` - CreateSnapshotEnd string `yaml:"createSnapshotEnd"` - DeleteSnapshotStart string `yaml:"deleteSnapshotStart"` - DeleteSnapshotEnd string `yaml:"deleteSnapshotEnd"` - ListSnapshots string `yaml:"listSnapshots"` - ControllerExpandVolumeStart string `yaml:"controllerExpandVolumeStart"` - ControllerExpandVolumeEnd string `yaml:"controllerExpandVolumeEnd"` - NodeStageVolumeStart string `yaml:"nodeStageVolumeStart"` - NodeStageVolumeEnd string `yaml:"nodeStageVolumeEnd"` - NodeUnstageVolumeStart string `yaml:"nodeUnstageVolumeStart"` - NodeUnstageVolumeEnd string `yaml:"nodeUnstageVolumeEnd"` - NodePublishVolumeStart string `yaml:"nodePublishVolumeStart"` - NodePublishVolumeEnd string `yaml:"nodePublishVolumeEnd"` - NodeUnpublishVolumeStart string `yaml:"nodeUnpublishVolumeStart"` - NodeUnpublishVolumeEnd string `yaml:"nodeUnpublishVolumeEnd"` - NodeExpandVolumeStart string `yaml:"nodeExpandVolumeStart"` - NodeExpandVolumeEnd string `yaml:"nodeExpandVolumeEnd"` - NodeGetCapabilities string `yaml:"nodeGetCapabilities"` - NodeGetInfo string `yaml:"nodeGetInfo"` - NodeGetVolumeStatsStart string `yaml:"nodeGetVolumeStatsStart"` - NodeGetVolumeStatsEnd string `yaml:"nodeGetVolumeStatsEnd"` + "url": "https://k8s.io/kubernetes/test/e2e/storage/drivers/csi-test/mock", } type Config struct { @@ -103,7 +59,41 @@ type Config struct { DisableOnlineExpansion bool PermissiveTargetPath bool EnableTopology bool - ExecHooks *Hooks + IO DirIO +} + +// DirIO is an abstraction over direct os calls. +type DirIO interface { + // DirExists returns false if the path doesn't exist, true if it exists and is a directory, an error otherwise. + DirExists(path string) (bool, error) + // Mkdir creates the directory, but not its parents, with 0755 permissions. + Mkdir(path string) error + // RemoveAll removes the path and everything contained inside it. It's not an error if the path does not exist. + RemoveAll(path string) error +} + +type OSDirIO struct{} + +func (o OSDirIO) DirExists(path string) (bool, error) { + info, err := os.Stat(path) + switch { + case err == nil && !info.IsDir(): + return false, fmt.Errorf("%s: not a directory", path) + case err == nil: + return true, nil + case os.IsNotExist(err): + return false, nil + default: + return false, err + } +} + +func (o OSDirIO) Mkdir(path string) error { + return os.Mkdir(path, os.FileMode(0755)) +} + +func (o OSDirIO) RemoveAll(path string) error { + return os.RemoveAll(path) } // Service is the CSI Mock service provider. @@ -122,7 +112,6 @@ type service struct { snapshots cache.SnapshotCache snapshotsNID uint64 config Config - hooksVm *otto.Otto } type Volume struct { @@ -144,13 +133,8 @@ func New(config Config) Service { nodeID: config.DriverName, config: config, } - if config.ExecHooks != nil { - s.hooksVm = otto.New() - s.hooksVm.Run(grpcJSCodes) // set global variables with gRPC error codes - _, err := s.hooksVm.Run(s.config.ExecHooks.Globals) - if err != nil { - klog.Exitf("Error encountered in the global exec hook: %v. Exiting\n", err) - } + if s.config.IO == nil { + s.config.IO = OSDirIO{} } s.snapshots = cache.NewSnapshotCache() s.vols = []csi.Volume{ @@ -288,22 +272,5 @@ func (s *service) getAttachCount(devPathKey string) int64 { } func (s *service) execHook(hookName string) (codes.Code, string) { - if s.hooksVm != nil { - script := reflect.ValueOf(*s.config.ExecHooks).FieldByName(hookName).String() - if len(script) > 0 { - result, err := s.hooksVm.Run(script) - if err != nil { - klog.Exitf("Exec hook %s error: %v; exiting\n", hookName, err) - } - rv, err := result.ToInteger() - if err == nil { - // Function returned an integer, use it - return codes.Code(rv), fmt.Sprintf("Exec hook %s returned non-OK code", hookName) - } else { - // Function returned non-integer data type, discard it - return codes.OK, "" - } - } - } return codes.OK, "" } diff --git a/test/e2e/storage/drivers/csi.go b/test/e2e/storage/drivers/csi.go index f6c2c253611..17c61ababa1 100644 --- a/test/e2e/storage/drivers/csi.go +++ b/test/e2e/storage/drivers/csi.go @@ -37,13 +37,16 @@ package drivers import ( "context" + "encoding/json" + "errors" "fmt" "strconv" + "strings" + "sync" "time" - "gopkg.in/yaml.v2" - "github.com/onsi/ginkgo" + "google.golang.org/grpc/codes" v1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -51,14 +54,20 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/test/e2e/framework" e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper" e2evolume "k8s.io/kubernetes/test/e2e/framework/volume" + mockdriver "k8s.io/kubernetes/test/e2e/storage/drivers/csi-test/driver" + mockservice "k8s.io/kubernetes/test/e2e/storage/drivers/csi-test/mock/service" + "k8s.io/kubernetes/test/e2e/storage/drivers/proxy" storageframework "k8s.io/kubernetes/test/e2e/storage/framework" "k8s.io/kubernetes/test/e2e/storage/utils" + + "google.golang.org/grpc" ) const ( @@ -232,10 +241,42 @@ type mockCSIDriver struct { attachLimit int enableTopology bool enableNodeExpansion bool - javascriptHooks map[string]string + hooks Hooks tokenRequests []storagev1.TokenRequest requiresRepublish *bool fsGroupPolicy *storagev1.FSGroupPolicy + embedded bool + calls MockCSICalls + embeddedCSIDriver *mockdriver.CSIDriver + + // Additional values set during PrepareTest + clientSet kubernetes.Interface + driverNamespace *v1.Namespace +} + +// Hooks to be run to execute while handling gRPC calls. +// +// At the moment, only generic pre- and post-function call +// hooks are implemented. Those hooks can cast the request and +// response values if needed. More hooks inside specific +// functions could be added if needed. +type Hooks struct { + // Pre is called before invoking the mock driver's implementation of a method. + // If either a non-nil reply or error are returned, then those are returned to the caller. + Pre func(ctx context.Context, method string, request interface{}) (reply interface{}, err error) + + // Post is called after invoking the mock driver's implementation of a method. + // What it returns is used as actual result. + Post func(ctx context.Context, method string, request, reply interface{}, err error) (finalReply interface{}, finalErr error) +} + +// MockCSITestDriver provides additional functions specific to the CSI mock driver. +type MockCSITestDriver interface { + storageframework.DynamicPVTestDriver + + // GetCalls returns all currently observed gRPC calls. Only valid + // after PrepareTest. + GetCalls() ([]MockCSICall, error) } // CSIMockDriverOpts defines options used for csi driver @@ -249,10 +290,94 @@ type CSIMockDriverOpts struct { EnableResizing bool EnableNodeExpansion bool EnableSnapshot bool - JavascriptHooks map[string]string TokenRequests []storagev1.TokenRequest RequiresRepublish *bool FSGroupPolicy *storagev1.FSGroupPolicy + + // Embedded defines whether the CSI mock driver runs + // inside the cluster (false, the default) or just a proxy + // runs inside the cluster and all gRPC calls are handled + // inside the e2e.test binary. + Embedded bool + + // Hooks that will be called if (and only if!) the embedded + // mock driver is used. Beware that hooks are invoked + // asynchronously in different goroutines. + Hooks Hooks +} + +// Dummy structure that parses just volume_attributes and error code out of logged CSI call +type MockCSICall struct { + json string // full log entry + + Method string + Request struct { + VolumeContext map[string]string `json:"volume_context"` + } + FullError struct { + Code codes.Code `json:"code"` + Message string `json:"message"` + } + Error string +} + +// MockCSICalls is a Thread-safe storage for MockCSICall instances. +type MockCSICalls struct { + calls []MockCSICall + mutex sync.Mutex +} + +// Get returns all currently recorded calls. +func (c *MockCSICalls) Get() []MockCSICall { + c.mutex.Lock() + defer c.mutex.Unlock() + + return c.calls[:] +} + +// Add appens one new call at the end. +func (c *MockCSICalls) Add(call MockCSICall) { + c.mutex.Lock() + defer c.mutex.Unlock() + + c.calls = append(c.calls, call) +} + +// LogGRPC takes individual parameters from the mock CSI driver and adds them. +func (c *MockCSICalls) LogGRPC(method string, request, reply interface{}, err error) { + // Encoding to JSON and decoding mirrors the traditional way of capturing calls. + // Probably could be simplified now... + logMessage := struct { + Method string + Request interface{} + Response interface{} + // Error as string, for backward compatibility. + // "" on no error. + Error string + // Full error dump, to be able to parse out full gRPC error code and message separately in a test. + FullError error + }{ + Method: method, + Request: request, + Response: reply, + FullError: err, + } + + if err != nil { + logMessage.Error = err.Error() + } + + msg, _ := json.Marshal(logMessage) + call := MockCSICall{ + json: string(msg), + } + json.Unmarshal(msg, &call) + + // Trim gRPC service name, i.e. "/csi.v1.Identity/Probe" -> "Probe" + methodParts := strings.Split(call.Method, "/") + call.Method = methodParts[len(methodParts)-1] + + c.Add(call) } var _ storageframework.TestDriver = &mockCSIDriver{} @@ -260,7 +385,7 @@ var _ storageframework.DynamicPVTestDriver = &mockCSIDriver{} var _ storageframework.SnapshottableTestDriver = &mockCSIDriver{} // InitMockCSIDriver returns a mockCSIDriver that implements TestDriver interface -func InitMockCSIDriver(driverOpts CSIMockDriverOpts) storageframework.TestDriver { +func InitMockCSIDriver(driverOpts CSIMockDriverOpts) MockCSITestDriver { driverManifests := []string{ "test/e2e/testing-manifests/storage-csi/external-attacher/rbac.yaml", "test/e2e/testing-manifests/storage-csi/external-provisioner/rbac.yaml", @@ -268,7 +393,11 @@ func InitMockCSIDriver(driverOpts CSIMockDriverOpts) storageframework.TestDriver "test/e2e/testing-manifests/storage-csi/external-snapshotter/rbac.yaml", "test/e2e/testing-manifests/storage-csi/mock/csi-mock-rbac.yaml", "test/e2e/testing-manifests/storage-csi/mock/csi-storageclass.yaml", - "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver.yaml", + } + if driverOpts.Embedded { + driverManifests = append(driverManifests, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-proxy.yaml") + } else { + driverManifests = append(driverManifests, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver.yaml") } if driverOpts.RegisterDriver { @@ -309,10 +438,11 @@ func InitMockCSIDriver(driverOpts CSIMockDriverOpts) storageframework.TestDriver attachable: !driverOpts.DisableAttach, attachLimit: driverOpts.AttachLimit, enableNodeExpansion: driverOpts.EnableNodeExpansion, - javascriptHooks: driverOpts.JavascriptHooks, tokenRequests: driverOpts.TokenRequests, requiresRepublish: driverOpts.RequiresRepublish, fsGroupPolicy: driverOpts.FSGroupPolicy, + embedded: driverOpts.Embedded, + hooks: driverOpts.Hooks, } } @@ -340,62 +470,108 @@ func (m *mockCSIDriver) GetSnapshotClass(config *storageframework.PerTestConfig, } func (m *mockCSIDriver) PrepareTest(f *framework.Framework) (*storageframework.PerTestConfig, func()) { + m.clientSet = f.ClientSet + // Create secondary namespace which will be used for creating driver - driverNamespace := utils.CreateDriverNamespace(f) - driverns := driverNamespace.Name + m.driverNamespace = utils.CreateDriverNamespace(f) + driverns := m.driverNamespace.Name testns := f.Namespace.Name - ginkgo.By("deploying csi mock driver") - cancelLogging := utils.StartPodLogs(f, driverNamespace) + if m.embedded { + ginkgo.By("deploying csi mock proxy") + } else { + ginkgo.By("deploying csi mock driver") + } + cancelLogging := utils.StartPodLogs(f, m.driverNamespace) cs := f.ClientSet // pods should be scheduled on the node node, err := e2enode.GetRandomReadySchedulableNode(cs) framework.ExpectNoError(err) + + embeddedCleanup := func() {} + containerArgs := []string{} + if m.embedded { + // Run embedded CSI driver. + // + // For now we start exactly one instance which implements controller, + // node and identity services. It matches with the one pod that we run + // inside the cluster. The name and namespace of that one is deterministic, + // so we know what to connect to. + // + // Long-term we could also deploy one central controller and multiple + // node instances, with knowledge about provisioned volumes shared in + // this process. + podname := "csi-mockplugin-0" + containername := "mock" + ctx, cancel := context.WithCancel(context.Background()) + serviceConfig := mockservice.Config{ + DisableAttach: !m.attachable, + DriverName: "csi-mock-" + f.UniqueName, + AttachLimit: int64(m.attachLimit), + NodeExpansionRequired: m.enableNodeExpansion, + EnableTopology: m.enableTopology, + IO: proxy.PodDirIO{ + F: f, + Namespace: m.driverNamespace.Name, + PodName: podname, + ContainerName: "busybox", + }, + } + s := mockservice.New(serviceConfig) + servers := &mockdriver.CSIDriverServers{ + Controller: s, + Identity: s, + Node: s, + } + m.embeddedCSIDriver = mockdriver.NewCSIDriver(servers) + + l, err := proxy.Listen(ctx, f.ClientSet, f.ClientConfig(), + proxy.Addr{ + Namespace: m.driverNamespace.Name, + PodName: podname, + ContainerName: containername, + Port: 9000, + }, + ) + framework.ExpectNoError(err, "start connecting to proxy pod") + err = m.embeddedCSIDriver.Start(l, m.interceptGRPC) + framework.ExpectNoError(err, "start mock driver") + + embeddedCleanup = func() { + // Kill all goroutines and delete resources of the mock driver. + m.embeddedCSIDriver.Stop() + l.Close() + cancel() + } + } else { + // When using the mock driver inside the cluster it has to be reconfigured + // via command line parameters. + containerArgs = append(containerArgs, "--name=csi-mock-"+f.UniqueName) + + if !m.attachable { + containerArgs = append(containerArgs, "--disable-attach") + } + + if m.enableTopology { + containerArgs = append(containerArgs, "--enable-topology") + } + + if m.attachLimit > 0 { + containerArgs = append(containerArgs, "--attach-limit", strconv.Itoa(m.attachLimit)) + } + + if m.enableNodeExpansion { + containerArgs = append(containerArgs, "--node-expand-required=true") + } + } + config := &storageframework.PerTestConfig{ Driver: m, Prefix: "mock", Framework: f, ClientNodeSelection: e2epod.NodeSelection{Name: node.Name}, - DriverNamespace: driverNamespace, - } - - containerArgs := []string{"--name=csi-mock-" + f.UniqueName} - if !m.attachable { - containerArgs = append(containerArgs, "--disable-attach") - } - - if m.enableTopology { - containerArgs = append(containerArgs, "--enable-topology") - } - - if m.attachLimit > 0 { - containerArgs = append(containerArgs, "--attach-limit", strconv.Itoa(m.attachLimit)) - } - - if m.enableNodeExpansion { - containerArgs = append(containerArgs, "--node-expand-required=true") - } - - // Create a config map with javascript hooks. Create it even when javascriptHooks - // are empty, so we can unconditionally add it to the mock pod. - const hooksConfigMapName = "mock-driver-hooks" - hooksYaml, err := yaml.Marshal(m.javascriptHooks) - framework.ExpectNoError(err) - hooks := &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: hooksConfigMapName, - }, - Data: map[string]string{ - "hooks.yaml": string(hooksYaml), - }, - } - - _, err = f.ClientSet.CoreV1().ConfigMaps(driverns).Create(context.TODO(), hooks, metav1.CreateOptions{}) - framework.ExpectNoError(err) - - if len(m.javascriptHooks) > 0 { - containerArgs = append(containerArgs, "--hooks-file=/etc/hooks/hooks.yaml") + DriverNamespace: m.driverNamespace, } o := utils.PatchCSIOptions{ @@ -416,7 +592,7 @@ func (m *mockCSIDriver) PrepareTest(f *framework.Framework) (*storageframework.P RequiresRepublish: m.requiresRepublish, FSGroupPolicy: m.fsGroupPolicy, } - cleanup, err := utils.CreateFromManifests(f, driverNamespace, func(item interface{}) error { + cleanup, err := utils.CreateFromManifests(f, m.driverNamespace, func(item interface{}) error { return utils.PatchCSIDeployment(f, o, item) }, m.manifests...) @@ -424,7 +600,7 @@ func (m *mockCSIDriver) PrepareTest(f *framework.Framework) (*storageframework.P framework.Failf("deploying csi mock driver: %v", err) } - cleanupFunc := generateDriverCleanupFunc( + driverCleanupFunc := generateDriverCleanupFunc( f, "mock", testns, @@ -432,9 +608,83 @@ func (m *mockCSIDriver) PrepareTest(f *framework.Framework) (*storageframework.P cleanup, cancelLogging) + cleanupFunc := func() { + embeddedCleanup() + driverCleanupFunc() + } + return config, cleanupFunc } +func (m *mockCSIDriver) interceptGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + defer func() { + // Always log the call and its final result, + // regardless whether the result was from the real + // implementation or a hook. + m.calls.LogGRPC(info.FullMethod, req, resp, err) + }() + + if m.hooks.Pre != nil { + resp, err = m.hooks.Pre(ctx, info.FullMethod, req) + if resp != nil || err != nil { + return + } + } + resp, err = handler(ctx, req) + if m.hooks.Post != nil { + resp, err = m.hooks.Post(ctx, info.FullMethod, req, resp, err) + } + return +} + +func (m *mockCSIDriver) GetCalls() ([]MockCSICall, error) { + if m.embedded { + return m.calls.Get(), nil + } + + if m.driverNamespace == nil { + return nil, errors.New("PrepareTest not called yet") + } + + // Name of CSI driver pod name (it's in a StatefulSet with a stable name) + driverPodName := "csi-mockplugin-0" + // Name of CSI driver container name + driverContainerName := "mock" + // Prefix of the mock driver grpc log + grpcCallPrefix := "gRPCCall:" + + // Load logs of driver pod + log, err := e2epod.GetPodLogs(m.clientSet, m.driverNamespace.Name, driverPodName, driverContainerName) + if err != nil { + return nil, fmt.Errorf("could not load CSI driver logs: %s", err) + } + + logLines := strings.Split(log, "\n") + var calls []MockCSICall + for _, line := range logLines { + index := strings.Index(line, grpcCallPrefix) + if index == -1 { + continue + } + line = line[index+len(grpcCallPrefix):] + call := MockCSICall{ + json: string(line), + } + err := json.Unmarshal([]byte(line), &call) + if err != nil { + framework.Logf("Could not parse CSI driver log line %q: %s", line, err) + continue + } + + // Trim gRPC service name, i.e. "/csi.v1.Identity/Probe" -> "Probe" + methodParts := strings.Split(call.Method, "/") + call.Method = methodParts[len(methodParts)-1] + + calls = append(calls, call) + } + return calls, nil +} + // gce-pd type gcePDCSIDriver struct { driverInfo storageframework.DriverInfo diff --git a/test/e2e/storage/drivers/proxy/io.go b/test/e2e/storage/drivers/proxy/io.go new file mode 100644 index 00000000000..74fb6e1fa48 --- /dev/null +++ b/test/e2e/storage/drivers/proxy/io.go @@ -0,0 +1,82 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package proxy + +import ( + "fmt" + + "k8s.io/kubernetes/test/e2e/framework" + "k8s.io/kubernetes/test/e2e/storage/drivers/csi-test/mock/service" +) + +type PodDirIO struct { + F *framework.Framework + Namespace string + PodName string + ContainerName string +} + +var _ service.DirIO = PodDirIO{} + +func (p PodDirIO) DirExists(path string) (bool, error) { + stdout, stderr, err := p.execute([]string{ + "sh", + "-c", + fmt.Sprintf("if ! [ -e '%s' ]; then echo notexist; elif [ -d '%s' ]; then echo dir; else echo nodir; fi", path, path), + }) + if err != nil { + return false, fmt.Errorf("error executing dir test commands: stderr=%q, %v", stderr, err) + } + switch stdout { + case "notexist": + return false, nil + case "nodir": + return false, fmt.Errorf("%s: not a directory", path) + case "dir": + return true, nil + default: + return false, fmt.Errorf("unexpected output from dir test commands: %q", stdout) + } +} + +func (p PodDirIO) Mkdir(path string) error { + _, stderr, err := p.execute([]string{"mkdir", path}) + if err != nil { + return fmt.Errorf("mkdir %q: stderr=%q, %v", path, stderr, err) + } + return nil +} + +func (p PodDirIO) RemoveAll(path string) error { + _, stderr, err := p.execute([]string{"rm", "-rf", path}) + if err != nil { + return fmt.Errorf("rm -rf %q: stderr=%q, %v", path, stderr, err) + } + return nil +} + +func (p PodDirIO) execute(command []string) (string, string, error) { + return p.F.ExecWithOptions(framework.ExecOptions{ + Command: command, + Namespace: p.Namespace, + PodName: p.PodName, + ContainerName: p.ContainerName, + CaptureStdout: true, + CaptureStderr: true, + Quiet: true, + }) +} diff --git a/test/e2e/storage/drivers/proxy/portproxy.go b/test/e2e/storage/drivers/proxy/portproxy.go new file mode 100644 index 00000000000..9050cac075e --- /dev/null +++ b/test/e2e/storage/drivers/proxy/portproxy.go @@ -0,0 +1,344 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package proxy + +import ( + "bufio" + "context" + "errors" + "fmt" + "io" + "net" + "net/http" + "sync" + "sync/atomic" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/portforward" + "k8s.io/client-go/transport/spdy" + "k8s.io/klog/v2" +) + +// Maximum number of forwarded connections. In practice we don't +// need more than one per sidecar and kubelet. Keeping this reasonably +// small ensures that we don't establish connections through the apiserver +// and the remote kernel which then arent' needed. +const maxConcurrentConnections = 10 + +// Listen creates a listener which returns new connections whenever someone connects +// to a socat or mock driver proxy instance running inside the given pod. +// +// socat must by started with ",fork TCP-LISTEN:,reuseport" +// for this to work. "" can be anything that accepts connections, +// for example "UNIX-LISTEN:/csi/csi.sock". In this mode, socat will +// accept exactly one connection on the given port for each connection +// that socat itself accepted. +// +// Listening stops when the context is done or Close() is called. +func Listen(ctx context.Context, clientset kubernetes.Interface, restConfig *rest.Config, addr Addr) (net.Listener, error) { + // We connect through port forwarding. Strictly + // speaking this is overkill because we don't need a local + // port. But this way we can reuse client-go/tools/portforward + // instead of having to replicate handleConnection + // in our own code. + restClient := clientset.CoreV1().RESTClient() + if restConfig.GroupVersion == nil { + restConfig.GroupVersion = &schema.GroupVersion{} + } + if restConfig.NegotiatedSerializer == nil { + restConfig.NegotiatedSerializer = scheme.Codecs + } + + // The setup code around the actual portforward is from + // https://github.com/kubernetes/kubernetes/blob/c652ffbe4a29143623a1aaec39f745575f7e43ad/staging/src/k8s.io/kubectl/pkg/cmd/portforward/portforward.go + req := restClient.Post(). + Resource("pods"). + Namespace(addr.Namespace). + Name(addr.PodName). + SubResource("portforward") + transport, upgrader, err := spdy.RoundTripperFor(restConfig) + if err != nil { + return nil, fmt.Errorf("create round tripper: %v", err) + } + dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", req.URL()) + + prefix := fmt.Sprintf("port forwarding for %s", addr) + ctx, cancel := context.WithCancel(ctx) + l := &listener{ + connections: make(chan *connection), + ctx: ctx, + cancel: cancel, + addr: addr, + } + + // Port forwarding is allowed to fail and will be restarted when it does. + prepareForwarding := func() (*portforward.PortForwarder, error) { + pod, err := clientset.CoreV1().Pods(addr.Namespace).Get(ctx, addr.PodName, metav1.GetOptions{}) + if err != nil { + return nil, err + } + for i, status := range pod.Status.ContainerStatuses { + if pod.Spec.Containers[i].Name == addr.ContainerName && + status.State.Running == nil { + return nil, fmt.Errorf("container %q is not running", addr.ContainerName) + } + } + readyChannel := make(chan struct{}) + fw, err := portforward.New(dialer, + []string{fmt.Sprintf("0:%d", addr.Port)}, + ctx.Done(), + readyChannel, + klogWriter(false, prefix), + klogWriter(true, prefix)) + if err != nil { + return nil, err + } + return fw, nil + } + + var connectionsCreated, connectionsClosed int32 + + runForwarding := func(fw *portforward.PortForwarder) { + defer fw.Close() + klog.V(5).Infof("%s: starting connection polling", prefix) + defer klog.V(5).Infof("%s: connection polling ended", prefix) + + failed := make(chan struct{}) + go func() { + defer close(failed) + klog.V(5).Infof("%s: starting port forwarding", prefix) + defer klog.V(5).Infof("%s: port forwarding ended", prefix) + + err := fw.ForwardPorts() + if err != nil { + if ctx.Err() == nil { + // Something failed unexpectedly. + klog.Errorf("%s: %v", prefix, err) + } else { + // Context is done, log error anyway. + klog.V(5).Infof("%s: %v", prefix, err) + } + } + }() + + // Wait for port forwarding to be ready. + select { + case <-ctx.Done(): + return + case <-failed: + // The reason was logged above. + return + case <-fw.Ready: + // Proceed... + } + + // This delay determines how quickly we notice when someone has + // connected inside the cluster. With socat, we cannot make this too small + // because otherwise we get many rejected connections. With the mock + // driver as proxy that doesn't happen as long as we don't + // ask for too many concurrent connections because the mock driver + // keeps the listening port open at all times and the Linux + // kernel automatically accepts our connection requests. + tryConnect := time.NewTicker(100 * time.Millisecond) + defer tryConnect.Stop() + for { + select { + case <-ctx.Done(): + return + case <-failed: + // The reason was logged above. + return + case <-tryConnect.C: + currentClosed := atomic.LoadInt32(&connectionsClosed) + openConnections := connectionsCreated - currentClosed + if openConnections >= maxConcurrentConnections { + break + } + + // Check whether we can establish a connection through the + // forwarded port. + ports, err := fw.GetPorts() + if err != nil { + // We checked for "port forwarding ready" above, so this + // shouldn't happen. + klog.Errorf("%s: no forwarded ports: %v", prefix, err) + return + } + + // We don't want to be blocked to long because we need to check + // for a port forwarding failure occasionally. + timeout := 10 * time.Second + deadline, ok := ctx.Deadline() + if ok { + untilDeadline := deadline.Sub(time.Now()) + if untilDeadline < timeout { + timeout = untilDeadline + } + } + + klog.V(5).Infof("%s: trying to create a new connection #%d, %d open", prefix, connectionsCreated, openConnections) + c, err := net.DialTimeout("tcp", fmt.Sprintf("localhost:%d", ports[0].Local), timeout) + if err != nil { + klog.V(5).Infof("%s: no connection: %v", prefix, err) + break + } + // Make the connection available to Accept below. + klog.V(5).Infof("%s: created a new connection #%d", prefix, connectionsCreated) + l.connections <- &connection{ + Conn: c, + addr: addr, + counter: connectionsCreated, + closed: &connectionsClosed, + } + connectionsCreated++ + } + } + } + + // Portforwarding and polling for connections run in the background. + go func() { + for { + fw, err := prepareForwarding() + if err == nil { + runForwarding(fw) + } else { + if apierrors.IsNotFound(err) { + // This is normal, the pod isn't running yet. Log with lower severity. + klog.V(5).Infof("prepare forwarding %s: %v", addr, err) + } else { + klog.Errorf("prepare forwarding %s: %v", addr, err) + } + } + + select { + case <-ctx.Done(): + return + // Sleep a bit before restarting. This is + // where we potentially wait for the pod to + // start. + case <-time.After(1 * time.Second): + } + } + }() + + return l, nil +} + +// Addr contains all relevant parameters for a certain port in a pod. +// The container must be running before connections are attempted. +type Addr struct { + Namespace, PodName, ContainerName string + Port int +} + +var _ net.Addr = Addr{} + +func (a Addr) Network() string { + return "port-forwarding" +} + +func (a Addr) String() string { + return fmt.Sprintf("%s/%s:%d", a.Namespace, a.PodName, a.Port) +} + +type listener struct { + addr Addr + connections chan *connection + ctx context.Context + cancel func() +} + +var _ net.Listener = &listener{} + +func (l *listener) Close() error { + klog.V(5).Infof("forward listener for %s: closing", l.addr) + l.cancel() + return nil +} + +func (l *listener) Accept() (net.Conn, error) { + select { + case <-l.ctx.Done(): + return nil, errors.New("listening was stopped") + case c := <-l.connections: + klog.V(5).Infof("forward listener for %s: got a new connection #%d", l.addr, c.counter) + return c, nil + } +} + +type connection struct { + net.Conn + addr Addr + counter int32 + closed *int32 + mutex sync.Mutex +} + +func (c *connection) Read(b []byte) (int, error) { + n, err := c.Conn.Read(b) + if errors.Is(err, io.EOF) { + klog.V(5).Infof("forward connection #%d for %s: remote side closed the stream", c.counter, c.addr) + } + return n, err +} + +func (c *connection) Write(b []byte) (int, error) { + n, err := c.Conn.Write(b) + if errors.Is(err, io.EOF) { + klog.V(5).Infof("forward connection #%d for %s: remote side closed the stream", c.counter, c.addr) + } + return n, err +} + +func (c *connection) Close() error { + c.mutex.Lock() + defer c.mutex.Unlock() + if c.closed != nil { + // Do the logging and book-keeping only once. The function itself may be called more than once. + klog.V(5).Infof("forward connection #%d for %s: closing our side", c.counter, c.addr) + atomic.AddInt32(c.closed, 1) + c.closed = nil + } + return c.Conn.Close() +} + +func (l *listener) Addr() net.Addr { + return l.addr +} + +func klogWriter(isError bool, prefix string) io.Writer { + reader, writer := io.Pipe() + go func() { + scanner := bufio.NewScanner(reader) + for scanner.Scan() { + text := scanner.Text() + if isError { + klog.Errorf("%s: %s", prefix, text) + } else { + klog.V(5).Infof("%s: %s", prefix, text) + } + } + }() + + return writer +} diff --git a/test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver.yaml b/test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver.yaml index 7ec5f427b3e..fe3f062afb4 100644 --- a/test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver.yaml +++ b/test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver.yaml @@ -70,8 +70,6 @@ spec: volumeMounts: - mountPath: /csi name: socket-dir - - mountPath: /etc/hooks - name: hooks - mountPath: /var/lib/kubelet/pods name: kubelet-pods-dir - mountPath: /var/lib/kubelet/plugins/kubernetes.io/csi @@ -95,6 +93,3 @@ spec: path: /var/lib/kubelet/plugins_registry type: Directory name: registration-dir - - name: hooks - configMap: - name: mock-driver-hooks diff --git a/test/e2e/testing-manifests/storage-csi/mock/csi-mock-proxy.yaml b/test/e2e/testing-manifests/storage-csi/mock/csi-mock-proxy.yaml new file mode 100644 index 00000000000..7892392931e --- /dev/null +++ b/test/e2e/testing-manifests/storage-csi/mock/csi-mock-proxy.yaml @@ -0,0 +1,105 @@ +kind: StatefulSet +apiVersion: apps/v1 +metadata: + name: csi-mockplugin +spec: + selector: + matchLabels: + app: csi-mockplugin + replicas: 1 + template: + metadata: + labels: + app: csi-mockplugin + spec: + serviceAccountName: csi-mock + containers: + - name: csi-provisioner + image: k8s.gcr.io/sig-storage/csi-provisioner:v2.1.0 + args: + - "--csi-address=$(ADDRESS)" + # Topology support is needed for the pod rescheduling test + # ("storage capacity" in csi_mock_volume.go). + - "--feature-gates=Topology=true" + - "-v=5" + - "--timeout=1m" + # Needed for fsGroup support. + - "--default-fstype=ext4" + # We don't need much concurrency and having many gouroutines + # makes klog.Fatal during shutdown very long. + - "--worker-threads=5" + env: + - name: ADDRESS + value: /csi/csi.sock + volumeMounts: + - mountPath: /csi + name: socket-dir + - name: driver-registrar + image: k8s.gcr.io/sig-storage/csi-node-driver-registrar:v2.1.0 + args: + - --v=5 + - --csi-address=/csi/csi.sock + - --kubelet-registration-path=/var/lib/kubelet/plugins/csi-mock/csi.sock + - --timeout=1m + env: + - name: KUBE_NODE_NAME + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: spec.nodeName + volumeMounts: + - mountPath: /csi + name: socket-dir + - mountPath: /registration + name: registration-dir + - name: mock + image: k8s.gcr.io/sig-storage/mock-driver:v4.1.0 + args: + # -v3 shows when connections get established. Higher log levels print information about + # transferred bytes, but cannot print message content (no gRPC parsing), so this is usually + # not interesting. + - -v=3 + - -proxy-endpoint=tcp://:9000 + env: + - name: CSI_ENDPOINT + value: /csi/csi.sock + ports: + - containerPort: 9000 + name: socat + volumeMounts: + - mountPath: /csi + name: socket-dir + # The busybox container is needed for running shell commands which + # test for directories or create them. It needs additional privileges + # for that. + - name: busybox + image: k8s.gcr.io/busybox + securityContext: + privileged: true + command: + - sleep + - "100000" + volumeMounts: + - mountPath: /var/lib/kubelet/pods + name: kubelet-pods-dir + - mountPath: /var/lib/kubelet/plugins/kubernetes.io/csi + name: kubelet-csi-dir + volumes: + - hostPath: + path: /var/lib/kubelet/plugins/csi-mock + type: DirectoryOrCreate + name: socket-dir + - hostPath: + path: /var/lib/kubelet/pods + type: Directory + # mock driver doesn't make mounts and therefore doesn't need mount propagation. + # mountPropagation: Bidirectional + name: kubelet-pods-dir + - hostPath: + path: /var/lib/kubelet/plugins/kubernetes.io/csi + type: DirectoryOrCreate + name: kubelet-csi-dir + - hostPath: + path: /var/lib/kubelet/plugins_registry + type: Directory + name: registration-dir