diff --git a/pkg/volume/csi/BUILD b/pkg/volume/csi/BUILD index 9f0c0d9c968..a6c59dac5e6 100644 --- a/pkg/volume/csi/BUILD +++ b/pkg/volume/csi/BUILD @@ -7,6 +7,7 @@ go_library( "csi_block.go", "csi_client.go", "csi_drivers_store.go", + "csi_metrics.go", "csi_mounter.go", "csi_plugin.go", "csi_util.go", @@ -48,6 +49,7 @@ go_test( "csi_block_test.go", "csi_client_test.go", "csi_drivers_store_test.go", + "csi_metrics_test.go", "csi_mounter_test.go", "csi_plugin_test.go", "csi_test.go", diff --git a/pkg/volume/csi/csi_client.go b/pkg/volume/csi/csi_client.go index bc4314834e8..55904193e41 100644 --- a/pkg/volume/csi/csi_client.go +++ b/pkg/volume/csi/csi_client.go @@ -35,6 +35,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/klog" "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/volume" csipbv0 "k8s.io/kubernetes/pkg/volume/csi/csiv0" ) @@ -72,9 +73,16 @@ type csiClient interface { secrets map[string]string, volumeContext map[string]string, ) error + + NodeGetVolumeStats( + ctx context.Context, + volID string, + targetPath string, + ) (*volume.Metrics, error) NodeUnstageVolume(ctx context.Context, volID, stagingTargetPath string) error NodeSupportsStageUnstage(ctx context.Context) (bool, error) NodeSupportsNodeExpand(ctx context.Context) (bool, error) + NodeSupportsVolumeStats(ctx context.Context) (bool, error) } // Strongly typed address @@ -840,3 +848,101 @@ func (c *csiClientGetter) Get() (csiClient, error) { c.csiClient = csi return c.csiClient, nil } + +func (c *csiDriverClient) NodeSupportsVolumeStats(ctx context.Context) (bool, error) { + klog.V(5).Info(log("calling NodeGetCapabilities rpc to determine if NodeSupportsVolumeStats")) + if c.nodeV1ClientCreator != nil { + return c.nodeSupportsVolumeStatsV1(ctx) + } + return false, fmt.Errorf("failed to call NodeSupportsVolumeStats. nodeV1ClientCreator is nil") +} + +func (c *csiDriverClient) nodeSupportsVolumeStatsV1(ctx context.Context) (bool, error) { + nodeClient, closer, err := c.nodeV1ClientCreator(c.addr) + if err != nil { + return false, err + } + defer closer.Close() + req := &csipbv1.NodeGetCapabilitiesRequest{} + resp, err := nodeClient.NodeGetCapabilities(ctx, req) + if err != nil { + return false, err + } + capabilities := resp.GetCapabilities() + if capabilities == nil { + return false, nil + } + for _, capability := range capabilities { + if capability.GetRpc().GetType() == csipbv1.NodeServiceCapability_RPC_GET_VOLUME_STATS { + return true, nil + } + } + return false, nil +} + +func (c *csiDriverClient) NodeGetVolumeStats(ctx context.Context, volID string, targetPath string) (*volume.Metrics, error) { + klog.V(4).Info(log("calling NodeGetVolumeStats rpc: [volid=%s, target_path=%s", volID, targetPath)) + if volID == "" { + return nil, errors.New("missing volume id") + } + if targetPath == "" { + return nil, errors.New("missing target path") + } + + if c.nodeV1ClientCreator != nil { + return c.nodeGetVolumeStatsV1(ctx, volID, targetPath) + } + + return nil, fmt.Errorf("failed to call NodeGetVolumeStats. nodeV1ClientCreator is nil") +} + +func (c *csiDriverClient) nodeGetVolumeStatsV1( + ctx context.Context, + volID string, + targetPath string, +) (*volume.Metrics, error) { + nodeClient, closer, err := c.nodeV1ClientCreator(c.addr) + if err != nil { + return nil, err + } + defer closer.Close() + + req := &csipbv1.NodeGetVolumeStatsRequest{ + VolumeId: volID, + VolumePath: targetPath, + } + + resp, err := nodeClient.NodeGetVolumeStats(ctx, req) + if err != nil { + return nil, err + } + usages := resp.GetUsage() + if usages == nil { + return nil, fmt.Errorf("failed to get usage from response. usage is nil") + } + metrics := &volume.Metrics{ + Used: resource.NewQuantity(int64(0), resource.BinarySI), + Capacity: resource.NewQuantity(int64(0), resource.BinarySI), + Available: resource.NewQuantity(int64(0), resource.BinarySI), + InodesUsed: resource.NewQuantity(int64(0), resource.BinarySI), + Inodes: resource.NewQuantity(int64(0), resource.BinarySI), + InodesFree: resource.NewQuantity(int64(0), resource.BinarySI), + } + for _, usage := range usages { + unit := usage.GetUnit() + switch unit { + case csipbv1.VolumeUsage_BYTES: + metrics.Available = resource.NewQuantity(usage.GetAvailable(), resource.BinarySI) + metrics.Capacity = resource.NewQuantity(usage.GetTotal(), resource.BinarySI) + metrics.Used = resource.NewQuantity(usage.GetUsed(), resource.BinarySI) + case csipbv1.VolumeUsage_INODES: + metrics.InodesFree = resource.NewQuantity(usage.GetAvailable(), resource.BinarySI) + metrics.Inodes = resource.NewQuantity(usage.GetTotal(), resource.BinarySI) + metrics.InodesUsed = resource.NewQuantity(usage.GetUsed(), resource.BinarySI) + default: + klog.Errorf("unknown key %s in usage", unit.String()) + } + + } + return metrics, nil +} diff --git a/pkg/volume/csi/csi_client_test.go b/pkg/volume/csi/csi_client_test.go index af8fe63f5d2..b53fa24e583 100644 --- a/pkg/volume/csi/csi_client_test.go +++ b/pkg/volume/csi/csi_client_test.go @@ -26,6 +26,7 @@ import ( csipbv1 "github.com/container-storage-interface/spec/lib/go/csi" api "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/csi/fake" ) @@ -48,6 +49,13 @@ func newFakeCsiDriverClientWithExpansion(t *testing.T, stagingCapable bool, expa } } +func newFakeCsiDriverClientWithVolumeStats(t *testing.T, volumeStatsSet bool) *fakeCsiDriverClient { + return &fakeCsiDriverClient{ + t: t, + nodeClient: fake.NewNodeClientWithVolumeStats(volumeStatsSet), + } +} + func (c *fakeCsiDriverClient) NodeGetInfo(ctx context.Context) ( nodeID string, maxVolumePerNode int64, @@ -61,6 +69,54 @@ func (c *fakeCsiDriverClient) NodeGetInfo(ctx context.Context) ( return resp.GetNodeId(), resp.GetMaxVolumesPerNode(), accessibleTopology, err } +func (c *fakeCsiDriverClient) NodeGetVolumeStats(ctx context.Context, volID string, targetPath string) ( + usageCountMap *volume.Metrics, err error) { + c.t.Log("calling fake.NodeGetVolumeStats...") + req := &csipbv1.NodeGetVolumeStatsRequest{ + VolumeId: volID, + VolumePath: targetPath, + } + resp, err := c.nodeClient.NodeGetVolumeStats(ctx, req) + usages := resp.GetUsage() + metrics := &volume.Metrics{} + if usages == nil { + return nil, nil + } + for _, usage := range usages { + unit := usage.GetUnit() + switch unit { + case csipbv1.VolumeUsage_BYTES: + metrics.Available = resource.NewQuantity(usage.GetAvailable(), resource.BinarySI) + metrics.Capacity = resource.NewQuantity(usage.GetTotal(), resource.BinarySI) + metrics.Used = resource.NewQuantity(usage.GetUsed(), resource.BinarySI) + case csipbv1.VolumeUsage_INODES: + metrics.InodesFree = resource.NewQuantity(usage.GetAvailable(), resource.BinarySI) + metrics.Inodes = resource.NewQuantity(usage.GetTotal(), resource.BinarySI) + metrics.InodesUsed = resource.NewQuantity(usage.GetUsed(), resource.BinarySI) + } + } + return metrics, nil +} + +func (c *fakeCsiDriverClient) NodeSupportsVolumeStats(ctx context.Context) (bool, error) { + c.t.Log("calling fake.NodeSupportsVolumeStats...") + req := &csipbv1.NodeGetCapabilitiesRequest{} + resp, err := c.nodeClient.NodeGetCapabilities(ctx, req) + if err != nil { + return false, err + } + capabilities := resp.GetCapabilities() + if capabilities == nil { + return false, nil + } + for _, capability := range capabilities { + if capability.GetRpc().GetType() == csipbv1.NodeServiceCapability_RPC_GET_VOLUME_STATS { + return true, nil + } + } + return false, nil +} + func (c *fakeCsiDriverClient) NodePublishVolume( ctx context.Context, volID string, @@ -220,6 +276,10 @@ func setupClientWithExpansion(t *testing.T, stageUnstageSet bool, expansionSet b return newFakeCsiDriverClientWithExpansion(t, stageUnstageSet, expansionSet) } +func setupClientWithVolumeStats(t *testing.T, volumeStatsSet bool) csiClient { + return newFakeCsiDriverClientWithVolumeStats(t, volumeStatsSet) +} + func checkErr(t *testing.T, expectedAnError bool, actualError error) { t.Helper() @@ -521,3 +581,90 @@ func TestNodeExpandVolume(t *testing.T) { } } } + +type VolumeStatsOptions struct { + VolumeSpec *volume.Spec + + // this just could be volumeID + VolumeID string + + // DeviceMountPath location where device is mounted on the node. If volume type + // is attachable - this would be global mount path otherwise + // it would be location where volume was mounted for the pod + DeviceMountPath string +} + +func TestVolumeStats(t *testing.T) { + spec := volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "metrics", "test-vol"), false) + tests := []struct { + name string + volumeStatsSet bool + volumeData VolumeStatsOptions + success bool + }{ + { + name: "when nodeVolumeStats=on, VolumeID=on, DeviceMountPath=on", + volumeStatsSet: true, + volumeData: VolumeStatsOptions{ + VolumeSpec: spec, + VolumeID: "volume1", + DeviceMountPath: "/foo/bar", + }, + success: true, + }, + + { + name: "when nodeVolumeStats=off, VolumeID=on, DeviceMountPath=on", + volumeStatsSet: false, + volumeData: VolumeStatsOptions{ + VolumeSpec: spec, + VolumeID: "volume1", + DeviceMountPath: "/foo/bar", + }, + success: false, + }, + + { + name: "when nodeVolumeStats=on, VolumeID=off, DeviceMountPath=on", + volumeStatsSet: true, + volumeData: VolumeStatsOptions{ + VolumeSpec: spec, + VolumeID: "", + DeviceMountPath: "/foo/bar", + }, + success: false, + }, + + { + name: "when nodeVolumeStats=on, VolumeID=on, DeviceMountPath=off", + volumeStatsSet: true, + volumeData: VolumeStatsOptions{ + VolumeSpec: spec, + VolumeID: "volume1", + DeviceMountPath: "", + }, + success: false, + }, + { + name: "when nodeVolumeStats=on, VolumeID=on, DeviceMountPath=off", + volumeStatsSet: true, + volumeData: VolumeStatsOptions{ + VolumeSpec: spec, + VolumeID: "", + DeviceMountPath: "", + }, + success: false, + }, + } + for _, tc := range tests { + ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) + defer cancel() + csiSource, _ := getCSISourceFromSpec(tc.volumeData.VolumeSpec) + csClient := setupClientWithVolumeStats(t, tc.volumeStatsSet) + _, err := csClient.NodeGetVolumeStats(ctx, csiSource.VolumeHandle, tc.volumeData.DeviceMountPath) + if err != nil && tc.success { + t.Errorf("For %s : expected %v got %v", tc.name, tc.success, err) + } + } + +} diff --git a/pkg/volume/csi/csi_metrics.go b/pkg/volume/csi/csi_metrics.go new file mode 100644 index 00000000000..bd7ed55305d --- /dev/null +++ b/pkg/volume/csi/csi_metrics.go @@ -0,0 +1,79 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package csi + +import ( + "context" + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/volume" +) + +var _ volume.MetricsProvider = &metricsCsi{} + +// metricsCsi represents a MetricsProvider that calculates the used,free and +// capacity information for volume using volume path. + +type metricsCsi struct { + // the directory path the volume is mounted to. + targetPath string + + // Volume handle or id + volumeID string + + //csiClient with cache + csiClientGetter +} + +// NewMetricsCsi creates a new metricsCsi with the Volume ID and path. +func NewMetricsCsi(volumeID string, targetPath string, driverName csiDriverName) volume.MetricsProvider { + mc := &metricsCsi{volumeID: volumeID, targetPath: targetPath} + mc.csiClientGetter.driverName = driverName + return mc +} + +func (mc *metricsCsi) GetMetrics() (*volume.Metrics, error) { + currentTime := metav1.Now() + ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) + defer cancel() + // Get CSI client + csiClient, err := mc.csiClientGetter.Get() + if err != nil { + return nil, err + } + // Check whether "GET_VOLUME_STATS" is set + volumeStatsSet, err := csiClient.NodeSupportsVolumeStats(ctx) + if err != nil { + return nil, err + } + // if plugin doesnot support volume status, return. + if !volumeStatsSet { + return nil, nil + } + // Get Volumestatus + metrics, err := csiClient.NodeGetVolumeStats(ctx, mc.volumeID, mc.targetPath) + if err != nil { + return nil, err + } + if metrics == nil { + return nil, fmt.Errorf("csi.NodeGetVolumeStats returned nil metrics for volume %s", mc.volumeID) + } + //set recorded time + metrics.Time = currentTime + return metrics, nil +} diff --git a/pkg/volume/csi/csi_metrics_test.go b/pkg/volume/csi/csi_metrics_test.go new file mode 100644 index 00000000000..bcdb352e90f --- /dev/null +++ b/pkg/volume/csi/csi_metrics_test.go @@ -0,0 +1,113 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package csi + +import ( + "io" + "testing" + + csipbv1 "github.com/container-storage-interface/spec/lib/go/csi" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/kubernetes/pkg/volume/csi/fake" +) + +func TestGetMetrics(t *testing.T) { + tests := []struct { + name string + volumeID string + targetPath string + expectSuccess bool + }{ + { + name: "with valid name and volume id", + expectSuccess: true, + volumeID: "foobar", + targetPath: "/mnt/foo", + }, + } + + for _, tc := range tests { + metricsGetter := &metricsCsi{volumeID: tc.volumeID, targetPath: tc.targetPath} + metricsGetter.csiClient = &csiDriverClient{ + driverName: "com.google.gcepd", + nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) { + nodeClient := fake.NewNodeClientWithVolumeStats(true /* VolumeStatsCapable */) + fakeCloser := fake.NewCloser(t) + nodeClient.SetNodeVolumeStatsResp(getRawVolumeInfo()) + return nodeClient, fakeCloser, nil + }, + } + metrics, err := metricsGetter.GetMetrics() + if err != nil { + t.Fatalf("for %s: unexpected error : %v", tc.name, err) + } + if metrics == nil { + t.Fatalf("unexpected nil metrics") + } + expectedMetrics := getRawVolumeInfo() + for _, usage := range expectedMetrics.Usage { + if usage.Unit == csipbv1.VolumeUsage_BYTES { + availableBytes := resource.NewQuantity(usage.Available, resource.BinarySI) + totalBytes := resource.NewQuantity(usage.Total, resource.BinarySI) + usedBytes := resource.NewQuantity(usage.Used, resource.BinarySI) + if metrics.Available.Cmp(*availableBytes) != 0 { + t.Fatalf("for %s: error: expected :%v , got: %v", tc.name, *availableBytes, *(metrics.Available)) + } + if metrics.Capacity.Cmp(*totalBytes) != 0 { + t.Fatalf("for %s: error: expected :%v , got: %v", tc.name, *totalBytes, *(metrics.Capacity)) + } + if metrics.Used.Cmp(*usedBytes) != 0 { + t.Fatalf("for %s: error: expected :%v , got: %v", tc.name, *usedBytes, *(metrics.Used)) + } + } + + if usage.Unit == csipbv1.VolumeUsage_INODES { + freeInodes := resource.NewQuantity(usage.Available, resource.BinarySI) + inodes := resource.NewQuantity(usage.Total, resource.BinarySI) + usedInodes := resource.NewQuantity(usage.Used, resource.BinarySI) + if metrics.InodesFree.Cmp(*freeInodes) != 0 { + t.Fatalf("for %s: error: expected :%v , got: %v", tc.name, *freeInodes, *(metrics.InodesFree)) + } + if metrics.Inodes.Cmp(*inodes) != 0 { + t.Fatalf("for %s: error: expected :%v , got: %v", tc.name, *inodes, *(metrics.Inodes)) + } + if metrics.InodesUsed.Cmp(*usedInodes) != 0 { + t.Fatalf("for %s: error: expected :%v , got: %v", tc.name, *usedInodes, *(metrics.InodesUsed)) + } + } + } + } +} + +func getRawVolumeInfo() *csipbv1.NodeGetVolumeStatsResponse { + return &csipbv1.NodeGetVolumeStatsResponse{ + Usage: []*csipbv1.VolumeUsage{ + { + Available: int64(10), + Total: int64(10), + Used: int64(2), + Unit: csipbv1.VolumeUsage_BYTES, + }, + { + Available: int64(100), + Total: int64(100), + Used: int64(20), + Unit: csipbv1.VolumeUsage_INODES, + }, + }, + } +} diff --git a/pkg/volume/csi/csi_mounter.go b/pkg/volume/csi/csi_mounter.go index 8c10d1f644c..20793fa82f5 100644 --- a/pkg/volume/csi/csi_mounter.go +++ b/pkg/volume/csi/csi_mounter.go @@ -69,7 +69,7 @@ type csiMountMgr struct { podUID types.UID options volume.VolumeOptions publishContext map[string]string - volume.MetricsNil + volume.MetricsProvider } // volume.Volume methods diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index ac50ac1c427..0e323f1d18d 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -415,6 +415,8 @@ func (p *csiPlugin) NewMounter( } klog.V(4).Info(log("created path successfully [%s]", dataDir)) + mounter.MetricsProvider = NewMetricsCsi(volumeHandle, dir, csiDriverName(driverName)) + // persist volume info data for teardown node := string(p.host.GetNodeName()) volData := map[string]string{ diff --git a/pkg/volume/csi/fake/fake_client.go b/pkg/volume/csi/fake/fake_client.go index ac826aa1af3..1f3d0b5cdb0 100644 --- a/pkg/volume/csi/fake/fake_client.go +++ b/pkg/volume/csi/fake/fake_client.go @@ -71,7 +71,9 @@ type NodeClient struct { nodeStagedVolumes map[string]CSIVolume stageUnstageSet bool expansionSet bool + volumeStatsSet bool nodeGetInfoResp *csipb.NodeGetInfoResponse + nodeVolumeStatsResp *csipb.NodeGetVolumeStatsResponse nextErr error } @@ -81,6 +83,7 @@ func NewNodeClient(stageUnstageSet bool) *NodeClient { nodePublishedVolumes: make(map[string]CSIVolume), nodeStagedVolumes: make(map[string]CSIVolume), stageUnstageSet: stageUnstageSet, + volumeStatsSet: true, } } @@ -93,6 +96,12 @@ func NewNodeClientWithExpansion(stageUnstageSet bool, expansionSet bool) *NodeCl } } +func NewNodeClientWithVolumeStats(volumeStatsSet bool) *NodeClient { + return &NodeClient{ + volumeStatsSet: volumeStatsSet, + } +} + // SetNextError injects next expected error func (f *NodeClient) SetNextError(err error) { f.nextErr = err @@ -102,6 +111,10 @@ func (f *NodeClient) SetNodeGetInfoResp(resp *csipb.NodeGetInfoResponse) { f.nodeGetInfoResp = resp } +func (f *NodeClient) SetNodeVolumeStatsResp(resp *csipb.NodeGetVolumeStatsResponse) { + f.nodeVolumeStatsResp = resp +} + // GetNodePublishedVolumes returns node published volumes func (f *NodeClient) GetNodePublishedVolumes() map[string]CSIVolume { return f.nodePublishedVolumes @@ -264,13 +277,42 @@ func (f *NodeClient) NodeGetCapabilities(ctx context.Context, in *csipb.NodeGetC }, }) } + + if f.volumeStatsSet { + resp.Capabilities = append(resp.Capabilities, &csipb.NodeServiceCapability{ + Type: &csipb.NodeServiceCapability_Rpc{ + Rpc: &csipb.NodeServiceCapability_RPC{ + Type: csipb.NodeServiceCapability_RPC_GET_VOLUME_STATS, + }, + }, + }) + } return resp, nil } +/* // NodeGetVolumeStats implements csi method func (f *NodeClient) NodeGetVolumeStats(ctx context.Context, in *csipb.NodeGetVolumeStatsRequest, opts ...grpc.CallOption) (*csipb.NodeGetVolumeStatsResponse, error) { return nil, nil } +*/ + +// NodeGetVolumeStats implements csi method +func (f *NodeClient) NodeGetVolumeStats(ctx context.Context, req *csipb.NodeGetVolumeStatsRequest, opts ...grpc.CallOption) (*csipb.NodeGetVolumeStatsResponse, error) { + if f.nextErr != nil { + return nil, f.nextErr + } + if req.GetVolumeId() == "" { + return nil, errors.New("missing volume id") + } + if req.GetVolumePath() == "" { + return nil, errors.New("missing Volume path") + } + if f.nodeVolumeStatsResp != nil { + return f.nodeVolumeStatsResp, nil + } + return &csipb.NodeGetVolumeStatsResponse{}, nil +} // ControllerClient represents a CSI Controller client type ControllerClient struct {