csi: Implement NodeServiceCapability_RPC_GET_VOLUME_STATS rpc call

and implement Metrics Provider for CSI driver

Signed-off-by: Humble Chirammal <hchiramm@redhat.com>
This commit is contained in:
Humble Chirammal 2019-05-14 20:29:45 +05:30
parent ca1e47065b
commit c511c90b59
8 changed files with 492 additions and 1 deletions

View File

@ -7,6 +7,7 @@ go_library(
"csi_block.go",
"csi_client.go",
"csi_drivers_store.go",
"csi_metrics.go",
"csi_mounter.go",
"csi_plugin.go",
"csi_util.go",
@ -50,6 +51,7 @@ go_test(
"csi_block_test.go",
"csi_client_test.go",
"csi_drivers_store_test.go",
"csi_metrics_test.go",
"csi_mounter_test.go",
"csi_plugin_test.go",
"expander_test.go",

View File

@ -35,6 +35,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
csipbv0 "k8s.io/kubernetes/pkg/volume/csi/csiv0"
)
@ -72,9 +73,16 @@ type csiClient interface {
secrets map[string]string,
volumeContext map[string]string,
) error
NodeGetVolumeStats(
ctx context.Context,
volID string,
targetPath string,
) (*volume.Metrics, error)
NodeUnstageVolume(ctx context.Context, volID, stagingTargetPath string) error
NodeSupportsStageUnstage(ctx context.Context) (bool, error)
NodeSupportsNodeExpand(ctx context.Context) (bool, error)
NodeSupportsVolumeStats(ctx context.Context) (bool, error)
}
// Strongly typed address
@ -841,3 +849,101 @@ func (c *csiClientGetter) Get() (csiClient, error) {
c.csiClient = csi
return c.csiClient, nil
}
func (c *csiDriverClient) NodeSupportsVolumeStats(ctx context.Context) (bool, error) {
klog.V(5).Info(log("calling NodeGetCapabilities rpc to determine if NodeSupportsVolumeStats"))
if c.nodeV1ClientCreator != nil {
return c.nodeSupportsVolumeStatsV1(ctx)
}
return false, fmt.Errorf("failed to call NodeSupportsVolumeStats. nodeV1ClientCreator is nil")
}
func (c *csiDriverClient) nodeSupportsVolumeStatsV1(ctx context.Context) (bool, error) {
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
if err != nil {
return false, err
}
defer closer.Close()
req := &csipbv1.NodeGetCapabilitiesRequest{}
resp, err := nodeClient.NodeGetCapabilities(ctx, req)
if err != nil {
return false, err
}
capabilities := resp.GetCapabilities()
if capabilities == nil {
return false, nil
}
for _, capability := range capabilities {
if capability.GetRpc().GetType() == csipbv1.NodeServiceCapability_RPC_GET_VOLUME_STATS {
return true, nil
}
}
return false, nil
}
func (c *csiDriverClient) NodeGetVolumeStats(ctx context.Context, volID string, targetPath string) (*volume.Metrics, error) {
klog.V(4).Info(log("calling NodeGetVolumeStats rpc: [volid=%s, target_path=%s", volID, targetPath))
if volID == "" {
return nil, errors.New("missing volume id")
}
if targetPath == "" {
return nil, errors.New("missing target path")
}
if c.nodeV1ClientCreator != nil {
return c.nodeGetVolumeStatsV1(ctx, volID, targetPath)
}
return nil, fmt.Errorf("failed to call NodeGetVolumeStats. nodeV1ClientCreator is nil")
}
func (c *csiDriverClient) nodeGetVolumeStatsV1(
ctx context.Context,
volID string,
targetPath string,
) (*volume.Metrics, error) {
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
if err != nil {
return nil, err
}
defer closer.Close()
req := &csipbv1.NodeGetVolumeStatsRequest{
VolumeId: volID,
VolumePath: targetPath,
}
resp, err := nodeClient.NodeGetVolumeStats(ctx, req)
if err != nil {
return nil, err
}
usages := resp.GetUsage()
if usages == nil {
return nil, fmt.Errorf("failed to get usage from response. usage is nil")
}
metrics := &volume.Metrics{
Used: resource.NewQuantity(int64(0), resource.BinarySI),
Capacity: resource.NewQuantity(int64(0), resource.BinarySI),
Available: resource.NewQuantity(int64(0), resource.BinarySI),
InodesUsed: resource.NewQuantity(int64(0), resource.BinarySI),
Inodes: resource.NewQuantity(int64(0), resource.BinarySI),
InodesFree: resource.NewQuantity(int64(0), resource.BinarySI),
}
for _, usage := range usages {
unit := usage.GetUnit()
switch unit {
case csipbv1.VolumeUsage_BYTES:
metrics.Available = resource.NewQuantity(usage.GetAvailable(), resource.BinarySI)
metrics.Capacity = resource.NewQuantity(usage.GetTotal(), resource.BinarySI)
metrics.Used = resource.NewQuantity(usage.GetUsed(), resource.BinarySI)
case csipbv1.VolumeUsage_INODES:
metrics.InodesFree = resource.NewQuantity(usage.GetAvailable(), resource.BinarySI)
metrics.Inodes = resource.NewQuantity(usage.GetTotal(), resource.BinarySI)
metrics.InodesUsed = resource.NewQuantity(usage.GetUsed(), resource.BinarySI)
default:
klog.Errorf("unknown key %s in usage", unit.String())
}
}
return metrics, nil
}

View File

@ -26,6 +26,7 @@ import (
csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csi/fake"
)
@ -48,6 +49,13 @@ func newFakeCsiDriverClientWithExpansion(t *testing.T, stagingCapable bool, expa
}
}
func newFakeCsiDriverClientWithVolumeStats(t *testing.T, volumeStatsSet bool) *fakeCsiDriverClient {
return &fakeCsiDriverClient{
t: t,
nodeClient: fake.NewNodeClientWithVolumeStats(volumeStatsSet),
}
}
func (c *fakeCsiDriverClient) NodeGetInfo(ctx context.Context) (
nodeID string,
maxVolumePerNode int64,
@ -61,6 +69,54 @@ func (c *fakeCsiDriverClient) NodeGetInfo(ctx context.Context) (
return resp.GetNodeId(), resp.GetMaxVolumesPerNode(), accessibleTopology, err
}
func (c *fakeCsiDriverClient) NodeGetVolumeStats(ctx context.Context, volID string, targetPath string) (
usageCountMap *volume.Metrics, err error) {
c.t.Log("calling fake.NodeGetVolumeStats...")
req := &csipbv1.NodeGetVolumeStatsRequest{
VolumeId: volID,
VolumePath: targetPath,
}
resp, err := c.nodeClient.NodeGetVolumeStats(ctx, req)
usages := resp.GetUsage()
metrics := &volume.Metrics{}
if usages == nil {
return nil, nil
}
for _, usage := range usages {
unit := usage.GetUnit()
switch unit {
case csipbv1.VolumeUsage_BYTES:
metrics.Available = resource.NewQuantity(usage.GetAvailable(), resource.BinarySI)
metrics.Capacity = resource.NewQuantity(usage.GetTotal(), resource.BinarySI)
metrics.Used = resource.NewQuantity(usage.GetUsed(), resource.BinarySI)
case csipbv1.VolumeUsage_INODES:
metrics.InodesFree = resource.NewQuantity(usage.GetAvailable(), resource.BinarySI)
metrics.Inodes = resource.NewQuantity(usage.GetTotal(), resource.BinarySI)
metrics.InodesUsed = resource.NewQuantity(usage.GetUsed(), resource.BinarySI)
}
}
return metrics, nil
}
func (c *fakeCsiDriverClient) NodeSupportsVolumeStats(ctx context.Context) (bool, error) {
c.t.Log("calling fake.NodeSupportsVolumeStats...")
req := &csipbv1.NodeGetCapabilitiesRequest{}
resp, err := c.nodeClient.NodeGetCapabilities(ctx, req)
if err != nil {
return false, err
}
capabilities := resp.GetCapabilities()
if capabilities == nil {
return false, nil
}
for _, capability := range capabilities {
if capability.GetRpc().GetType() == csipbv1.NodeServiceCapability_RPC_GET_VOLUME_STATS {
return true, nil
}
}
return false, nil
}
func (c *fakeCsiDriverClient) NodePublishVolume(
ctx context.Context,
volID string,
@ -220,6 +276,10 @@ func setupClientWithExpansion(t *testing.T, stageUnstageSet bool, expansionSet b
return newFakeCsiDriverClientWithExpansion(t, stageUnstageSet, expansionSet)
}
func setupClientWithVolumeStats(t *testing.T, volumeStatsSet bool) csiClient {
return newFakeCsiDriverClientWithVolumeStats(t, volumeStatsSet)
}
func checkErr(t *testing.T, expectedAnError bool, actualError error) {
t.Helper()
@ -521,3 +581,90 @@ func TestNodeExpandVolume(t *testing.T) {
}
}
}
type VolumeStatsOptions struct {
VolumeSpec *volume.Spec
// this just could be volumeID
VolumeID string
// DeviceMountPath location where device is mounted on the node. If volume type
// is attachable - this would be global mount path otherwise
// it would be location where volume was mounted for the pod
DeviceMountPath string
}
func TestVolumeStats(t *testing.T) {
spec := volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "metrics", "test-vol"), false)
tests := []struct {
name string
volumeStatsSet bool
volumeData VolumeStatsOptions
success bool
}{
{
name: "when nodeVolumeStats=on, VolumeID=on, DeviceMountPath=on",
volumeStatsSet: true,
volumeData: VolumeStatsOptions{
VolumeSpec: spec,
VolumeID: "volume1",
DeviceMountPath: "/foo/bar",
},
success: true,
},
{
name: "when nodeVolumeStats=off, VolumeID=on, DeviceMountPath=on",
volumeStatsSet: false,
volumeData: VolumeStatsOptions{
VolumeSpec: spec,
VolumeID: "volume1",
DeviceMountPath: "/foo/bar",
},
success: false,
},
{
name: "when nodeVolumeStats=on, VolumeID=off, DeviceMountPath=on",
volumeStatsSet: true,
volumeData: VolumeStatsOptions{
VolumeSpec: spec,
VolumeID: "",
DeviceMountPath: "/foo/bar",
},
success: false,
},
{
name: "when nodeVolumeStats=on, VolumeID=on, DeviceMountPath=off",
volumeStatsSet: true,
volumeData: VolumeStatsOptions{
VolumeSpec: spec,
VolumeID: "volume1",
DeviceMountPath: "",
},
success: false,
},
{
name: "when nodeVolumeStats=on, VolumeID=on, DeviceMountPath=off",
volumeStatsSet: true,
volumeData: VolumeStatsOptions{
VolumeSpec: spec,
VolumeID: "",
DeviceMountPath: "",
},
success: false,
},
}
for _, tc := range tests {
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
csiSource, _ := getCSISourceFromSpec(tc.volumeData.VolumeSpec)
csClient := setupClientWithVolumeStats(t, tc.volumeStatsSet)
_, err := csClient.NodeGetVolumeStats(ctx, csiSource.VolumeHandle, tc.volumeData.DeviceMountPath)
if err != nil && tc.success {
t.Errorf("For %s : expected %v got %v", tc.name, tc.success, err)
}
}
}

View File

@ -0,0 +1,79 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"context"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/volume"
)
var _ volume.MetricsProvider = &metricsCsi{}
// metricsCsi represents a MetricsProvider that calculates the used,free and
// capacity information for volume using volume path.
type metricsCsi struct {
// the directory path the volume is mounted to.
targetPath string
// Volume handle or id
volumeID string
//csiClient with cache
csiClientGetter
}
// NewMetricsCsi creates a new metricsCsi with the Volume ID and path.
func NewMetricsCsi(volumeID string, targetPath string, driverName csiDriverName) volume.MetricsProvider {
mc := &metricsCsi{volumeID: volumeID, targetPath: targetPath}
mc.csiClientGetter.driverName = driverName
return mc
}
func (mc *metricsCsi) GetMetrics() (*volume.Metrics, error) {
currentTime := metav1.Now()
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
// Get CSI client
csiClient, err := mc.csiClientGetter.Get()
if err != nil {
return nil, err
}
// Check whether "GET_VOLUME_STATS" is set
volumeStatsSet, err := csiClient.NodeSupportsVolumeStats(ctx)
if err != nil {
return nil, err
}
// if plugin doesnot support volume status, return.
if !volumeStatsSet {
return nil, nil
}
// Get Volumestatus
metrics, err := csiClient.NodeGetVolumeStats(ctx, mc.volumeID, mc.targetPath)
if err != nil {
return nil, err
}
if metrics == nil {
return nil, fmt.Errorf("csi.NodeGetVolumeStats returned nil metrics for volume %s", mc.volumeID)
}
//set recorded time
metrics.Time = currentTime
return metrics, nil
}

View File

@ -0,0 +1,113 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"io"
"testing"
csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/kubernetes/pkg/volume/csi/fake"
)
func TestGetMetrics(t *testing.T) {
tests := []struct {
name string
volumeID string
targetPath string
expectSuccess bool
}{
{
name: "with valid name and volume id",
expectSuccess: true,
volumeID: "foobar",
targetPath: "/mnt/foo",
},
}
for _, tc := range tests {
metricsGetter := &metricsCsi{volumeID: tc.volumeID, targetPath: tc.targetPath}
metricsGetter.csiClient = &csiDriverClient{
driverName: "com.google.gcepd",
nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) {
nodeClient := fake.NewNodeClientWithVolumeStats(true /* VolumeStatsCapable */)
fakeCloser := fake.NewCloser(t)
nodeClient.SetNodeVolumeStatsResp(getRawVolumeInfo())
return nodeClient, fakeCloser, nil
},
}
metrics, err := metricsGetter.GetMetrics()
if err != nil {
t.Fatalf("for %s: unexpected error : %v", tc.name, err)
}
if metrics == nil {
t.Fatalf("unexpected nil metrics")
}
expectedMetrics := getRawVolumeInfo()
for _, usage := range expectedMetrics.Usage {
if usage.Unit == csipbv1.VolumeUsage_BYTES {
availableBytes := resource.NewQuantity(usage.Available, resource.BinarySI)
totalBytes := resource.NewQuantity(usage.Total, resource.BinarySI)
usedBytes := resource.NewQuantity(usage.Used, resource.BinarySI)
if metrics.Available.Cmp(*availableBytes) != 0 {
t.Fatalf("for %s: error: expected :%v , got: %v", tc.name, *availableBytes, *(metrics.Available))
}
if metrics.Capacity.Cmp(*totalBytes) != 0 {
t.Fatalf("for %s: error: expected :%v , got: %v", tc.name, *totalBytes, *(metrics.Capacity))
}
if metrics.Used.Cmp(*usedBytes) != 0 {
t.Fatalf("for %s: error: expected :%v , got: %v", tc.name, *usedBytes, *(metrics.Used))
}
}
if usage.Unit == csipbv1.VolumeUsage_INODES {
freeInodes := resource.NewQuantity(usage.Available, resource.BinarySI)
inodes := resource.NewQuantity(usage.Total, resource.BinarySI)
usedInodes := resource.NewQuantity(usage.Used, resource.BinarySI)
if metrics.InodesFree.Cmp(*freeInodes) != 0 {
t.Fatalf("for %s: error: expected :%v , got: %v", tc.name, *freeInodes, *(metrics.InodesFree))
}
if metrics.Inodes.Cmp(*inodes) != 0 {
t.Fatalf("for %s: error: expected :%v , got: %v", tc.name, *inodes, *(metrics.Inodes))
}
if metrics.InodesUsed.Cmp(*usedInodes) != 0 {
t.Fatalf("for %s: error: expected :%v , got: %v", tc.name, *usedInodes, *(metrics.InodesUsed))
}
}
}
}
}
func getRawVolumeInfo() *csipbv1.NodeGetVolumeStatsResponse {
return &csipbv1.NodeGetVolumeStatsResponse{
Usage: []*csipbv1.VolumeUsage{
{
Available: int64(10),
Total: int64(10),
Used: int64(2),
Unit: csipbv1.VolumeUsage_BYTES,
},
{
Available: int64(100),
Total: int64(100),
Used: int64(20),
Unit: csipbv1.VolumeUsage_INODES,
},
},
}
}

View File

@ -69,7 +69,7 @@ type csiMountMgr struct {
podUID types.UID
options volume.VolumeOptions
publishContext map[string]string
volume.MetricsNil
volume.MetricsProvider
}
// volume.Volume methods

View File

@ -408,6 +408,8 @@ func (p *csiPlugin) NewMounter(
}
klog.V(4).Info(log("created path successfully [%s]", dataDir))
mounter.MetricsProvider = NewMetricsCsi(volumeHandle, dir, csiDriverName(driverName))
// persist volume info data for teardown
node := string(p.host.GetNodeName())
volData := map[string]string{

View File

@ -71,7 +71,9 @@ type NodeClient struct {
nodeStagedVolumes map[string]CSIVolume
stageUnstageSet bool
expansionSet bool
volumeStatsSet bool
nodeGetInfoResp *csipb.NodeGetInfoResponse
nodeVolumeStatsResp *csipb.NodeGetVolumeStatsResponse
nextErr error
}
@ -81,6 +83,7 @@ func NewNodeClient(stageUnstageSet bool) *NodeClient {
nodePublishedVolumes: make(map[string]CSIVolume),
nodeStagedVolumes: make(map[string]CSIVolume),
stageUnstageSet: stageUnstageSet,
volumeStatsSet: true,
}
}
@ -93,6 +96,12 @@ func NewNodeClientWithExpansion(stageUnstageSet bool, expansionSet bool) *NodeCl
}
}
func NewNodeClientWithVolumeStats(volumeStatsSet bool) *NodeClient {
return &NodeClient{
volumeStatsSet: volumeStatsSet,
}
}
// SetNextError injects next expected error
func (f *NodeClient) SetNextError(err error) {
f.nextErr = err
@ -102,6 +111,10 @@ func (f *NodeClient) SetNodeGetInfoResp(resp *csipb.NodeGetInfoResponse) {
f.nodeGetInfoResp = resp
}
func (f *NodeClient) SetNodeVolumeStatsResp(resp *csipb.NodeGetVolumeStatsResponse) {
f.nodeVolumeStatsResp = resp
}
// GetNodePublishedVolumes returns node published volumes
func (f *NodeClient) GetNodePublishedVolumes() map[string]CSIVolume {
return f.nodePublishedVolumes
@ -264,13 +277,42 @@ func (f *NodeClient) NodeGetCapabilities(ctx context.Context, in *csipb.NodeGetC
},
})
}
if f.volumeStatsSet {
resp.Capabilities = append(resp.Capabilities, &csipb.NodeServiceCapability{
Type: &csipb.NodeServiceCapability_Rpc{
Rpc: &csipb.NodeServiceCapability_RPC{
Type: csipb.NodeServiceCapability_RPC_GET_VOLUME_STATS,
},
},
})
}
return resp, nil
}
/*
// NodeGetVolumeStats implements csi method
func (f *NodeClient) NodeGetVolumeStats(ctx context.Context, in *csipb.NodeGetVolumeStatsRequest, opts ...grpc.CallOption) (*csipb.NodeGetVolumeStatsResponse, error) {
return nil, nil
}
*/
// NodeGetVolumeStats implements csi method
func (f *NodeClient) NodeGetVolumeStats(ctx context.Context, req *csipb.NodeGetVolumeStatsRequest, opts ...grpc.CallOption) (*csipb.NodeGetVolumeStatsResponse, error) {
if f.nextErr != nil {
return nil, f.nextErr
}
if req.GetVolumeId() == "" {
return nil, errors.New("missing volume id")
}
if req.GetVolumePath() == "" {
return nil, errors.New("missing Volume path")
}
if f.nodeVolumeStatsResp != nil {
return f.nodeVolumeStatsResp, nil
}
return &csipb.NodeGetVolumeStatsResponse{}, nil
}
// ControllerClient represents a CSI Controller client
type ControllerClient struct {