mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 20:53:33 +00:00
CSI client helpers for NodeGetCapabilities
This commit is contained in:
parent
5f98f6cfa4
commit
8db83c89aa
@ -438,66 +438,12 @@ func (c *csiDriverClient) NodeUnstageVolume(ctx context.Context, volID, stagingT
|
|||||||
|
|
||||||
func (c *csiDriverClient) NodeSupportsNodeExpand(ctx context.Context) (bool, error) {
|
func (c *csiDriverClient) NodeSupportsNodeExpand(ctx context.Context) (bool, error) {
|
||||||
klog.V(4).Info(log("calling NodeGetCapabilities rpc to determine if Node has EXPAND_VOLUME capability"))
|
klog.V(4).Info(log("calling NodeGetCapabilities rpc to determine if Node has EXPAND_VOLUME capability"))
|
||||||
if c.nodeV1ClientCreator == nil {
|
return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_EXPAND_VOLUME)
|
||||||
return false, errors.New("nodeV1ClientCreate is nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
|
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
defer closer.Close()
|
|
||||||
|
|
||||||
req := &csipbv1.NodeGetCapabilitiesRequest{}
|
|
||||||
resp, err := nodeClient.NodeGetCapabilities(ctx, req)
|
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
|
|
||||||
capabilities := resp.GetCapabilities()
|
|
||||||
|
|
||||||
if capabilities == nil {
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
for _, capability := range capabilities {
|
|
||||||
if capability.GetRpc().GetType() == csipbv1.NodeServiceCapability_RPC_EXPAND_VOLUME {
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *csiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (bool, error) {
|
func (c *csiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (bool, error) {
|
||||||
klog.V(4).Info(log("calling NodeGetCapabilities rpc to determine if NodeSupportsStageUnstage"))
|
klog.V(4).Info(log("calling NodeGetCapabilities rpc to determine if NodeSupportsStageUnstage"))
|
||||||
if c.nodeV1ClientCreator == nil {
|
return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME)
|
||||||
return false, errors.New("nodeV1ClientCreate is nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
|
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
defer closer.Close()
|
|
||||||
|
|
||||||
req := &csipbv1.NodeGetCapabilitiesRequest{}
|
|
||||||
resp, err := nodeClient.NodeGetCapabilities(ctx, req)
|
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
|
|
||||||
capabilities := resp.GetCapabilities()
|
|
||||||
|
|
||||||
stageUnstageSet := false
|
|
||||||
if capabilities == nil {
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
for _, capability := range capabilities {
|
|
||||||
if capability.GetRpc().GetType() == csipbv1.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME {
|
|
||||||
stageUnstageSet = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return stageUnstageSet, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func asCSIAccessModeV1(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapability_AccessMode_Mode {
|
func asCSIAccessModeV1(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapability_AccessMode_Mode {
|
||||||
@ -561,30 +507,7 @@ func (c *csiClientGetter) Get() (csiClient, error) {
|
|||||||
|
|
||||||
func (c *csiDriverClient) NodeSupportsVolumeStats(ctx context.Context) (bool, error) {
|
func (c *csiDriverClient) NodeSupportsVolumeStats(ctx context.Context) (bool, error) {
|
||||||
klog.V(5).Info(log("calling NodeGetCapabilities rpc to determine if NodeSupportsVolumeStats"))
|
klog.V(5).Info(log("calling NodeGetCapabilities rpc to determine if NodeSupportsVolumeStats"))
|
||||||
if c.nodeV1ClientCreator == nil {
|
return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_GET_VOLUME_STATS)
|
||||||
return false, errors.New("nodeV1ClientCreate is nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
|
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
defer closer.Close()
|
|
||||||
req := &csipbv1.NodeGetCapabilitiesRequest{}
|
|
||||||
resp, err := nodeClient.NodeGetCapabilities(ctx, req)
|
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
capabilities := resp.GetCapabilities()
|
|
||||||
if capabilities == nil {
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
for _, capability := range capabilities {
|
|
||||||
if capability.GetRpc().GetType() == csipbv1.NodeServiceCapability_RPC_GET_VOLUME_STATS {
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *csiDriverClient) NodeGetVolumeStats(ctx context.Context, volID string, targetPath string) (*volume.Metrics, error) {
|
func (c *csiDriverClient) NodeGetVolumeStats(ctx context.Context, volID string, targetPath string) (*volume.Metrics, error) {
|
||||||
@ -628,7 +551,7 @@ func (c *csiDriverClient) NodeGetVolumeStats(ctx context.Context, volID string,
|
|||||||
}
|
}
|
||||||
|
|
||||||
if utilfeature.DefaultFeatureGate.Enabled(features.CSIVolumeHealth) {
|
if utilfeature.DefaultFeatureGate.Enabled(features.CSIVolumeHealth) {
|
||||||
isSupportNodeVolumeCondition, err := supportNodeGetVolumeCondition(ctx, nodeClient)
|
isSupportNodeVolumeCondition, err := c.nodeSupportsVolumeCondition(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -661,30 +584,47 @@ func (c *csiDriverClient) NodeGetVolumeStats(ctx context.Context, volID string,
|
|||||||
return metrics, nil
|
return metrics, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func supportNodeGetVolumeCondition(ctx context.Context, nodeClient csipbv1.NodeClient) (supportNodeGetVolumeCondition bool, err error) {
|
func (c *csiDriverClient) nodeSupportsVolumeCondition(ctx context.Context) (bool, error) {
|
||||||
req := csipbv1.NodeGetCapabilitiesRequest{}
|
klog.V(5).Info(log("calling NodeGetCapabilities rpc to determine if nodeSupportsVolumeCondition"))
|
||||||
rsp, err := nodeClient.NodeGetCapabilities(ctx, &req)
|
return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_VOLUME_CONDITION)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *csiDriverClient) nodeSupportsCapability(ctx context.Context, capabilityType csipbv1.NodeServiceCapability_RPC_Type) (bool, error) {
|
||||||
|
capabilities, err := c.nodeGetCapabilities(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, cap := range rsp.GetCapabilities() {
|
for _, capability := range capabilities {
|
||||||
if cap == nil {
|
if capability == nil || capability.GetRpc() == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
rpc := cap.GetRpc()
|
if capability.GetRpc().GetType() == capabilityType {
|
||||||
if rpc == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
t := rpc.GetType()
|
|
||||||
if t == csipbv1.NodeServiceCapability_RPC_VOLUME_CONDITION {
|
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *csiDriverClient) nodeGetCapabilities(ctx context.Context) ([]*csipbv1.NodeServiceCapability, error) {
|
||||||
|
if c.nodeV1ClientCreator == nil {
|
||||||
|
return []*csipbv1.NodeServiceCapability{}, errors.New("nodeV1ClientCreate is nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
|
||||||
|
if err != nil {
|
||||||
|
return []*csipbv1.NodeServiceCapability{}, err
|
||||||
|
}
|
||||||
|
defer closer.Close()
|
||||||
|
|
||||||
|
req := &csipbv1.NodeGetCapabilitiesRequest{}
|
||||||
|
resp, err := nodeClient.NodeGetCapabilities(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
return []*csipbv1.NodeServiceCapability{}, err
|
||||||
|
}
|
||||||
|
return resp.GetCapabilities(), nil
|
||||||
|
}
|
||||||
|
|
||||||
func isFinalError(err error) bool {
|
func isFinalError(err error) bool {
|
||||||
// Sources:
|
// Sources:
|
||||||
// https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
|
// https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
|
||||||
|
@ -106,7 +106,7 @@ func (c *fakeCsiDriverClient) NodeGetVolumeStats(ctx context.Context, volID stri
|
|||||||
|
|
||||||
metrics := &volume.Metrics{}
|
metrics := &volume.Metrics{}
|
||||||
|
|
||||||
isSupportNodeVolumeCondition, err := supportNodeGetVolumeCondition(ctx, c.nodeClient)
|
isSupportNodeVolumeCondition, err := c.nodeSupportsVolumeCondition(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -137,21 +137,7 @@ func (c *fakeCsiDriverClient) NodeGetVolumeStats(ctx context.Context, volID stri
|
|||||||
|
|
||||||
func (c *fakeCsiDriverClient) NodeSupportsVolumeStats(ctx context.Context) (bool, error) {
|
func (c *fakeCsiDriverClient) NodeSupportsVolumeStats(ctx context.Context) (bool, error) {
|
||||||
c.t.Log("calling fake.NodeSupportsVolumeStats...")
|
c.t.Log("calling fake.NodeSupportsVolumeStats...")
|
||||||
req := &csipbv1.NodeGetCapabilitiesRequest{}
|
return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_GET_VOLUME_STATS)
|
||||||
resp, err := c.nodeClient.NodeGetCapabilities(ctx, req)
|
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
capabilities := resp.GetCapabilities()
|
|
||||||
if capabilities == nil {
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
for _, capability := range capabilities {
|
|
||||||
if capability.GetRpc().GetType() == csipbv1.NodeServiceCapability_RPC_GET_VOLUME_STATS {
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *fakeCsiDriverClient) NodePublishVolume(
|
func (c *fakeCsiDriverClient) NodePublishVolume(
|
||||||
@ -269,46 +255,12 @@ func (c *fakeCsiDriverClient) NodeUnstageVolume(ctx context.Context, volID, stag
|
|||||||
|
|
||||||
func (c *fakeCsiDriverClient) NodeSupportsNodeExpand(ctx context.Context) (bool, error) {
|
func (c *fakeCsiDriverClient) NodeSupportsNodeExpand(ctx context.Context) (bool, error) {
|
||||||
c.t.Log("calling fake.NodeSupportsNodeExpand...")
|
c.t.Log("calling fake.NodeSupportsNodeExpand...")
|
||||||
req := &csipbv1.NodeGetCapabilitiesRequest{}
|
return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_EXPAND_VOLUME)
|
||||||
|
|
||||||
resp, err := c.nodeClient.NodeGetCapabilities(ctx, req)
|
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
|
|
||||||
capabilities := resp.GetCapabilities()
|
|
||||||
|
|
||||||
if capabilities == nil {
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
for _, capability := range capabilities {
|
|
||||||
if capability.GetRpc().GetType() == csipbv1.NodeServiceCapability_RPC_EXPAND_VOLUME {
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *fakeCsiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (bool, error) {
|
func (c *fakeCsiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (bool, error) {
|
||||||
c.t.Log("calling fake.NodeGetCapabilities for NodeSupportsStageUnstage...")
|
c.t.Log("calling fake.NodeGetCapabilities for NodeSupportsStageUnstage...")
|
||||||
req := &csipbv1.NodeGetCapabilitiesRequest{}
|
return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME)
|
||||||
resp, err := c.nodeClient.NodeGetCapabilities(ctx, req)
|
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
|
|
||||||
capabilities := resp.GetCapabilities()
|
|
||||||
|
|
||||||
stageUnstageSet := false
|
|
||||||
if capabilities == nil {
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
for _, capability := range capabilities {
|
|
||||||
if capability.GetRpc().GetType() == csipbv1.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME {
|
|
||||||
stageUnstageSet = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return stageUnstageSet, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *fakeCsiDriverClient) NodeExpandVolume(ctx context.Context, opts csiResizeOptions) (resource.Quantity, error) {
|
func (c *fakeCsiDriverClient) NodeExpandVolume(ctx context.Context, opts csiResizeOptions) (resource.Quantity, error) {
|
||||||
@ -344,6 +296,34 @@ func (c *fakeCsiDriverClient) NodeExpandVolume(ctx context.Context, opts csiResi
|
|||||||
return *updatedQuantity, nil
|
return *updatedQuantity, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *fakeCsiDriverClient) nodeSupportsVolumeCondition(ctx context.Context) (bool, error) {
|
||||||
|
c.t.Log("calling fake.nodeSupportsVolumeCondition...")
|
||||||
|
return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_VOLUME_CONDITION)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *fakeCsiDriverClient) nodeSupportsCapability(ctx context.Context, capabilityType csipbv1.NodeServiceCapability_RPC_Type) (bool, error) {
|
||||||
|
capabilities, err := c.nodeGetCapabilities(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, capability := range capabilities {
|
||||||
|
if capability.GetRpc().GetType() == capabilityType {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *fakeCsiDriverClient) nodeGetCapabilities(ctx context.Context) ([]*csipbv1.NodeServiceCapability, error) {
|
||||||
|
req := &csipbv1.NodeGetCapabilitiesRequest{}
|
||||||
|
resp, err := c.nodeClient.NodeGetCapabilities(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
return []*csipbv1.NodeServiceCapability{}, err
|
||||||
|
}
|
||||||
|
return resp.GetCapabilities(), nil
|
||||||
|
}
|
||||||
|
|
||||||
func setupClient(t *testing.T, stageUnstageSet bool) csiClient {
|
func setupClient(t *testing.T, stageUnstageSet bool) csiClient {
|
||||||
return newFakeCsiDriverClient(t, stageUnstageSet)
|
return newFakeCsiDriverClient(t, stageUnstageSet)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user