e2e storage: use embedded mock CSI driver

This replaces the JavaScript code that was embedded into the mock
driver running inside the cluster with Go callbacks which run inside
the e2e.test suite itself. In contrast to the JavaScript hooks, the
callbacks have direct access to all request parameters and can
fabricate arbitrary responses, not just error codes.
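
For illustration, a Pre hook can type-assert the request and return a
complete, fabricated response. This is only a sketch: the helper name,
the "fake-" volume ID and the exact import list are illustrative, not
part of this change.

    import (
        "context"

        "github.com/container-storage-interface/spec/lib/go/csi"

        "k8s.io/kubernetes/test/e2e/storage/drivers"
    )

    // fabricateCreateVolumeHook answers CreateVolume itself instead of
    // letting the mock driver handle the call; all other calls pass through.
    func fabricateCreateVolumeHook() *drivers.Hooks {
        return &drivers.Hooks{
            Pre: func(ctx context.Context, method string, request interface{}) (interface{}, error) {
                if req, ok := request.(*csi.CreateVolumeRequest); ok {
                    return &csi.CreateVolumeResponse{
                        Volume: &csi.Volume{
                            VolumeId:      "fake-" + req.GetName(),
                            CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
                        },
                    }, nil
                }
                // Returning nil, nil lets the driver's own implementation run.
                return nil, nil
            },
        }
    }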

Because the callbacks run in the same process as the test itself, it
is possible to set up two-way communication via shared variables or
channels. This opens the door for writing better tests. Some of the
existing tests that poll mock driver output could be simplified, but
that can be addressed later.
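
A minimal sketch of such two-way signalling, assuming the drivers.Hooks
type added by this change plus the usual "context" and "strings"
imports (the nodeStaged channel and its wiring are illustrative):

    // Signal the test as soon as NodeStageVolume is observed, so the test
    // can react without polling the driver's call log.
    nodeStaged := make(chan struct{}, 1)
    hooks := &drivers.Hooks{
        Pre: func(ctx context.Context, method string, request interface{}) (interface{}, error) {
            if strings.Contains(method, "NodeStageVolume") {
                select {
                case nodeStaged <- struct{}{}:
                default: // already signalled, don't block the gRPC call
                }
            }
            return nil, nil
        },
    }
    // hooks would then be passed to init(testParameters{..., hooks: hooks})
    // and the test body can simply wait:
    <-nodeStaged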

For now, only tests using hooks use embedding. How gRPC calls are
retrieved is abstracted behind the MockCSITestDriver interface, so
tests don't need to be modified when switching between the embedded
and the remote mock driver.
Patrick Ohly 2020-12-04 12:55:36 +01:00
parent 92bac8afc1
commit 3adcf11b45
14 changed files with 1027 additions and 584 deletions

View File

@ -19,15 +19,16 @@ package storage
import (
"context"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"math/rand"
"strconv"
"strings"
"sync/atomic"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
storagev1alpha1 "k8s.io/api/storage/v1alpha1"
@ -75,13 +76,6 @@ const (
// How long to wait for kubelet to unstage a volume after a pod is deleted
csiUnstageWaitTimeout = 1 * time.Minute
// Name of CSI driver pod name (it's in a StatefulSet with a stable name)
driverPodName = "csi-mockplugin-0"
// Name of CSI driver container name
driverContainerName = "mock"
// Prefix of the mock driver grpc log
grpcCallPrefix = "gRPCCall:"
)
// csiCall represents an expected call from Kubernetes to CSI mock driver and
@ -113,7 +107,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
// just disables resizing on the driver; it overrides the enableResizing flag for the CSI mock driver
disableResizingOnDriver bool
enableSnapshot bool
javascriptHooks map[string]string
hooks *drivers.Hooks
tokenRequests []storagev1.TokenRequest
requiresRepublish *bool
fsGroupPolicy *storagev1.FSGroupPolicy
@ -127,7 +121,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
pvcs []*v1.PersistentVolumeClaim
sc map[string]*storagev1.StorageClass
vsc map[string]*unstructured.Unstructured
driver storageframework.TestDriver
driver drivers.MockCSITestDriver
provisioner string
tp testParameters
}
@ -155,12 +149,29 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
EnableResizing: tp.enableResizing,
EnableNodeExpansion: tp.enableNodeExpansion,
EnableSnapshot: tp.enableSnapshot,
JavascriptHooks: tp.javascriptHooks,
TokenRequests: tp.tokenRequests,
RequiresRepublish: tp.requiresRepublish,
FSGroupPolicy: tp.fsGroupPolicy,
}
// At the moment, only tests which need hooks are
// using the embedded CSI mock driver. The rest run
// the driver inside the cluster, although they could
// be changed to use embedding merely by setting
// driverOpts.Embedded to true.
//
// Not enabling it for all tests minimizes
// the risk that the introduction of embedding breaks
// some existing tests and avoids a dependency
// on port forwarding, which is important if some of
// these tests are supposed to become part of
// conformance testing (port forwarding isn't
// currently required).
if tp.hooks != nil {
driverOpts.Embedded = true
driverOpts.Hooks = *tp.hooks
}
// this just disables resizing on the driver, keeping resizing on the SC enabled.
if tp.disableResizingOnDriver {
driverOpts.EnableResizing = false
@ -188,10 +199,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
createPod := func(ephemeral bool) (class *storagev1.StorageClass, claim *v1.PersistentVolumeClaim, pod *v1.Pod) {
ginkgo.By("Creating pod")
var sc *storagev1.StorageClass
if dDriver, ok := m.driver.(storageframework.DynamicPVTestDriver); ok {
sc = dDriver.GetDynamicProvisionStorageClass(m.config, "")
}
sc := m.driver.GetDynamicProvisionStorageClass(m.config, "")
scTest := testsuites.StorageClassTest{
Name: m.driver.GetDriverInfo().Name,
Timeouts: f.Timeouts,
@ -237,10 +245,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
createPodWithFSGroup := func(fsGroup *int64) (*storagev1.StorageClass, *v1.PersistentVolumeClaim, *v1.Pod) {
ginkgo.By("Creating pod with fsGroup")
nodeSelection := m.config.ClientNodeSelection
var sc *storagev1.StorageClass
if dDriver, ok := m.driver.(storageframework.DynamicPVTestDriver); ok {
sc = dDriver.GetDynamicProvisionStorageClass(m.config, "")
}
sc := m.driver.GetDynamicProvisionStorageClass(m.config, "")
scTest := testsuites.StorageClassTest{
Name: m.driver.GetDriverInfo().Name,
Provisioner: sc.Provisioner,
@ -514,7 +519,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
framework.ExpectNoError(err, "while deleting")
ginkgo.By("Checking CSI driver logs")
err = checkPodLogs(m.cs, m.config.DriverNamespace.Name, driverPodName, driverContainerName, pod, test.expectPodInfo, test.expectEphemeral, csiInlineVolumesEnabled, false, 1)
err = checkPodLogs(m.driver.GetCalls, pod, test.expectPodInfo, test.expectEphemeral, csiInlineVolumesEnabled, false, 1)
framework.ExpectNoError(err)
})
}
@ -727,19 +732,19 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
})
ginkgo.Context("CSI NodeStage error cases [Slow]", func() {
// Global variable in all scripts (called before each test)
globalScript := `counter=0; console.log("globals loaded", OK, INVALIDARGUMENT)`
trackedCalls := []string{
"NodeStageVolume",
"NodeUnstageVolume",
}
tests := []struct {
name string
expectPodRunning bool
expectedCalls []csiCall
nodeStageScript string
nodeUnstageScript string
name string
expectPodRunning bool
expectedCalls []csiCall
// Called for each NodeStageVolume call, with the counter incremented atomically before
// the invocation (i.e. the first value will be 1).
nodeStageHook func(counter int64) error
}{
{
// This is already tested elsewhere, adding simple good case here to test the test framework.
@ -749,7 +754,6 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
{expectedMethod: "NodeStageVolume", expectedError: codes.OK, deletePod: true},
{expectedMethod: "NodeUnstageVolume", expectedError: codes.OK},
},
nodeStageScript: `OK;`,
},
{
// Kubelet should repeat NodeStage as long as the pod exists
@ -762,7 +766,12 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
{expectedMethod: "NodeUnstageVolume", expectedError: codes.OK},
},
// Fail first 3 NodeStage requests, 4th succeeds
nodeStageScript: `console.log("Counter:", ++counter); if (counter < 4) { INVALIDARGUMENT; } else { OK; }`,
nodeStageHook: func(counter int64) error {
if counter < 4 {
return status.Error(codes.InvalidArgument, "fake error")
}
return nil
},
},
{
// Kubelet should repeat NodeStage as long as the pod exists
@ -775,7 +784,12 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
{expectedMethod: "NodeUnstageVolume", expectedError: codes.OK},
},
// Fail first 3 NodeStage requests, 4th succeeds
nodeStageScript: `console.log("Counter:", ++counter); if (counter < 4) { DEADLINEEXCEEDED; } else { OK; }`,
nodeStageHook: func(counter int64) error {
if counter < 4 {
return status.Error(codes.DeadlineExceeded, "fake error")
}
return nil
},
},
{
// After NodeUnstage with ephemeral error, the driver may continue staging the volume.
@ -789,7 +803,9 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
{expectedMethod: "NodeStageVolume", expectedError: codes.DeadlineExceeded, deletePod: true},
{expectedMethod: "NodeUnstageVolume", expectedError: codes.OK},
},
nodeStageScript: `DEADLINEEXCEEDED;`,
nodeStageHook: func(counter int64) error {
return status.Error(codes.DeadlineExceeded, "fake error")
},
},
{
// After NodeUnstage with final error, kubelet can be sure the volume is not staged.
@ -801,21 +817,23 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
// This matches all repeated NodeStage calls with InvalidArgument error (due to exp. backoff).
{expectedMethod: "NodeStageVolume", expectedError: codes.InvalidArgument, deletePod: true},
},
nodeStageScript: `INVALIDARGUMENT;`,
nodeStageHook: func(counter int64) error {
return status.Error(codes.InvalidArgument, "fake error")
},
},
}
for _, t := range tests {
test := t
ginkgo.It(test.name, func() {
scripts := map[string]string{
"globals": globalScript,
"nodeStageVolumeStart": test.nodeStageScript,
"nodeUnstageVolumeStart": test.nodeUnstageScript,
var hooks *drivers.Hooks
if test.nodeStageHook != nil {
hooks = createPreHook("NodeStageVolume", test.nodeStageHook)
}
init(testParameters{
disableAttach: true,
registerDriver: true,
javascriptHooks: scripts,
disableAttach: true,
registerDriver: true,
hooks: hooks,
})
defer cleanup()
@ -836,7 +854,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
framework.Failf("timed out waiting for the CSI call that indicates that the pod can be deleted: %v", test.expectedCalls)
}
time.Sleep(1 * time.Second)
_, index, err := compareCSICalls(trackedCalls, test.expectedCalls, m.cs, m.config.DriverNamespace.Name, driverPodName, driverContainerName)
_, index, err := compareCSICalls(trackedCalls, test.expectedCalls, m.driver.GetCalls)
framework.ExpectNoError(err, "while waiting for initial CSI calls")
if index == 0 {
// No CSI call received yet
@ -860,7 +878,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
ginkgo.By("Waiting for all remaining expected CSI calls")
err = wait.Poll(time.Second, csiUnstageWaitTimeout, func() (done bool, err error) {
_, index, err := compareCSICalls(trackedCalls, test.expectedCalls, m.cs, m.config.DriverNamespace.Name, driverPodName, driverContainerName)
_, index, err := compareCSICalls(trackedCalls, test.expectedCalls, m.driver.GetCalls)
if err != nil {
return true, err
}
@ -946,11 +964,12 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
}
if test.resourceExhausted {
params.javascriptHooks = map[string]string{
"globals": `counter=0; console.log("globals loaded", OK, INVALIDARGUMENT)`,
// Every second call returns RESOURCEEXHAUSTED, starting with the first one.
"createVolumeStart": `console.log("Counter:", ++counter); if (counter % 2) { RESOURCEEXHAUSTED; } else { OK; }`,
}
params.hooks = createPreHook("CreateVolume", func(counter int64) error {
if counter%2 != 0 {
return status.Error(codes.ResourceExhausted, "fake error")
}
return nil
})
}
init(params)
@ -1006,9 +1025,9 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
expected = append(expected, normal...)
}
var calls []mockCSICall
var calls []drivers.MockCSICall
err = wait.PollImmediateUntil(time.Second, func() (done bool, err error) {
c, index, err := compareCSICalls(deterministicCalls, expected, m.cs, m.config.DriverNamespace.Name, driverPodName, driverContainerName)
c, index, err := compareCSICalls(deterministicCalls, expected, m.driver.GetCalls)
if err != nil {
return true, fmt.Errorf("error waiting for expected CSI calls: %s", err)
}
@ -1221,31 +1240,32 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
})
ginkgo.Context("CSI Volume Snapshots [Feature:VolumeSnapshotDataSource]", func() {
// Global variable in all scripts (called before each test)
globalScript := `counter=0; console.log("globals loaded", OK, DEADLINEEXCEEDED)`
tests := []struct {
name string
createVolumeScript string
createSnapshotScript string
name string
createSnapshotHook func(counter int64) error
}{
{
name: "volumesnapshotcontent and pvc in Bound state with deletion timestamp set should not get deleted while snapshot finalizer exists",
createVolumeScript: `OK`,
createSnapshotScript: `console.log("Counter:", ++counter); if (counter < 8) { DEADLINEEXCEEDED; } else { OK; }`,
name: "volumesnapshotcontent and pvc in Bound state with deletion timestamp set should not get deleted while snapshot finalizer exists",
createSnapshotHook: func(counter int64) error {
if counter < 8 {
return status.Error(codes.DeadlineExceeded, "fake error")
}
return nil
},
},
}
for _, test := range tests {
test := test
ginkgo.It(test.name, func() {
scripts := map[string]string{
"globals": globalScript,
"createVolumeStart": test.createVolumeScript,
"createSnapshotStart": test.createSnapshotScript,
var hooks *drivers.Hooks
if test.createSnapshotHook != nil {
hooks = createPreHook("CreateSnapshot", test.createSnapshotHook)
}
init(testParameters{
disableAttach: true,
registerDriver: true,
enableSnapshot: true,
javascriptHooks: scripts,
disableAttach: true,
registerDriver: true,
enableSnapshot: true,
hooks: hooks,
})
sDriver, ok := m.driver.(storageframework.SnapshottableTestDriver)
if !ok {
@ -1256,10 +1276,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
defer cancel()
defer cleanup()
var sc *storagev1.StorageClass
if dDriver, ok := m.driver.(storageframework.DynamicPVTestDriver); ok {
sc = dDriver.GetDynamicProvisionStorageClass(m.config, "")
}
sc := m.driver.GetDynamicProvisionStorageClass(m.config, "")
ginkgo.By("Creating storage class")
class, err := m.cs.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{})
framework.ExpectNoError(err, "Failed to create class: %v", err)
@ -1402,7 +1419,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
framework.ExpectNoError(err, "while deleting")
ginkgo.By("Checking CSI driver logs")
err = checkPodLogs(m.cs, m.config.DriverNamespace.Name, driverPodName, driverContainerName, pod, false, false, false, test.deployCSIDriverObject && csiServiceAccountTokenEnabled, numNodePublishVolume)
err = checkPodLogs(m.driver.GetCalls, pod, false, false, false, test.deployCSIDriverObject && csiServiceAccountTokenEnabled, numNodePublishVolume)
framework.ExpectNoError(err)
})
}
@ -1507,33 +1524,30 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
annotations interface{}
)
// Global variable in all scripts (called before each test)
globalScript := `counter=0; console.log("globals loaded", OK, DEADLINEEXCEEDED)`
tests := []struct {
name string
createVolumeScript string
createSnapshotScript string
name string
createSnapshotHook func(counter int64) error
}{
{
// volume snapshot should be created using secrets successfully even if there is a failure in the first few attempts,
name: "volume snapshot create/delete with secrets",
createVolumeScript: `OK`,
name: "volume snapshot create/delete with secrets",
// Fail the first 8 calls to create snapshot and succeed the 9th call.
createSnapshotScript: `console.log("Counter:", ++counter); if (counter < 8) { DEADLINEEXCEEDED; } else { OK; }`,
createSnapshotHook: func(counter int64) error {
if counter < 8 {
return status.Error(codes.DeadlineExceeded, "fake error")
}
return nil
},
},
}
for _, test := range tests {
ginkgo.It(test.name, func() {
scripts := map[string]string{
"globals": globalScript,
"createVolumeStart": test.createVolumeScript,
"createSnapshotStart": test.createSnapshotScript,
}
hooks := createPreHook("CreateSnapshot", test.createSnapshotHook)
init(testParameters{
disableAttach: true,
registerDriver: true,
enableSnapshot: true,
javascriptHooks: scripts,
disableAttach: true,
registerDriver: true,
enableSnapshot: true,
hooks: hooks,
})
sDriver, ok := m.driver.(storageframework.SnapshottableTestDriver)
@ -1895,24 +1909,9 @@ func startBusyBoxPodWithVolumeSource(cs clientset.Interface, volumeSource v1.Vol
return cs.CoreV1().Pods(ns).Create(context.TODO(), pod, metav1.CreateOptions{})
}
// Dummy structure that parses just volume_attributes and error code out of logged CSI call
type mockCSICall struct {
json string // full log entry
Method string
Request struct {
VolumeContext map[string]string `json:"volume_context"`
}
FullError struct {
Code codes.Code `json:"code"`
Message string `json:"message"`
}
Error string
}
// checkPodLogs tests that NodePublish was called with expected volume_context and (for ephemeral inline volumes)
// has the matching NodeUnpublish
func checkPodLogs(cs clientset.Interface, namespace, driverPodName, driverContainerName string, pod *v1.Pod, expectPodInfo, ephemeralVolume, csiInlineVolumesEnabled, csiServiceAccountTokenEnabled bool, expectedNumNodePublish int) error {
func checkPodLogs(getCalls func() ([]drivers.MockCSICall, error), pod *v1.Pod, expectPodInfo, ephemeralVolume, csiInlineVolumesEnabled, csiServiceAccountTokenEnabled bool, expectedNumNodePublish int) error {
expectedAttributes := map[string]string{}
if expectPodInfo {
expectedAttributes["csi.storage.k8s.io/pod.name"] = pod.Name
@ -1934,10 +1933,11 @@ func checkPodLogs(cs clientset.Interface, namespace, driverPodName, driverContai
foundAttributes := sets.NewString()
numNodePublishVolume := 0
numNodeUnpublishVolume := 0
calls, err := parseMockLogs(cs, namespace, driverPodName, driverContainerName)
calls, err := getCalls()
if err != nil {
return err
}
for _, call := range calls {
switch call.Method {
case "NodePublishVolume":
@ -1970,39 +1970,6 @@ func checkPodLogs(cs clientset.Interface, namespace, driverPodName, driverContai
return nil
}
func parseMockLogs(cs clientset.Interface, namespace, driverPodName, driverContainerName string) ([]mockCSICall, error) {
// Load logs of driver pod
log, err := e2epod.GetPodLogs(cs, namespace, driverPodName, driverContainerName)
if err != nil {
return nil, fmt.Errorf("could not load CSI driver logs: %s", err)
}
logLines := strings.Split(log, "\n")
var calls []mockCSICall
for _, line := range logLines {
index := strings.Index(line, grpcCallPrefix)
if index == -1 {
continue
}
line = line[index+len(grpcCallPrefix):]
call := mockCSICall{
json: string(line),
}
err := json.Unmarshal([]byte(line), &call)
if err != nil {
framework.Logf("Could not parse CSI driver log line %q: %s", line, err)
continue
}
// Trim gRPC service name, i.e. "/csi.v1.Identity/Probe" -> "Probe"
methodParts := strings.Split(call.Method, "/")
call.Method = methodParts[len(methodParts)-1]
calls = append(calls, call)
}
return calls, nil
}
// compareCSICalls compares expectedCalls with logs of the mock driver.
// It returns index of the first expectedCall that was *not* received
// yet or error when calls do not match.
@ -2011,8 +1978,8 @@ func parseMockLogs(cs clientset.Interface, namespace, driverPodName, driverConta
//
// Only permanent errors are returned. Other errors are logged and no
// calls are returned. The caller is expected to retry.
func compareCSICalls(trackedCalls []string, expectedCallSequence []csiCall, cs clientset.Interface, namespace, driverPodName, driverContainerName string) ([]mockCSICall, int, error) {
allCalls, err := parseMockLogs(cs, namespace, driverPodName, driverContainerName)
func compareCSICalls(trackedCalls []string, expectedCallSequence []csiCall, getCalls func() ([]drivers.MockCSICall, error)) ([]drivers.MockCSICall, int, error) {
allCalls, err := getCalls()
if err != nil {
framework.Logf("intermittent (?) log retrieval error, proceeding without output: %v", err)
return nil, 0, nil
@ -2020,8 +1987,8 @@ func compareCSICalls(trackedCalls []string, expectedCallSequence []csiCall, cs c
// Remove all repeated and ignored calls
tracked := sets.NewString(trackedCalls...)
var calls []mockCSICall
var last mockCSICall
var calls []drivers.MockCSICall
var last drivers.MockCSICall
for _, c := range allCalls {
if !tracked.Has(c.Method) {
continue
@ -2145,3 +2112,20 @@ func checkDeleteSnapshotSecrets(cs clientset.Interface, annotations interface{})
return err
}
// createPreHook counts invocations of a certain method (identified by a substring in the full gRPC method name).
func createPreHook(method string, callback func(counter int64) error) *drivers.Hooks {
var counter int64
return &drivers.Hooks{
Pre: func() func(ctx context.Context, fullMethod string, request interface{}) (reply interface{}, err error) {
return func(ctx context.Context, fullMethod string, request interface{}) (reply interface{}, err error) {
if strings.Contains(fullMethod, method) {
counter := atomic.AddInt64(&counter, 1)
return nil, callback(counter)
}
return nil, nil
}
}(),
}
}
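
A counterpart for Post hooks could be built along the same lines. The
sketch below is not part of this change; createPostHook is a
hypothetical name, and it reuses the context, strings and sync/atomic
imports already present in this file.

// createPostHook would count invocations of a certain method and let the
// callback rewrite the reply or error after the mock driver's own
// implementation has handled the call. Hypothetical helper, shown only
// for illustration.
func createPostHook(method string, callback func(counter int64, reply interface{}, err error) (interface{}, error)) *drivers.Hooks {
    var counter int64
    return &drivers.Hooks{
        Post: func(ctx context.Context, fullMethod string, request, reply interface{}, err error) (interface{}, error) {
            if strings.Contains(fullMethod, method) {
                return callback(atomic.AddInt64(&counter, 1), reply, err)
            }
            return reply, err
        },
    }
}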

View File

@ -1,110 +0,0 @@
/*
Copyright 2019 Kubernetes Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package driver
import (
"context"
"net"
"sync"
"google.golang.org/grpc/reflection"
csi "github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc"
)
// CSIDriverControllerServer is the Controller service component of the driver.
type CSIDriverControllerServer struct {
Controller csi.ControllerServer
Identity csi.IdentityServer
}
// CSIDriverController is the CSI Driver Controller backend.
type CSIDriverController struct {
listener net.Listener
server *grpc.Server
controllerServer *CSIDriverControllerServer
wg sync.WaitGroup
running bool
lock sync.Mutex
creds *CSICreds
}
func NewCSIDriverController(controllerServer *CSIDriverControllerServer) *CSIDriverController {
return &CSIDriverController{
controllerServer: controllerServer,
}
}
func (c *CSIDriverController) goServe(started chan<- bool) {
goServe(c.server, &c.wg, c.listener, started)
}
func (c *CSIDriverController) Address() string {
return c.listener.Addr().String()
}
func (c *CSIDriverController) Start(l net.Listener) error {
c.lock.Lock()
defer c.lock.Unlock()
// Set listener.
c.listener = l
// Create a new grpc server.
c.server = grpc.NewServer(
grpc.UnaryInterceptor(c.callInterceptor),
)
if c.controllerServer.Controller != nil {
csi.RegisterControllerServer(c.server, c.controllerServer.Controller)
}
if c.controllerServer.Identity != nil {
csi.RegisterIdentityServer(c.server, c.controllerServer.Identity)
}
reflection.Register(c.server)
waitForServer := make(chan bool)
c.goServe(waitForServer)
<-waitForServer
c.running = true
return nil
}
func (c *CSIDriverController) Stop() {
stop(&c.lock, &c.wg, c.server, c.running)
}
func (c *CSIDriverController) Close() {
c.server.Stop()
}
func (c *CSIDriverController) IsRunning() bool {
c.lock.Lock()
defer c.lock.Unlock()
return c.running
}
func (c *CSIDriverController) SetDefaultCreds() {
setDefaultCreds(c.creds)
}
func (c *CSIDriverController) callInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return callInterceptor(ctx, c.creds, req, info, handler)
}

View File

@ -1,109 +0,0 @@
/*
Copyright 2019 Kubernetes Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package driver
import (
context "context"
"net"
"sync"
csi "github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
// CSIDriverNodeServer is the Node service component of the driver.
type CSIDriverNodeServer struct {
Node csi.NodeServer
Identity csi.IdentityServer
}
// CSIDriverNode is the CSI Driver Node backend.
type CSIDriverNode struct {
listener net.Listener
server *grpc.Server
nodeServer *CSIDriverNodeServer
wg sync.WaitGroup
running bool
lock sync.Mutex
creds *CSICreds
}
func NewCSIDriverNode(nodeServer *CSIDriverNodeServer) *CSIDriverNode {
return &CSIDriverNode{
nodeServer: nodeServer,
}
}
func (c *CSIDriverNode) goServe(started chan<- bool) {
goServe(c.server, &c.wg, c.listener, started)
}
func (c *CSIDriverNode) Address() string {
return c.listener.Addr().String()
}
func (c *CSIDriverNode) Start(l net.Listener) error {
c.lock.Lock()
defer c.lock.Unlock()
// Set listener.
c.listener = l
// Create a new grpc server.
c.server = grpc.NewServer(
grpc.UnaryInterceptor(c.callInterceptor),
)
if c.nodeServer.Node != nil {
csi.RegisterNodeServer(c.server, c.nodeServer.Node)
}
if c.nodeServer.Identity != nil {
csi.RegisterIdentityServer(c.server, c.nodeServer.Identity)
}
reflection.Register(c.server)
waitForServer := make(chan bool)
c.goServe(waitForServer)
<-waitForServer
c.running = true
return nil
}
func (c *CSIDriverNode) Stop() {
stop(&c.lock, &c.wg, c.server, c.running)
}
func (c *CSIDriverNode) Close() {
c.server.Stop()
}
func (c *CSIDriverNode) IsRunning() bool {
c.lock.Lock()
defer c.lock.Unlock()
return c.running
}
func (c *CSIDriverNode) SetDefaultCreds() {
setDefaultCreds(c.creds)
}
func (c *CSIDriverNode) callInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return callInterceptor(ctx, c.creds, req, info, handler)
}

View File

@ -27,11 +27,10 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/klog"
"k8s.io/klog/v2"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
var (
@ -75,8 +74,11 @@ type CSIDriver struct {
running bool
lock sync.Mutex
creds *CSICreds
logGRPC LogGRPC
}
type LogGRPC func(method string, request, reply interface{}, err error)
func NewCSIDriver(servers *CSIDriverServers) *CSIDriver {
return &CSIDriver{
servers: servers,
@ -90,7 +92,12 @@ func (c *CSIDriver) goServe(started chan<- bool) {
func (c *CSIDriver) Address() string {
return c.listener.Addr().String()
}
func (c *CSIDriver) Start(l net.Listener) error {
// Start runs a gRPC server with all enabled services. If an interceptor
// is given, then it will be used. Otherwise, an interceptor which
// handles simple credential checks and logs gRPC calls in JSON format
// will be used.
func (c *CSIDriver) Start(l net.Listener, interceptor grpc.UnaryServerInterceptor) error {
c.lock.Lock()
defer c.lock.Unlock()
@ -98,9 +105,10 @@ func (c *CSIDriver) Start(l net.Listener) error {
c.listener = l
// Create a new grpc server
c.server = grpc.NewServer(
grpc.UnaryInterceptor(c.callInterceptor),
)
if interceptor == nil {
interceptor = c.callInterceptor
}
c.server = grpc.NewServer(grpc.UnaryInterceptor(interceptor))
// Register Mock servers
if c.servers.Controller != nil {
@ -112,7 +120,6 @@ func (c *CSIDriver) Start(l net.Listener) error {
if c.servers.Node != nil {
csi.RegisterNodeServer(c.server, c.servers.Node)
}
reflection.Register(c.server)
// Start listening for requests
waitForServer := make(chan bool)
@ -142,10 +149,6 @@ func (c *CSIDriver) SetDefaultCreds() {
setDefaultCreds(c.creds)
}
func (c *CSIDriver) callInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return callInterceptor(ctx, c.creds, req, info, handler)
}
// goServe starts a grpc server.
func goServe(server *grpc.Server, wg *sync.WaitGroup, listener net.Listener, started chan<- bool) {
wg.Add(1)
@ -187,14 +190,17 @@ func setDefaultCreds(creds *CSICreds) {
}
}
func callInterceptor(ctx context.Context, creds *CSICreds, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
err := authInterceptor(creds, req)
func (c *CSIDriver) callInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
err := authInterceptor(c.creds, req)
if err != nil {
logGRPC(info.FullMethod, req, nil, err)
return nil, err
}
rsp, err := handler(ctx, req)
logGRPC(info.FullMethod, req, rsp, err)
if c.logGRPC != nil {
c.logGRPC(info.FullMethod, req, rsp, err)
}
return rsp, err
}

View File

@ -19,7 +19,6 @@ package driver
import (
"net"
"github.com/kubernetes-csi/csi-test/v4/utils"
"google.golang.org/grpc"
)
@ -31,10 +30,11 @@ type MockCSIDriverServers struct {
type MockCSIDriver struct {
CSIDriver
conn *grpc.ClientConn
conn *grpc.ClientConn
interceptor grpc.UnaryServerInterceptor
}
func NewMockCSIDriver(servers *MockCSIDriverServers) *MockCSIDriver {
func NewMockCSIDriver(servers *MockCSIDriverServers, interceptor grpc.UnaryServerInterceptor) *MockCSIDriver {
return &MockCSIDriver{
CSIDriver: CSIDriver{
servers: &CSIDriverServers{
@ -43,6 +43,7 @@ func NewMockCSIDriver(servers *MockCSIDriverServers) *MockCSIDriver {
Identity: servers.Identity,
},
},
interceptor: interceptor,
}
}
@ -53,7 +54,7 @@ func (m *MockCSIDriver) StartOnAddress(network, address string) error {
return err
}
if err := m.CSIDriver.Start(l); err != nil {
if err := m.CSIDriver.Start(l, m.interceptor); err != nil {
l.Close()
return err
}
@ -67,22 +68,6 @@ func (m *MockCSIDriver) Start() error {
return m.StartOnAddress("tcp", "127.0.0.1:0")
}
func (m *MockCSIDriver) Nexus() (*grpc.ClientConn, error) {
// Start server
err := m.Start()
if err != nil {
return nil, err
}
// Create a client connection
m.conn, err = utils.Connect(m.Address(), grpc.WithInsecure())
if err != nil {
return nil, err
}
return m.conn, nil
}
func (m *MockCSIDriver) Close() {
m.conn.Close()
m.server.Stop()

View File

@ -46,9 +46,7 @@ func (s *service) CreateVolume(
if req.VolumeCapabilities == nil {
return nil, status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty")
}
if hookVal, hookMsg := s.execHook("CreateVolumeStart"); hookVal != codes.OK {
return nil, status.Errorf(hookVal, hookMsg)
}
// Check to see if the volume already exists.
if i, v := s.findVolByName(ctx, req.Name); i >= 0 {
// Requested volume name already exists, need to check if the existing volume's
@ -610,10 +608,6 @@ func (s *service) CreateSnapshot(ctx context.Context,
return nil, status.Error(codes.InvalidArgument, "Snapshot SourceVolumeId cannot be empty")
}
if hookVal, hookMsg := s.execHook("CreateSnapshotStart"); hookVal != codes.OK {
return nil, status.Errorf(hookVal, hookMsg)
}
// Check to see if the snapshot already exists.
if i, v := s.snapshots.FindSnapshot("name", req.GetName()); i >= 0 {
// Requested snapshot name already exists

View File

@ -1,24 +0,0 @@
package service
// Predefined constants for the JavaScript hooks; they must correspond to the
// error codes used by gRPC, see:
// https://github.com/grpc/grpc-go/blob/master/codes/codes.go
const (
grpcJSCodes string = `OK = 0;
CANCELED = 1;
UNKNOWN = 2;
INVALIDARGUMENT = 3;
DEADLINEEXCEEDED = 4;
NOTFOUND = 5;
ALREADYEXISTS = 6;
PERMISSIONDENIED = 7;
RESOURCEEXHAUSTED = 8;
FAILEDPRECONDITION = 9;
ABORTED = 10;
OUTOFRANGE = 11;
UNIMPLEMENTED = 12;
INTERNAL = 13;
UNAVAILABLE = 14;
DATALOSS = 15;
UNAUTHENTICATED = 16`
)

View File

@ -18,7 +18,6 @@ package service
import (
"fmt"
"os"
"path"
"strconv"
@ -35,10 +34,6 @@ func (s *service) NodeStageVolume(
req *csi.NodeStageVolumeRequest) (
*csi.NodeStageVolumeResponse, error) {
if hookVal, hookMsg := s.execHook("NodeStageVolumeStart"); hookVal != codes.OK {
return nil, status.Errorf(hookVal, hookMsg)
}
device, ok := req.PublishContext["device"]
if !ok {
if s.config.DisableAttach {
@ -62,7 +57,7 @@ func (s *service) NodeStageVolume(
return nil, status.Error(codes.InvalidArgument, "Volume Capability cannot be empty")
}
exists, err := checkTargetExists(req.StagingTargetPath)
exists, err := s.config.IO.DirExists(req.StagingTargetPath)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
@ -113,10 +108,6 @@ func (s *service) NodeUnstageVolume(
return nil, status.Error(codes.InvalidArgument, "Staging Target Path cannot be empty")
}
if hookVal, hookMsg := s.execHook("NodeUnstageVolumeStart"); hookVal != codes.OK {
return nil, status.Errorf(hookVal, hookMsg)
}
s.volsRWL.Lock()
defer s.volsRWL.Unlock()
@ -178,7 +169,7 @@ func (s *service) NodePublishVolume(
// May happen with old (or, at this time, even the current) Kubernetes
// although it shouldn't (https://github.com/kubernetes/kubernetes/issues/75535).
exists, err := checkTargetExists(req.TargetPath)
exists, err := s.config.IO.DirExists(req.TargetPath)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
@ -220,13 +211,13 @@ func (s *service) NodePublishVolume(
}
} else {
if req.GetTargetPath() != "" {
exists, err := checkTargetExists(req.GetTargetPath())
exists, err := s.config.IO.DirExists(req.GetTargetPath())
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
if !exists {
// If target path does not exist we need to create the directory where volume will be staged
if err = os.Mkdir(req.TargetPath, os.FileMode(0755)); err != nil {
if err = s.config.IO.Mkdir(req.TargetPath); err != nil {
msg := fmt.Sprintf("NodePublishVolume: could not create target dir %q: %v", req.TargetPath, err)
return nil, status.Error(codes.Internal, msg)
}
@ -281,7 +272,7 @@ func (s *service) NodeUnpublishVolume(
}
// Delete any created paths
err := os.RemoveAll(v.VolumeContext[nodeMntPathKey])
err := s.config.IO.RemoveAll(v.VolumeContext[nodeMntPathKey])
if err != nil {
return nil, status.Errorf(codes.Internal, "Unable to delete previously created target directory")
}
@ -415,10 +406,6 @@ func (s *service) NodeGetInfo(ctx context.Context,
func (s *service) NodeGetVolumeStats(ctx context.Context,
req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
if hookVal, hookMsg := s.execHook("NodeGetVolumeStatsStart"); hookVal != codes.OK {
return nil, status.Errorf(hookVal, hookMsg)
}
resp := &csi.NodeGetVolumeStatsResponse{
VolumeCondition: &csi.VolumeCondition{},
}
@ -461,16 +448,3 @@ func (s *service) NodeGetVolumeStats(ctx context.Context,
return resp, nil
}
// checkTargetExists checks if a given path exists.
func checkTargetExists(targetPath string) (bool, error) {
_, err := os.Stat(targetPath)
switch {
case err == nil:
return true, nil
case os.IsNotExist(err):
return false, nil
default:
return false, err
}
}

View File

@ -18,21 +18,17 @@ package service
import (
"fmt"
"reflect"
"os"
"strings"
"sync"
"sync/atomic"
"k8s.io/klog"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/csi-test/v4/mock/cache"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"k8s.io/kubernetes/test/e2e/storage/drivers/csi-test/mock/cache"
"github.com/golang/protobuf/ptypes"
"github.com/robertkrimen/otto"
)
const (
@ -51,47 +47,7 @@ const (
// Manifest is the SP's manifest.
var Manifest = map[string]string{
"url": "https://github.com/kubernetes-csi/csi-test/mock",
}
// JavaScript hooks to be run to perform various tests
type Hooks struct {
Globals string `yaml:"globals"` // will be executed once before all other scripts
CreateVolumeStart string `yaml:"createVolumeStart"`
CreateVolumeEnd string `yaml:"createVolumeEnd"`
DeleteVolumeStart string `yaml:"deleteVolumeStart"`
DeleteVolumeEnd string `yaml:"deleteVolumeEnd"`
ControllerPublishVolumeStart string `yaml:"controllerPublishVolumeStart"`
ControllerPublishVolumeEnd string `yaml:"controllerPublishVolumeEnd"`
ControllerUnpublishVolumeStart string `yaml:"controllerUnpublishVolumeStart"`
ControllerUnpublishVolumeEnd string `yaml:"controllerUnpublishVolumeEnd"`
ValidateVolumeCapabilities string `yaml:"validateVolumeCapabilities"`
ListVolumesStart string `yaml:"listVolumesStart"`
ListVolumesEnd string `yaml:"listVolumesEnd"`
GetCapacity string `yaml:"getCapacity"`
ControllerGetCapabilitiesStart string `yaml:"controllerGetCapabilitiesStart"`
ControllerGetCapabilitiesEnd string `yaml:"controllerGetCapabilitiesEnd"`
CreateSnapshotStart string `yaml:"createSnapshotStart"`
CreateSnapshotEnd string `yaml:"createSnapshotEnd"`
DeleteSnapshotStart string `yaml:"deleteSnapshotStart"`
DeleteSnapshotEnd string `yaml:"deleteSnapshotEnd"`
ListSnapshots string `yaml:"listSnapshots"`
ControllerExpandVolumeStart string `yaml:"controllerExpandVolumeStart"`
ControllerExpandVolumeEnd string `yaml:"controllerExpandVolumeEnd"`
NodeStageVolumeStart string `yaml:"nodeStageVolumeStart"`
NodeStageVolumeEnd string `yaml:"nodeStageVolumeEnd"`
NodeUnstageVolumeStart string `yaml:"nodeUnstageVolumeStart"`
NodeUnstageVolumeEnd string `yaml:"nodeUnstageVolumeEnd"`
NodePublishVolumeStart string `yaml:"nodePublishVolumeStart"`
NodePublishVolumeEnd string `yaml:"nodePublishVolumeEnd"`
NodeUnpublishVolumeStart string `yaml:"nodeUnpublishVolumeStart"`
NodeUnpublishVolumeEnd string `yaml:"nodeUnpublishVolumeEnd"`
NodeExpandVolumeStart string `yaml:"nodeExpandVolumeStart"`
NodeExpandVolumeEnd string `yaml:"nodeExpandVolumeEnd"`
NodeGetCapabilities string `yaml:"nodeGetCapabilities"`
NodeGetInfo string `yaml:"nodeGetInfo"`
NodeGetVolumeStatsStart string `yaml:"nodeGetVolumeStatsStart"`
NodeGetVolumeStatsEnd string `yaml:"nodeGetVolumeStatsEnd"`
"url": "https://k8s.io/kubernetes/test/e2e/storage/drivers/csi-test/mock",
}
type Config struct {
@ -103,7 +59,41 @@ type Config struct {
DisableOnlineExpansion bool
PermissiveTargetPath bool
EnableTopology bool
ExecHooks *Hooks
IO DirIO
}
// DirIO is an abstraction over direct os calls.
type DirIO interface {
// DirExists returns false if the path doesn't exist, true if it exists and is a directory, an error otherwise.
DirExists(path string) (bool, error)
// Mkdir creates the directory, but not its parents, with 0755 permissions.
Mkdir(path string) error
// RemoveAll removes the path and everything contained inside it. It's not an error if the path does not exist.
RemoveAll(path string) error
}
type OSDirIO struct{}
func (o OSDirIO) DirExists(path string) (bool, error) {
info, err := os.Stat(path)
switch {
case err == nil && !info.IsDir():
return false, fmt.Errorf("%s: not a directory", path)
case err == nil:
return true, nil
case os.IsNotExist(err):
return false, nil
default:
return false, err
}
}
func (o OSDirIO) Mkdir(path string) error {
return os.Mkdir(path, os.FileMode(0755))
}
func (o OSDirIO) RemoveAll(path string) error {
return os.RemoveAll(path)
}
// Service is the CSI Mock service provider.
@ -122,7 +112,6 @@ type service struct {
snapshots cache.SnapshotCache
snapshotsNID uint64
config Config
hooksVm *otto.Otto
}
type Volume struct {
@ -144,13 +133,8 @@ func New(config Config) Service {
nodeID: config.DriverName,
config: config,
}
if config.ExecHooks != nil {
s.hooksVm = otto.New()
s.hooksVm.Run(grpcJSCodes) // set global variables with gRPC error codes
_, err := s.hooksVm.Run(s.config.ExecHooks.Globals)
if err != nil {
klog.Exitf("Error encountered in the global exec hook: %v. Exiting\n", err)
}
if s.config.IO == nil {
s.config.IO = OSDirIO{}
}
s.snapshots = cache.NewSnapshotCache()
s.vols = []csi.Volume{
@ -288,22 +272,5 @@ func (s *service) getAttachCount(devPathKey string) int64 {
}
func (s *service) execHook(hookName string) (codes.Code, string) {
if s.hooksVm != nil {
script := reflect.ValueOf(*s.config.ExecHooks).FieldByName(hookName).String()
if len(script) > 0 {
result, err := s.hooksVm.Run(script)
if err != nil {
klog.Exitf("Exec hook %s error: %v; exiting\n", hookName, err)
}
rv, err := result.ToInteger()
if err == nil {
// Function returned an integer, use it
return codes.Code(rv), fmt.Sprintf("Exec hook %s returned non-OK code", hookName)
} else {
// Function returned non-integer data type, discard it
return codes.OK, ""
}
}
}
return codes.OK, ""
}

View File

@ -37,13 +37,16 @@ package drivers
import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"
"gopkg.in/yaml.v2"
"github.com/onsi/ginkgo"
"google.golang.org/grpc/codes"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -51,14 +54,20 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/e2e/framework"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
e2evolume "k8s.io/kubernetes/test/e2e/framework/volume"
mockdriver "k8s.io/kubernetes/test/e2e/storage/drivers/csi-test/driver"
mockservice "k8s.io/kubernetes/test/e2e/storage/drivers/csi-test/mock/service"
"k8s.io/kubernetes/test/e2e/storage/drivers/proxy"
storageframework "k8s.io/kubernetes/test/e2e/storage/framework"
"k8s.io/kubernetes/test/e2e/storage/utils"
"google.golang.org/grpc"
)
const (
@ -232,10 +241,42 @@ type mockCSIDriver struct {
attachLimit int
enableTopology bool
enableNodeExpansion bool
javascriptHooks map[string]string
hooks Hooks
tokenRequests []storagev1.TokenRequest
requiresRepublish *bool
fsGroupPolicy *storagev1.FSGroupPolicy
embedded bool
calls MockCSICalls
embeddedCSIDriver *mockdriver.CSIDriver
// Additional values set during PrepareTest
clientSet kubernetes.Interface
driverNamespace *v1.Namespace
}
// Hooks to be executed while handling gRPC calls.
//
// At the moment, only generic pre- and post-function call
// hooks are implemented. Those hooks can cast the request and
// response values if needed. More hooks inside specific
// functions could be added if needed.
type Hooks struct {
// Pre is called before invoking the mock driver's implementation of a method.
// If either a non-nil reply or error are returned, then those are returned to the caller.
Pre func(ctx context.Context, method string, request interface{}) (reply interface{}, err error)
// Post is called after invoking the mock driver's implementation of a method.
// What it returns is used as actual result.
Post func(ctx context.Context, method string, request, reply interface{}, err error) (finalReply interface{}, finalErr error)
}
// MockCSITestDriver provides additional functions specific to the CSI mock driver.
type MockCSITestDriver interface {
storageframework.DynamicPVTestDriver
// GetCalls returns all currently observed gRPC calls. Only valid
// after PrepareTest.
GetCalls() ([]MockCSICall, error)
}
// CSIMockDriverOpts defines options used for csi driver
@ -249,10 +290,94 @@ type CSIMockDriverOpts struct {
EnableResizing bool
EnableNodeExpansion bool
EnableSnapshot bool
JavascriptHooks map[string]string
TokenRequests []storagev1.TokenRequest
RequiresRepublish *bool
FSGroupPolicy *storagev1.FSGroupPolicy
// Embedded defines whether the CSI mock driver runs
// inside the cluster (false, the default) or just a proxy
// runs inside the cluster and all gRPC calls are handled
// inside the e2e.test binary.
Embedded bool
// Hooks that will be called if (and only if!) the embedded
// mock driver is used. Beware that hooks are invoked
// asynchronously in different goroutines.
Hooks Hooks
}
// MockCSICall is a dummy structure that parses just the volume_attributes and error code out of a logged CSI call
type MockCSICall struct {
json string // full log entry
Method string
Request struct {
VolumeContext map[string]string `json:"volume_context"`
}
FullError struct {
Code codes.Code `json:"code"`
Message string `json:"message"`
}
Error string
}
// MockCSICalls is a thread-safe store for MockCSICall instances.
type MockCSICalls struct {
calls []MockCSICall
mutex sync.Mutex
}
// Get returns all currently recorded calls.
func (c *MockCSICalls) Get() []MockCSICall {
c.mutex.Lock()
defer c.mutex.Unlock()
return c.calls[:]
}
// Add appends one new call at the end.
func (c *MockCSICalls) Add(call MockCSICall) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.calls = append(c.calls, call)
}
// LogGRPC takes individual parameters from the mock CSI driver and adds them.
func (c *MockCSICalls) LogGRPC(method string, request, reply interface{}, err error) {
// Encoding to JSON and decoding mirrors the traditional way of capturing calls.
// Probably could be simplified now...
logMessage := struct {
Method string
Request interface{}
Response interface{}
// Error as string, for backward compatibility.
// "" on no error.
Error string
// Full error dump, to be able to parse out full gRPC error code and message separately in a test.
FullError error
}{
Method: method,
Request: request,
Response: reply,
FullError: err,
}
if err != nil {
logMessage.Error = err.Error()
}
msg, _ := json.Marshal(logMessage)
call := MockCSICall{
json: string(msg),
}
json.Unmarshal(msg, &call)
// Trim gRPC service name, i.e. "/csi.v1.Identity/Probe" -> "Probe"
methodParts := strings.Split(call.Method, "/")
call.Method = methodParts[len(methodParts)-1]
c.Add(call)
}
var _ storageframework.TestDriver = &mockCSIDriver{}
@ -260,7 +385,7 @@ var _ storageframework.DynamicPVTestDriver = &mockCSIDriver{}
var _ storageframework.SnapshottableTestDriver = &mockCSIDriver{}
// InitMockCSIDriver returns a mockCSIDriver that implements TestDriver interface
func InitMockCSIDriver(driverOpts CSIMockDriverOpts) storageframework.TestDriver {
func InitMockCSIDriver(driverOpts CSIMockDriverOpts) MockCSITestDriver {
driverManifests := []string{
"test/e2e/testing-manifests/storage-csi/external-attacher/rbac.yaml",
"test/e2e/testing-manifests/storage-csi/external-provisioner/rbac.yaml",
@ -268,7 +393,11 @@ func InitMockCSIDriver(driverOpts CSIMockDriverOpts) storageframework.TestDriver
"test/e2e/testing-manifests/storage-csi/external-snapshotter/rbac.yaml",
"test/e2e/testing-manifests/storage-csi/mock/csi-mock-rbac.yaml",
"test/e2e/testing-manifests/storage-csi/mock/csi-storageclass.yaml",
"test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver.yaml",
}
if driverOpts.Embedded {
driverManifests = append(driverManifests, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-proxy.yaml")
} else {
driverManifests = append(driverManifests, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver.yaml")
}
if driverOpts.RegisterDriver {
@ -309,10 +438,11 @@ func InitMockCSIDriver(driverOpts CSIMockDriverOpts) storageframework.TestDriver
attachable: !driverOpts.DisableAttach,
attachLimit: driverOpts.AttachLimit,
enableNodeExpansion: driverOpts.EnableNodeExpansion,
javascriptHooks: driverOpts.JavascriptHooks,
tokenRequests: driverOpts.TokenRequests,
requiresRepublish: driverOpts.RequiresRepublish,
fsGroupPolicy: driverOpts.FSGroupPolicy,
embedded: driverOpts.Embedded,
hooks: driverOpts.Hooks,
}
}
@ -340,62 +470,108 @@ func (m *mockCSIDriver) GetSnapshotClass(config *storageframework.PerTestConfig,
}
func (m *mockCSIDriver) PrepareTest(f *framework.Framework) (*storageframework.PerTestConfig, func()) {
m.clientSet = f.ClientSet
// Create secondary namespace which will be used for creating driver
driverNamespace := utils.CreateDriverNamespace(f)
driverns := driverNamespace.Name
m.driverNamespace = utils.CreateDriverNamespace(f)
driverns := m.driverNamespace.Name
testns := f.Namespace.Name
ginkgo.By("deploying csi mock driver")
cancelLogging := utils.StartPodLogs(f, driverNamespace)
if m.embedded {
ginkgo.By("deploying csi mock proxy")
} else {
ginkgo.By("deploying csi mock driver")
}
cancelLogging := utils.StartPodLogs(f, m.driverNamespace)
cs := f.ClientSet
// pods should be scheduled on the node
node, err := e2enode.GetRandomReadySchedulableNode(cs)
framework.ExpectNoError(err)
embeddedCleanup := func() {}
containerArgs := []string{}
if m.embedded {
// Run embedded CSI driver.
//
// For now we start exactly one instance which implements controller,
// node and identity services. It matches the one pod that we run
// inside the cluster. The name and namespace of that pod are deterministic,
// so we know what to connect to.
//
// Long-term we could also deploy one central controller and multiple
// node instances, with knowledge about provisioned volumes shared in
// this process.
podname := "csi-mockplugin-0"
containername := "mock"
ctx, cancel := context.WithCancel(context.Background())
serviceConfig := mockservice.Config{
DisableAttach: !m.attachable,
DriverName: "csi-mock-" + f.UniqueName,
AttachLimit: int64(m.attachLimit),
NodeExpansionRequired: m.enableNodeExpansion,
EnableTopology: m.enableTopology,
IO: proxy.PodDirIO{
F: f,
Namespace: m.driverNamespace.Name,
PodName: podname,
ContainerName: "busybox",
},
}
s := mockservice.New(serviceConfig)
servers := &mockdriver.CSIDriverServers{
Controller: s,
Identity: s,
Node: s,
}
m.embeddedCSIDriver = mockdriver.NewCSIDriver(servers)
l, err := proxy.Listen(ctx, f.ClientSet, f.ClientConfig(),
proxy.Addr{
Namespace: m.driverNamespace.Name,
PodName: podname,
ContainerName: containername,
Port: 9000,
},
)
framework.ExpectNoError(err, "start connecting to proxy pod")
err = m.embeddedCSIDriver.Start(l, m.interceptGRPC)
framework.ExpectNoError(err, "start mock driver")
embeddedCleanup = func() {
// Kill all goroutines and delete resources of the mock driver.
m.embeddedCSIDriver.Stop()
l.Close()
cancel()
}
} else {
// When using the mock driver inside the cluster it has to be reconfigured
// via command line parameters.
containerArgs = append(containerArgs, "--name=csi-mock-"+f.UniqueName)
if !m.attachable {
containerArgs = append(containerArgs, "--disable-attach")
}
if m.enableTopology {
containerArgs = append(containerArgs, "--enable-topology")
}
if m.attachLimit > 0 {
containerArgs = append(containerArgs, "--attach-limit", strconv.Itoa(m.attachLimit))
}
if m.enableNodeExpansion {
containerArgs = append(containerArgs, "--node-expand-required=true")
}
}
config := &storageframework.PerTestConfig{
Driver: m,
Prefix: "mock",
Framework: f,
ClientNodeSelection: e2epod.NodeSelection{Name: node.Name},
DriverNamespace: driverNamespace,
}
containerArgs := []string{"--name=csi-mock-" + f.UniqueName}
if !m.attachable {
containerArgs = append(containerArgs, "--disable-attach")
}
if m.enableTopology {
containerArgs = append(containerArgs, "--enable-topology")
}
if m.attachLimit > 0 {
containerArgs = append(containerArgs, "--attach-limit", strconv.Itoa(m.attachLimit))
}
if m.enableNodeExpansion {
containerArgs = append(containerArgs, "--node-expand-required=true")
}
// Create a config map with javascript hooks. Create it even when javascriptHooks
// are empty, so we can unconditionally add it to the mock pod.
const hooksConfigMapName = "mock-driver-hooks"
hooksYaml, err := yaml.Marshal(m.javascriptHooks)
framework.ExpectNoError(err)
hooks := &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: hooksConfigMapName,
},
Data: map[string]string{
"hooks.yaml": string(hooksYaml),
},
}
_, err = f.ClientSet.CoreV1().ConfigMaps(driverns).Create(context.TODO(), hooks, metav1.CreateOptions{})
framework.ExpectNoError(err)
if len(m.javascriptHooks) > 0 {
containerArgs = append(containerArgs, "--hooks-file=/etc/hooks/hooks.yaml")
DriverNamespace: m.driverNamespace,
}
o := utils.PatchCSIOptions{
@ -416,7 +592,7 @@ func (m *mockCSIDriver) PrepareTest(f *framework.Framework) (*storageframework.P
RequiresRepublish: m.requiresRepublish,
FSGroupPolicy: m.fsGroupPolicy,
}
cleanup, err := utils.CreateFromManifests(f, driverNamespace, func(item interface{}) error {
cleanup, err := utils.CreateFromManifests(f, m.driverNamespace, func(item interface{}) error {
return utils.PatchCSIDeployment(f, o, item)
}, m.manifests...)
@ -424,7 +600,7 @@ func (m *mockCSIDriver) PrepareTest(f *framework.Framework) (*storageframework.P
framework.Failf("deploying csi mock driver: %v", err)
}
cleanupFunc := generateDriverCleanupFunc(
driverCleanupFunc := generateDriverCleanupFunc(
f,
"mock",
testns,
@ -432,9 +608,83 @@ func (m *mockCSIDriver) PrepareTest(f *framework.Framework) (*storageframework.P
cleanup,
cancelLogging)
cleanupFunc := func() {
embeddedCleanup()
driverCleanupFunc()
}
return config, cleanupFunc
}
func (m *mockCSIDriver) interceptGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
defer func() {
// Always log the call and its final result,
// regardless of whether the result was from the real
// implementation or a hook.
m.calls.LogGRPC(info.FullMethod, req, resp, err)
}()
if m.hooks.Pre != nil {
resp, err = m.hooks.Pre(ctx, info.FullMethod, req)
if resp != nil || err != nil {
return
}
}
resp, err = handler(ctx, req)
if m.hooks.Post != nil {
resp, err = m.hooks.Post(ctx, info.FullMethod, req, resp, err)
}
return
}
func (m *mockCSIDriver) GetCalls() ([]MockCSICall, error) {
if m.embedded {
return m.calls.Get(), nil
}
if m.driverNamespace == nil {
return nil, errors.New("PrepareTest not called yet")
}
// Name of the CSI driver pod (it's in a StatefulSet with a stable name)
driverPodName := "csi-mockplugin-0"
// Name of the CSI driver container
driverContainerName := "mock"
// Prefix of the mock driver grpc log
grpcCallPrefix := "gRPCCall:"
// Load logs of driver pod
log, err := e2epod.GetPodLogs(m.clientSet, m.driverNamespace.Name, driverPodName, driverContainerName)
if err != nil {
return nil, fmt.Errorf("could not load CSI driver logs: %s", err)
}
logLines := strings.Split(log, "\n")
var calls []MockCSICall
for _, line := range logLines {
index := strings.Index(line, grpcCallPrefix)
if index == -1 {
continue
}
line = line[index+len(grpcCallPrefix):]
call := MockCSICall{
json: string(line),
}
err := json.Unmarshal([]byte(line), &call)
if err != nil {
framework.Logf("Could not parse CSI driver log line %q: %s", line, err)
continue
}
// Trim gRPC service name, i.e. "/csi.v1.Identity/Probe" -> "Probe"
methodParts := strings.Split(call.Method, "/")
call.Method = methodParts[len(methodParts)-1]
calls = append(calls, call)
}
return calls, nil
}
// gce-pd
type gcePDCSIDriver struct {
driverInfo storageframework.DriverInfo

View File

@ -0,0 +1,82 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxy
import (
"fmt"
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/e2e/storage/drivers/csi-test/mock/service"
)
type PodDirIO struct {
F *framework.Framework
Namespace string
PodName string
ContainerName string
}
var _ service.DirIO = PodDirIO{}
func (p PodDirIO) DirExists(path string) (bool, error) {
stdout, stderr, err := p.execute([]string{
"sh",
"-c",
fmt.Sprintf("if ! [ -e '%s' ]; then echo notexist; elif [ -d '%s' ]; then echo dir; else echo nodir; fi", path, path),
})
if err != nil {
return false, fmt.Errorf("error executing dir test commands: stderr=%q, %v", stderr, err)
}
switch stdout {
case "notexist":
return false, nil
case "nodir":
return false, fmt.Errorf("%s: not a directory", path)
case "dir":
return true, nil
default:
return false, fmt.Errorf("unexpected output from dir test commands: %q", stdout)
}
}
func (p PodDirIO) Mkdir(path string) error {
_, stderr, err := p.execute([]string{"mkdir", path})
if err != nil {
return fmt.Errorf("mkdir %q: stderr=%q, %v", path, stderr, err)
}
return nil
}
func (p PodDirIO) RemoveAll(path string) error {
_, stderr, err := p.execute([]string{"rm", "-rf", path})
if err != nil {
return fmt.Errorf("rm -rf %q: stderr=%q, %v", path, stderr, err)
}
return nil
}
func (p PodDirIO) execute(command []string) (string, string, error) {
return p.F.ExecWithOptions(framework.ExecOptions{
Command: command,
Namespace: p.Namespace,
PodName: p.PodName,
ContainerName: p.ContainerName,
CaptureStdout: true,
CaptureStderr: true,
Quiet: true,
})
}

View File

@ -0,0 +1,344 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxy
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"net"
"net/http"
"sync"
"sync/atomic"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
"k8s.io/klog/v2"
)
// Maximum number of forwarded connections. In practice we don't
// need more than one per sidecar and kubelet. Keeping this reasonably
// small ensures that we don't establish connections through the apiserver
// and the remote kernel which then aren't needed.
const maxConcurrentConnections = 10
// Listen creates a listener which returns new connections whenever someone connects
// to a socat or mock driver proxy instance running inside the given pod.
//
// socat must be started with "<listen>,fork TCP-LISTEN:<port>,reuseport"
// for this to work. "<listen>" can be anything that accepts connections,
// for example "UNIX-LISTEN:/csi/csi.sock". In this mode, socat
// accepts exactly one connection on the given TCP port for each
// connection that it accepts on "<listen>".
//
// Listening stops when the context is done or Close() is called.
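//
// For example, the proxy side inside the pod could be started with a
// command line along these lines (shown only for illustration; the exact
// socat invocation is an assumption and not part of this change):
//
//	socat UNIX-LISTEN:/csi/csi.sock,fork TCP-LISTEN:9000,reuseport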
func Listen(ctx context.Context, clientset kubernetes.Interface, restConfig *rest.Config, addr Addr) (net.Listener, error) {
// We connect through port forwarding. Strictly
// speaking this is overkill because we don't need a local
// port. But this way we can reuse client-go/tools/portforward
// instead of having to replicate handleConnection
// in our own code.
restClient := clientset.CoreV1().RESTClient()
if restConfig.GroupVersion == nil {
restConfig.GroupVersion = &schema.GroupVersion{}
}
if restConfig.NegotiatedSerializer == nil {
restConfig.NegotiatedSerializer = scheme.Codecs
}
// The setup code around the actual portforward is from
// https://github.com/kubernetes/kubernetes/blob/c652ffbe4a29143623a1aaec39f745575f7e43ad/staging/src/k8s.io/kubectl/pkg/cmd/portforward/portforward.go
req := restClient.Post().
Resource("pods").
Namespace(addr.Namespace).
Name(addr.PodName).
SubResource("portforward")
transport, upgrader, err := spdy.RoundTripperFor(restConfig)
if err != nil {
return nil, fmt.Errorf("create round tripper: %v", err)
}
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", req.URL())
prefix := fmt.Sprintf("port forwarding for %s", addr)
ctx, cancel := context.WithCancel(ctx)
l := &listener{
connections: make(chan *connection),
ctx: ctx,
cancel: cancel,
addr: addr,
}
// Port forwarding is allowed to fail and will be restarted when it does.
prepareForwarding := func() (*portforward.PortForwarder, error) {
pod, err := clientset.CoreV1().Pods(addr.Namespace).Get(ctx, addr.PodName, metav1.GetOptions{})
if err != nil {
return nil, err
}
for i, status := range pod.Status.ContainerStatuses {
if pod.Spec.Containers[i].Name == addr.ContainerName &&
status.State.Running == nil {
return nil, fmt.Errorf("container %q is not running", addr.ContainerName)
}
}
readyChannel := make(chan struct{})
fw, err := portforward.New(dialer,
[]string{fmt.Sprintf("0:%d", addr.Port)},
ctx.Done(),
readyChannel,
klogWriter(false, prefix),
klogWriter(true, prefix))
if err != nil {
return nil, err
}
return fw, nil
}
var connectionsCreated, connectionsClosed int32
runForwarding := func(fw *portforward.PortForwarder) {
defer fw.Close()
klog.V(5).Infof("%s: starting connection polling", prefix)
defer klog.V(5).Infof("%s: connection polling ended", prefix)
failed := make(chan struct{})
go func() {
defer close(failed)
klog.V(5).Infof("%s: starting port forwarding", prefix)
defer klog.V(5).Infof("%s: port forwarding ended", prefix)
err := fw.ForwardPorts()
if err != nil {
if ctx.Err() == nil {
// Something failed unexpectedly.
klog.Errorf("%s: %v", prefix, err)
} else {
// Context is done, so the error is expected; log it at low verbosity anyway.
klog.V(5).Infof("%s: %v", prefix, err)
}
}
}()
// Wait for port forwarding to be ready.
select {
case <-ctx.Done():
return
case <-failed:
// The reason was logged above.
return
case <-fw.Ready:
// Proceed...
}
// This delay determines how quickly we notice when someone has
// connected inside the cluster. With socat, we cannot make this too small
// because otherwise we get many rejected connections. With the mock
// driver as proxy that doesn't happen as long as we don't
// ask for too many concurrent connections because the mock driver
// keeps the listening port open at all times and the Linux
// kernel automatically accepts our connection requests.
tryConnect := time.NewTicker(100 * time.Millisecond)
defer tryConnect.Stop()
for {
select {
case <-ctx.Done():
return
case <-failed:
// The reason was logged above.
return
case <-tryConnect.C:
currentClosed := atomic.LoadInt32(&connectionsClosed)
openConnections := connectionsCreated - currentClosed
if openConnections >= maxConcurrentConnections {
break
}
// Check whether we can establish a connection through the
// forwarded port.
ports, err := fw.GetPorts()
if err != nil {
// We checked for "port forwarding ready" above, so this
// shouldn't happen.
klog.Errorf("%s: no forwarded ports: %v", prefix, err)
return
}
// We don't want to be blocked too long because we need to check
// for a port forwarding failure occasionally.
timeout := 10 * time.Second
deadline, ok := ctx.Deadline()
if ok {
untilDeadline := time.Until(deadline)
if untilDeadline < timeout {
timeout = untilDeadline
}
}
klog.V(5).Infof("%s: trying to create a new connection #%d, %d open", prefix, connectionsCreated, openConnections)
c, err := net.DialTimeout("tcp", fmt.Sprintf("localhost:%d", ports[0].Local), timeout)
if err != nil {
klog.V(5).Infof("%s: no connection: %v", prefix, err)
break
}
// Make the connection available to Accept below.
klog.V(5).Infof("%s: created a new connection #%d", prefix, connectionsCreated)
l.connections <- &connection{
Conn: c,
addr: addr,
counter: connectionsCreated,
closed: &connectionsClosed,
}
connectionsCreated++
}
}
}
// Port forwarding and polling for connections run in the background.
go func() {
for {
fw, err := prepareForwarding()
if err == nil {
runForwarding(fw)
} else {
if apierrors.IsNotFound(err) {
// This is normal, the pod isn't running yet. Log with lower severity.
klog.V(5).Infof("prepare forwarding %s: %v", addr, err)
} else {
klog.Errorf("prepare forwarding %s: %v", addr, err)
}
}
select {
case <-ctx.Done():
return
// Sleep a bit before restarting. This is
// where we potentially wait for the pod to
// start.
case <-time.After(1 * time.Second):
}
}
}()
return l, nil
}
// Addr contains all relevant parameters for a certain port in a pod.
// The container must be running before connections are attempted.
type Addr struct {
Namespace, PodName, ContainerName string
Port int
}
var _ net.Addr = Addr{}
func (a Addr) Network() string {
return "port-forwarding"
}
func (a Addr) String() string {
return fmt.Sprintf("%s/%s:%d", a.Namespace, a.PodName, a.Port)
}
type listener struct {
addr Addr
connections chan *connection
ctx context.Context
cancel func()
}
var _ net.Listener = &listener{}
func (l *listener) Close() error {
klog.V(5).Infof("forward listener for %s: closing", l.addr)
l.cancel()
return nil
}
func (l *listener) Accept() (net.Conn, error) {
select {
case <-l.ctx.Done():
return nil, errors.New("listening was stopped")
case c := <-l.connections:
klog.V(5).Infof("forward listener for %s: got a new connection #%d", l.addr, c.counter)
return c, nil
}
}
type connection struct {
net.Conn
addr Addr
counter int32
closed *int32
mutex sync.Mutex
}
func (c *connection) Read(b []byte) (int, error) {
n, err := c.Conn.Read(b)
if errors.Is(err, io.EOF) {
klog.V(5).Infof("forward connection #%d for %s: remote side closed the stream", c.counter, c.addr)
}
return n, err
}
func (c *connection) Write(b []byte) (int, error) {
n, err := c.Conn.Write(b)
if errors.Is(err, io.EOF) {
klog.V(5).Infof("forward connection #%d for %s: remote side closed the stream", c.counter, c.addr)
}
return n, err
}
func (c *connection) Close() error {
c.mutex.Lock()
defer c.mutex.Unlock()
if c.closed != nil {
// Do the logging and book-keeping only once. The function itself may be called more than once.
klog.V(5).Infof("forward connection #%d for %s: closing our side", c.counter, c.addr)
atomic.AddInt32(c.closed, 1)
c.closed = nil
}
return c.Conn.Close()
}
func (l *listener) Addr() net.Addr {
return l.addr
}
// klogWriter returns an io.Writer which logs each line written to it via klog,
// either as an error or at verbosity level 5, prefixed with the given string.
func klogWriter(isError bool, prefix string) io.Writer {
reader, writer := io.Pipe()
go func() {
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
text := scanner.Text()
if isError {
klog.Errorf("%s: %s", prefix, text)
} else {
klog.V(5).Infof("%s: %s", prefix, text)
}
}
}()
return writer
}
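A rough sketch of how a test might consume this listener, assuming the embedded mock CSI driver serves its gRPC services over the forwarded connections; clientset, restConfig and ctx are assumed to exist, the service registration is elided, and "google.golang.org/grpc" would need to be imported:
addr := Addr{
	Namespace:     "default",          // assumed test namespace
	PodName:       "csi-mockplugin-0", // pod of the csi-mockplugin StatefulSet
	ContainerName: "mock",
	Port:          9000, // matches -proxy-endpoint=tcp://:9000 in the manifest below
}
l, err := Listen(ctx, clientset, restConfig, addr)
if err != nil {
	return fmt.Errorf("set up proxy listener: %v", err)
}
defer l.Close()

server := grpc.NewServer()
// Register the embedded CSI identity/controller/node services here
// before serving; those calls are omitted in this sketch.
go func() {
	if err := server.Serve(l); err != nil {
		klog.Errorf("gRPC server for %s stopped: %v", addr, err)
	}
}()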

View File

@ -70,8 +70,6 @@ spec:
volumeMounts:
- mountPath: /csi
name: socket-dir
- mountPath: /etc/hooks
name: hooks
- mountPath: /var/lib/kubelet/pods
name: kubelet-pods-dir
- mountPath: /var/lib/kubelet/plugins/kubernetes.io/csi
@ -95,6 +93,3 @@ spec:
path: /var/lib/kubelet/plugins_registry
type: Directory
name: registration-dir
- name: hooks
configMap:
name: mock-driver-hooks

View File

@ -0,0 +1,105 @@
kind: StatefulSet
apiVersion: apps/v1
metadata:
name: csi-mockplugin
spec:
selector:
matchLabels:
app: csi-mockplugin
replicas: 1
template:
metadata:
labels:
app: csi-mockplugin
spec:
serviceAccountName: csi-mock
containers:
- name: csi-provisioner
image: k8s.gcr.io/sig-storage/csi-provisioner:v2.1.0
args:
- "--csi-address=$(ADDRESS)"
# Topology support is needed for the pod rescheduling test
# ("storage capacity" in csi_mock_volume.go).
- "--feature-gates=Topology=true"
- "-v=5"
- "--timeout=1m"
# Needed for fsGroup support.
- "--default-fstype=ext4"
# We don't need much concurrency and having many goroutines
# makes klog.Fatal during shutdown take very long.
- "--worker-threads=5"
env:
- name: ADDRESS
value: /csi/csi.sock
volumeMounts:
- mountPath: /csi
name: socket-dir
- name: driver-registrar
image: k8s.gcr.io/sig-storage/csi-node-driver-registrar:v2.1.0
args:
- --v=5
- --csi-address=/csi/csi.sock
- --kubelet-registration-path=/var/lib/kubelet/plugins/csi-mock/csi.sock
- --timeout=1m
env:
- name: KUBE_NODE_NAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: spec.nodeName
volumeMounts:
- mountPath: /csi
name: socket-dir
- mountPath: /registration
name: registration-dir
- name: mock
image: k8s.gcr.io/sig-storage/mock-driver:v4.1.0
args:
# -v3 shows when connections get established. Higher log levels print information about
# transferred bytes, but cannot print message content (no gRPC parsing), so this is usually
# not interesting.
- -v=3
- -proxy-endpoint=tcp://:9000
env:
- name: CSI_ENDPOINT
value: /csi/csi.sock
ports:
- containerPort: 9000
name: socat
volumeMounts:
- mountPath: /csi
name: socket-dir
# The busybox container is needed for running shell commands which
# test for directories or create them. It needs additional privileges
# for that.
- name: busybox
image: k8s.gcr.io/busybox
securityContext:
privileged: true
command:
- sleep
- "100000"
volumeMounts:
- mountPath: /var/lib/kubelet/pods
name: kubelet-pods-dir
- mountPath: /var/lib/kubelet/plugins/kubernetes.io/csi
name: kubelet-csi-dir
volumes:
- hostPath:
path: /var/lib/kubelet/plugins/csi-mock
type: DirectoryOrCreate
name: socket-dir
- hostPath:
path: /var/lib/kubelet/pods
type: Directory
# mock driver doesn't make mounts and therefore doesn't need mount propagation.
# mountPropagation: Bidirectional
name: kubelet-pods-dir
- hostPath:
path: /var/lib/kubelet/plugins/kubernetes.io/csi
type: DirectoryOrCreate
name: kubelet-csi-dir
- hostPath:
path: /var/lib/kubelet/plugins_registry
type: Directory
name: registration-dir