From b7d732d3d6f0f41f2962c674d8cccd92bf5845a9 Mon Sep 17 00:00:00 2001 From: Chris Henzie Date: Wed, 10 Mar 2021 20:54:51 -0800 Subject: [PATCH] Map PV access modes to CSI access modes --- pkg/volume/csi/csi_client.go | 58 ++++++++++++++++++-- pkg/volume/csi/csi_client_test.go | 85 ++++++++++++++++++++++++++++++ pkg/volume/csi/fake/fake_client.go | 21 ++++++++ 3 files changed, 161 insertions(+), 3 deletions(-) diff --git a/pkg/volume/csi/csi_client.go b/pkg/volume/csi/csi_client.go index 6ca19a32d22..3df67888013 100644 --- a/pkg/volume/csi/csi_client.go +++ b/pkg/volume/csi/csi_client.go @@ -82,6 +82,7 @@ type csiClient interface { NodeSupportsStageUnstage(ctx context.Context) (bool, error) NodeSupportsNodeExpand(ctx context.Context) (bool, error) NodeSupportsVolumeStats(ctx context.Context) (bool, error) + NodeSupportsSingleNodeMultiWriterAccessMode(ctx context.Context) (bool, error) } // Strongly typed address @@ -120,6 +121,8 @@ type nodeV1ClientCreator func(addr csiAddr, metricsManager *MetricsManager) ( err error, ) +type nodeV1AccessModeMapper func(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapability_AccessMode_Mode + // newV1NodeClient creates a new NodeClient with the internally used gRPC // connection set up. It also returns a closer which must to be called to close // the gRPC connection when the NodeClient is not used anymore. @@ -217,7 +220,11 @@ func (c *csiDriverClient) NodePublishVolume( if c.nodeV1ClientCreator == nil { return errors.New("failed to call NodePublishVolume. nodeV1ClientCreator is nil") + } + accessModeMapper, err := c.getNodeV1AccessModeMapper(ctx) + if err != nil { + return err } nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager) @@ -235,7 +242,7 @@ func (c *csiDriverClient) NodePublishVolume( Secrets: secrets, VolumeCapability: &csipbv1.VolumeCapability{ AccessMode: &csipbv1.VolumeCapability_AccessMode{ - Mode: asCSIAccessModeV1(accessMode), + Mode: accessModeMapper(accessMode), }, }, } @@ -279,6 +286,11 @@ func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, opts csiResizeOp return opts.newSize, errors.New("size can not be less than 0") } + accessModeMapper, err := c.getNodeV1AccessModeMapper(ctx) + if err != nil { + return opts.newSize, err + } + nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager) if err != nil { return opts.newSize, err @@ -291,7 +303,7 @@ func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, opts csiResizeOp CapacityRange: &csipbv1.CapacityRange{RequiredBytes: opts.newSize.Value()}, VolumeCapability: &csipbv1.VolumeCapability{ AccessMode: &csipbv1.VolumeCapability_AccessMode{ - Mode: asCSIAccessModeV1(opts.accessMode), + Mode: accessModeMapper(opts.accessMode), }, }, } @@ -371,6 +383,11 @@ func (c *csiDriverClient) NodeStageVolume(ctx context.Context, return errors.New("nodeV1ClientCreate is nil") } + accessModeMapper, err := c.getNodeV1AccessModeMapper(ctx) + if err != nil { + return err + } + nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager) if err != nil { return err @@ -383,7 +400,7 @@ func (c *csiDriverClient) NodeStageVolume(ctx context.Context, StagingTargetPath: stagingTargetPath, VolumeCapability: &csipbv1.VolumeCapability{ AccessMode: &csipbv1.VolumeCapability_AccessMode{ - Mode: asCSIAccessModeV1(accessMode), + Mode: accessModeMapper(accessMode), }, }, Secrets: secrets, @@ -446,6 +463,17 @@ func (c *csiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (bool, e return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME) } +func (c *csiDriverClient) getNodeV1AccessModeMapper(ctx context.Context) (nodeV1AccessModeMapper, error) { + supported, err := c.NodeSupportsSingleNodeMultiWriterAccessMode(ctx) + if err != nil { + return nil, err + } + if supported { + return asSingleNodeMultiWriterCapableCSIAccessModeV1, nil + } + return asCSIAccessModeV1, nil +} + func asCSIAccessModeV1(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapability_AccessMode_Mode { switch am { case api.ReadWriteOnce: @@ -454,6 +482,25 @@ func asCSIAccessModeV1(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapabili return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY case api.ReadWriteMany: return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER + // This mapping exists to enable CSI drivers that lack the + // SINGLE_NODE_MULTI_WRITER capability to work with the + // ReadWriteOncePod access mode. + case api.ReadWriteOncePod: + return csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_WRITER + } + return csipbv1.VolumeCapability_AccessMode_UNKNOWN +} + +func asSingleNodeMultiWriterCapableCSIAccessModeV1(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapability_AccessMode_Mode { + switch am { + case api.ReadWriteOnce: + return csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER + case api.ReadOnlyMany: + return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY + case api.ReadWriteMany: + return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER + case api.ReadWriteOncePod: + return csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER } return csipbv1.VolumeCapability_AccessMode_UNKNOWN } @@ -510,6 +557,11 @@ func (c *csiDriverClient) NodeSupportsVolumeStats(ctx context.Context) (bool, er return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_GET_VOLUME_STATS) } +func (c *csiDriverClient) NodeSupportsSingleNodeMultiWriterAccessMode(ctx context.Context) (bool, error) { + klog.V(4).Info(log("calling NodeGetCapabilities rpc to determine if NodeSupportsSingleNodeMultiWriterAccessMode")) + return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER) +} + func (c *csiDriverClient) NodeGetVolumeStats(ctx context.Context, volID string, targetPath string) (*volume.Metrics, error) { klog.V(4).Info(log("calling NodeGetVolumeStats rpc: [volid=%s, target_path=%s", volID, targetPath)) if volID == "" { diff --git a/pkg/volume/csi/csi_client_test.go b/pkg/volume/csi/csi_client_test.go index c7ca8fa05c4..5954090285d 100644 --- a/pkg/volume/csi/csi_client_test.go +++ b/pkg/volume/csi/csi_client_test.go @@ -301,6 +301,11 @@ func (c *fakeCsiDriverClient) nodeSupportsVolumeCondition(ctx context.Context) ( return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_VOLUME_CONDITION) } +func (c *fakeCsiDriverClient) NodeSupportsSingleNodeMultiWriterAccessMode(ctx context.Context) (bool, error) { + c.t.Log("calling fake.NodeSupportsSingleNodeMultiWriterAccessMode...") + return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER) +} + func (c *fakeCsiDriverClient) nodeSupportsCapability(ctx context.Context, capabilityType csipbv1.NodeServiceCapability_RPC_Type) (bool, error) { capabilities, err := c.nodeGetCapabilities(ctx) if err != nil { @@ -865,3 +870,83 @@ func TestVolumeStats(t *testing.T) { } } + +func TestAccessModeMapping(t *testing.T) { + tests := []struct { + name string + singleNodeMultiWriterSet bool + accessMode api.PersistentVolumeAccessMode + expectedMappedAccessMode csipbv1.VolumeCapability_AccessMode_Mode + }{ + { + name: "with ReadWriteOnce and incapable driver", + singleNodeMultiWriterSet: false, + accessMode: api.ReadWriteOnce, + expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, + }, + { + name: "with ReadOnlyMany and incapable driver", + singleNodeMultiWriterSet: false, + accessMode: api.ReadOnlyMany, + expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY, + }, + { + name: "with ReadWriteMany and incapable driver", + singleNodeMultiWriterSet: false, + accessMode: api.ReadWriteMany, + expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER, + }, + { + name: "with ReadWriteOncePod and incapable driver", + singleNodeMultiWriterSet: false, + accessMode: api.ReadWriteOncePod, + expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, + }, + { + name: "with ReadWriteOnce and capable driver", + singleNodeMultiWriterSet: true, + accessMode: api.ReadWriteOnce, + expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER, + }, + { + name: "with ReadOnlyMany and capable driver", + singleNodeMultiWriterSet: true, + accessMode: api.ReadOnlyMany, + expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY, + }, + { + name: "with ReadWriteMany and capable driver", + singleNodeMultiWriterSet: true, + accessMode: api.ReadWriteMany, + expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER, + }, + { + name: "with ReadWriteOncePod and capable driver", + singleNodeMultiWriterSet: true, + accessMode: api.ReadWriteOncePod, + expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + fakeCloser := fake.NewCloser(t) + client := &csiDriverClient{ + driverName: "Fake Driver Name", + nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) { + nodeClient := fake.NewNodeClientWithSingleNodeMultiWriter(tc.singleNodeMultiWriterSet) + return nodeClient, fakeCloser, nil + }, + } + + accessModeMapper, err := client.getNodeV1AccessModeMapper(context.Background()) + if err != nil { + t.Error(err) + } + + mappedAccessMode := accessModeMapper(tc.accessMode) + if mappedAccessMode != tc.expectedMappedAccessMode { + t.Errorf("expected access mode: %v; got: %v", tc.expectedMappedAccessMode, mappedAccessMode) + } + }) + } +} diff --git a/pkg/volume/csi/fake/fake_client.go b/pkg/volume/csi/fake/fake_client.go index f5c470b2ed3..ce37592449e 100644 --- a/pkg/volume/csi/fake/fake_client.go +++ b/pkg/volume/csi/fake/fake_client.go @@ -85,6 +85,7 @@ type NodeClient struct { expansionSet bool volumeStatsSet bool volumeConditionSet bool + singleNodeMultiWriterSet bool nodeGetInfoResp *csipb.NodeGetInfoResponse nodeVolumeStatsResp *csipb.NodeGetVolumeStatsResponse FakeNodeExpansionRequest *csipb.NodeExpandVolumeRequest @@ -123,6 +124,16 @@ func NewNodeClientWithVolumeStatsAndCondition(volumeStatsSet, volumeConditionSet } } +func NewNodeClientWithSingleNodeMultiWriter(singleNodeMultiWriterSet bool) *NodeClient { + return &NodeClient{ + nodePublishedVolumes: make(map[string]CSIVolume), + nodeStagedVolumes: make(map[string]CSIVolume), + stageUnstageSet: true, + volumeStatsSet: true, + singleNodeMultiWriterSet: singleNodeMultiWriterSet, + } +} + // SetNextError injects next expected error func (f *NodeClient) SetNextError(err error) { f.nextErr = err @@ -364,6 +375,16 @@ func (f *NodeClient) NodeGetCapabilities(ctx context.Context, in *csipb.NodeGetC }, }) } + + if f.singleNodeMultiWriterSet { + resp.Capabilities = append(resp.Capabilities, &csipb.NodeServiceCapability{ + Type: &csipb.NodeServiceCapability_Rpc{ + Rpc: &csipb.NodeServiceCapability_RPC{ + Type: csipb.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER, + }, + }, + }) + } return resp, nil }