mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Map PV access modes to CSI access modes
This commit is contained in:
parent
8db83c89aa
commit
b7d732d3d6
@ -82,6 +82,7 @@ type csiClient interface {
|
||||
NodeSupportsStageUnstage(ctx context.Context) (bool, error)
|
||||
NodeSupportsNodeExpand(ctx context.Context) (bool, error)
|
||||
NodeSupportsVolumeStats(ctx context.Context) (bool, error)
|
||||
NodeSupportsSingleNodeMultiWriterAccessMode(ctx context.Context) (bool, error)
|
||||
}
|
||||
|
||||
// Strongly typed address
|
||||
@ -120,6 +121,8 @@ type nodeV1ClientCreator func(addr csiAddr, metricsManager *MetricsManager) (
|
||||
err error,
|
||||
)
|
||||
|
||||
type nodeV1AccessModeMapper func(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapability_AccessMode_Mode
|
||||
|
||||
// newV1NodeClient creates a new NodeClient with the internally used gRPC
|
||||
// connection set up. It also returns a closer which must to be called to close
|
||||
// the gRPC connection when the NodeClient is not used anymore.
|
||||
@ -217,7 +220,11 @@ func (c *csiDriverClient) NodePublishVolume(
|
||||
|
||||
if c.nodeV1ClientCreator == nil {
|
||||
return errors.New("failed to call NodePublishVolume. nodeV1ClientCreator is nil")
|
||||
}
|
||||
|
||||
accessModeMapper, err := c.getNodeV1AccessModeMapper(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
|
||||
@ -235,7 +242,7 @@ func (c *csiDriverClient) NodePublishVolume(
|
||||
Secrets: secrets,
|
||||
VolumeCapability: &csipbv1.VolumeCapability{
|
||||
AccessMode: &csipbv1.VolumeCapability_AccessMode{
|
||||
Mode: asCSIAccessModeV1(accessMode),
|
||||
Mode: accessModeMapper(accessMode),
|
||||
},
|
||||
},
|
||||
}
|
||||
@ -279,6 +286,11 @@ func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, opts csiResizeOp
|
||||
return opts.newSize, errors.New("size can not be less than 0")
|
||||
}
|
||||
|
||||
accessModeMapper, err := c.getNodeV1AccessModeMapper(ctx)
|
||||
if err != nil {
|
||||
return opts.newSize, err
|
||||
}
|
||||
|
||||
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
|
||||
if err != nil {
|
||||
return opts.newSize, err
|
||||
@ -291,7 +303,7 @@ func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, opts csiResizeOp
|
||||
CapacityRange: &csipbv1.CapacityRange{RequiredBytes: opts.newSize.Value()},
|
||||
VolumeCapability: &csipbv1.VolumeCapability{
|
||||
AccessMode: &csipbv1.VolumeCapability_AccessMode{
|
||||
Mode: asCSIAccessModeV1(opts.accessMode),
|
||||
Mode: accessModeMapper(opts.accessMode),
|
||||
},
|
||||
},
|
||||
}
|
||||
@ -371,6 +383,11 @@ func (c *csiDriverClient) NodeStageVolume(ctx context.Context,
|
||||
return errors.New("nodeV1ClientCreate is nil")
|
||||
}
|
||||
|
||||
accessModeMapper, err := c.getNodeV1AccessModeMapper(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -383,7 +400,7 @@ func (c *csiDriverClient) NodeStageVolume(ctx context.Context,
|
||||
StagingTargetPath: stagingTargetPath,
|
||||
VolumeCapability: &csipbv1.VolumeCapability{
|
||||
AccessMode: &csipbv1.VolumeCapability_AccessMode{
|
||||
Mode: asCSIAccessModeV1(accessMode),
|
||||
Mode: accessModeMapper(accessMode),
|
||||
},
|
||||
},
|
||||
Secrets: secrets,
|
||||
@ -446,6 +463,17 @@ func (c *csiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (bool, e
|
||||
return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME)
|
||||
}
|
||||
|
||||
func (c *csiDriverClient) getNodeV1AccessModeMapper(ctx context.Context) (nodeV1AccessModeMapper, error) {
|
||||
supported, err := c.NodeSupportsSingleNodeMultiWriterAccessMode(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if supported {
|
||||
return asSingleNodeMultiWriterCapableCSIAccessModeV1, nil
|
||||
}
|
||||
return asCSIAccessModeV1, nil
|
||||
}
|
||||
|
||||
func asCSIAccessModeV1(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapability_AccessMode_Mode {
|
||||
switch am {
|
||||
case api.ReadWriteOnce:
|
||||
@ -454,6 +482,25 @@ func asCSIAccessModeV1(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapabili
|
||||
return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY
|
||||
case api.ReadWriteMany:
|
||||
return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER
|
||||
// This mapping exists to enable CSI drivers that lack the
|
||||
// SINGLE_NODE_MULTI_WRITER capability to work with the
|
||||
// ReadWriteOncePod access mode.
|
||||
case api.ReadWriteOncePod:
|
||||
return csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_WRITER
|
||||
}
|
||||
return csipbv1.VolumeCapability_AccessMode_UNKNOWN
|
||||
}
|
||||
|
||||
func asSingleNodeMultiWriterCapableCSIAccessModeV1(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapability_AccessMode_Mode {
|
||||
switch am {
|
||||
case api.ReadWriteOnce:
|
||||
return csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER
|
||||
case api.ReadOnlyMany:
|
||||
return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY
|
||||
case api.ReadWriteMany:
|
||||
return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER
|
||||
case api.ReadWriteOncePod:
|
||||
return csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER
|
||||
}
|
||||
return csipbv1.VolumeCapability_AccessMode_UNKNOWN
|
||||
}
|
||||
@ -510,6 +557,11 @@ func (c *csiDriverClient) NodeSupportsVolumeStats(ctx context.Context) (bool, er
|
||||
return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_GET_VOLUME_STATS)
|
||||
}
|
||||
|
||||
func (c *csiDriverClient) NodeSupportsSingleNodeMultiWriterAccessMode(ctx context.Context) (bool, error) {
|
||||
klog.V(4).Info(log("calling NodeGetCapabilities rpc to determine if NodeSupportsSingleNodeMultiWriterAccessMode"))
|
||||
return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER)
|
||||
}
|
||||
|
||||
func (c *csiDriverClient) NodeGetVolumeStats(ctx context.Context, volID string, targetPath string) (*volume.Metrics, error) {
|
||||
klog.V(4).Info(log("calling NodeGetVolumeStats rpc: [volid=%s, target_path=%s", volID, targetPath))
|
||||
if volID == "" {
|
||||
|
@ -301,6 +301,11 @@ func (c *fakeCsiDriverClient) nodeSupportsVolumeCondition(ctx context.Context) (
|
||||
return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_VOLUME_CONDITION)
|
||||
}
|
||||
|
||||
func (c *fakeCsiDriverClient) NodeSupportsSingleNodeMultiWriterAccessMode(ctx context.Context) (bool, error) {
|
||||
c.t.Log("calling fake.NodeSupportsSingleNodeMultiWriterAccessMode...")
|
||||
return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER)
|
||||
}
|
||||
|
||||
func (c *fakeCsiDriverClient) nodeSupportsCapability(ctx context.Context, capabilityType csipbv1.NodeServiceCapability_RPC_Type) (bool, error) {
|
||||
capabilities, err := c.nodeGetCapabilities(ctx)
|
||||
if err != nil {
|
||||
@ -865,3 +870,83 @@ func TestVolumeStats(t *testing.T) {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestAccessModeMapping(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
singleNodeMultiWriterSet bool
|
||||
accessMode api.PersistentVolumeAccessMode
|
||||
expectedMappedAccessMode csipbv1.VolumeCapability_AccessMode_Mode
|
||||
}{
|
||||
{
|
||||
name: "with ReadWriteOnce and incapable driver",
|
||||
singleNodeMultiWriterSet: false,
|
||||
accessMode: api.ReadWriteOnce,
|
||||
expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
|
||||
},
|
||||
{
|
||||
name: "with ReadOnlyMany and incapable driver",
|
||||
singleNodeMultiWriterSet: false,
|
||||
accessMode: api.ReadOnlyMany,
|
||||
expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY,
|
||||
},
|
||||
{
|
||||
name: "with ReadWriteMany and incapable driver",
|
||||
singleNodeMultiWriterSet: false,
|
||||
accessMode: api.ReadWriteMany,
|
||||
expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
|
||||
},
|
||||
{
|
||||
name: "with ReadWriteOncePod and incapable driver",
|
||||
singleNodeMultiWriterSet: false,
|
||||
accessMode: api.ReadWriteOncePod,
|
||||
expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
|
||||
},
|
||||
{
|
||||
name: "with ReadWriteOnce and capable driver",
|
||||
singleNodeMultiWriterSet: true,
|
||||
accessMode: api.ReadWriteOnce,
|
||||
expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER,
|
||||
},
|
||||
{
|
||||
name: "with ReadOnlyMany and capable driver",
|
||||
singleNodeMultiWriterSet: true,
|
||||
accessMode: api.ReadOnlyMany,
|
||||
expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY,
|
||||
},
|
||||
{
|
||||
name: "with ReadWriteMany and capable driver",
|
||||
singleNodeMultiWriterSet: true,
|
||||
accessMode: api.ReadWriteMany,
|
||||
expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
|
||||
},
|
||||
{
|
||||
name: "with ReadWriteOncePod and capable driver",
|
||||
singleNodeMultiWriterSet: true,
|
||||
accessMode: api.ReadWriteOncePod,
|
||||
expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER,
|
||||
},
|
||||
}
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
fakeCloser := fake.NewCloser(t)
|
||||
client := &csiDriverClient{
|
||||
driverName: "Fake Driver Name",
|
||||
nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
|
||||
nodeClient := fake.NewNodeClientWithSingleNodeMultiWriter(tc.singleNodeMultiWriterSet)
|
||||
return nodeClient, fakeCloser, nil
|
||||
},
|
||||
}
|
||||
|
||||
accessModeMapper, err := client.getNodeV1AccessModeMapper(context.Background())
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
mappedAccessMode := accessModeMapper(tc.accessMode)
|
||||
if mappedAccessMode != tc.expectedMappedAccessMode {
|
||||
t.Errorf("expected access mode: %v; got: %v", tc.expectedMappedAccessMode, mappedAccessMode)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -85,6 +85,7 @@ type NodeClient struct {
|
||||
expansionSet bool
|
||||
volumeStatsSet bool
|
||||
volumeConditionSet bool
|
||||
singleNodeMultiWriterSet bool
|
||||
nodeGetInfoResp *csipb.NodeGetInfoResponse
|
||||
nodeVolumeStatsResp *csipb.NodeGetVolumeStatsResponse
|
||||
FakeNodeExpansionRequest *csipb.NodeExpandVolumeRequest
|
||||
@ -123,6 +124,16 @@ func NewNodeClientWithVolumeStatsAndCondition(volumeStatsSet, volumeConditionSet
|
||||
}
|
||||
}
|
||||
|
||||
func NewNodeClientWithSingleNodeMultiWriter(singleNodeMultiWriterSet bool) *NodeClient {
|
||||
return &NodeClient{
|
||||
nodePublishedVolumes: make(map[string]CSIVolume),
|
||||
nodeStagedVolumes: make(map[string]CSIVolume),
|
||||
stageUnstageSet: true,
|
||||
volumeStatsSet: true,
|
||||
singleNodeMultiWriterSet: singleNodeMultiWriterSet,
|
||||
}
|
||||
}
|
||||
|
||||
// SetNextError injects next expected error
|
||||
func (f *NodeClient) SetNextError(err error) {
|
||||
f.nextErr = err
|
||||
@ -364,6 +375,16 @@ func (f *NodeClient) NodeGetCapabilities(ctx context.Context, in *csipb.NodeGetC
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
if f.singleNodeMultiWriterSet {
|
||||
resp.Capabilities = append(resp.Capabilities, &csipb.NodeServiceCapability{
|
||||
Type: &csipb.NodeServiceCapability_Rpc{
|
||||
Rpc: &csipb.NodeServiceCapability_RPC{
|
||||
Type: csipb.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER,
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user