mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Merge pull request #101587 from nixpanic/in-tree/block-metrics
Fix a panic for in-tree drivers that partialy support Block volume metrics
This commit is contained in:
commit
f545438bd3
@ -567,6 +567,10 @@ func (f *stubBlockVolume) UnmapPodDevice() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *stubBlockVolume) SupportsMetrics() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (f *stubBlockVolume) GetMetrics() (*volume.Metrics, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
@ -112,7 +112,12 @@ func (s *volumeStatCalculator) calcAndStoreStats() {
|
||||
for name, v := range blockVolumes {
|
||||
// Only add the blockVolume if it implements the MetricsProvider interface
|
||||
if _, ok := v.(volume.MetricsProvider); ok {
|
||||
metricVolumes[name] = v
|
||||
// Some drivers inherit the MetricsProvider interface from Filesystem
|
||||
// mode volumes, but do not implement it for Block mode. Checking
|
||||
// SupportsMetrics() will prevent panics in that case.
|
||||
if v.SupportsMetrics() {
|
||||
metricVolumes[name] = v
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -246,6 +246,8 @@ func (v *fakeBlockVolume) GetGlobalMapPath(*volume.Spec) (string, error) { retur
|
||||
|
||||
func (v *fakeBlockVolume) GetPodDeviceMapPath() (string, string) { return "", "" }
|
||||
|
||||
func (v *fakeBlockVolume) SupportsMetrics() bool { return true }
|
||||
|
||||
func (v *fakeBlockVolume) GetMetrics() (*volume.Metrics, error) {
|
||||
return expectedBlockMetrics(), nil
|
||||
}
|
||||
|
@ -98,7 +98,7 @@ func (plugin *awsElasticBlockStorePlugin) newBlockVolumeMapperInternal(spec *vol
|
||||
partition = strconv.Itoa(int(ebs.Partition))
|
||||
}
|
||||
|
||||
return &awsElasticBlockStoreMapper{
|
||||
mapper := &awsElasticBlockStoreMapper{
|
||||
awsElasticBlockStore: &awsElasticBlockStore{
|
||||
podUID: podUID,
|
||||
volName: spec.Name(),
|
||||
@ -108,7 +108,16 @@ func (plugin *awsElasticBlockStorePlugin) newBlockVolumeMapperInternal(spec *vol
|
||||
mounter: mounter,
|
||||
plugin: plugin,
|
||||
},
|
||||
readOnly: readOnly}, nil
|
||||
readOnly: readOnly,
|
||||
}
|
||||
|
||||
blockPath, err := mapper.GetGlobalMapPath(spec)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get device path: %v", err)
|
||||
}
|
||||
mapper.MetricsProvider = volume.NewMetricsBlock(filepath.Join(blockPath, string(podUID)))
|
||||
|
||||
return mapper, nil
|
||||
}
|
||||
|
||||
func (plugin *awsElasticBlockStorePlugin) NewBlockVolumeUnmapper(volName string, podUID types.UID) (volume.BlockVolumeUnmapper, error) {
|
||||
@ -156,3 +165,9 @@ func (ebs *awsElasticBlockStore) GetPodDeviceMapPath() (string, string) {
|
||||
name := awsElasticBlockStorePluginName
|
||||
return ebs.plugin.host.GetPodVolumeDeviceDir(ebs.podUID, utilstrings.EscapeQualifiedName(name)), ebs.volName
|
||||
}
|
||||
|
||||
// SupportsMetrics returns true for awsElasticBlockStore as it initializes the
|
||||
// MetricsProvider.
|
||||
func (ebs *awsElasticBlockStore) SupportsMetrics() bool {
|
||||
return true
|
||||
}
|
||||
|
@ -104,10 +104,18 @@ func (plugin *azureDataDiskPlugin) newBlockVolumeMapperInternal(spec *volume.Spe
|
||||
|
||||
disk := makeDataDisk(spec.Name(), podUID, volumeSource.DiskName, plugin.host, plugin)
|
||||
|
||||
return &azureDataDiskMapper{
|
||||
mapper := &azureDataDiskMapper{
|
||||
dataDisk: disk,
|
||||
readOnly: readOnly,
|
||||
}, nil
|
||||
}
|
||||
|
||||
blockPath, err := mapper.GetGlobalMapPath(spec)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get device path: %v", err)
|
||||
}
|
||||
mapper.MetricsProvider = volume.NewMetricsBlock(filepath.Join(blockPath, string(podUID)))
|
||||
|
||||
return mapper, nil
|
||||
}
|
||||
|
||||
func (plugin *azureDataDiskPlugin) NewBlockVolumeUnmapper(volName string, podUID types.UID) (volume.BlockVolumeUnmapper, error) {
|
||||
@ -121,6 +129,7 @@ func (plugin *azureDataDiskPlugin) newUnmapperInternal(volName string, podUID ty
|
||||
|
||||
type azureDataDiskUnmapper struct {
|
||||
*dataDisk
|
||||
volume.MetricsNil
|
||||
}
|
||||
|
||||
var _ volume.BlockVolumeUnmapper = &azureDataDiskUnmapper{}
|
||||
@ -149,3 +158,9 @@ func (disk *dataDisk) GetPodDeviceMapPath() (string, string) {
|
||||
name := azureDataDiskPluginName
|
||||
return disk.plugin.host.GetPodVolumeDeviceDir(disk.podUID, utilstrings.EscapeQualifiedName(name)), disk.volumeName
|
||||
}
|
||||
|
||||
// SupportsMetrics returns true for azureDataDiskMapper as it initializes the
|
||||
// MetricsProvider.
|
||||
func (addm *azureDataDiskMapper) SupportsMetrics() bool {
|
||||
return true
|
||||
}
|
||||
|
@ -101,7 +101,7 @@ func (plugin *cinderPlugin) newBlockVolumeMapperInternal(spec *volume.Spec, podU
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &cinderVolumeMapper{
|
||||
mapper := &cinderVolumeMapper{
|
||||
cinderVolume: &cinderVolume{
|
||||
podUID: podUID,
|
||||
volName: spec.Name(),
|
||||
@ -111,7 +111,16 @@ func (plugin *cinderPlugin) newBlockVolumeMapperInternal(spec *volume.Spec, podU
|
||||
mounter: mounter,
|
||||
plugin: plugin,
|
||||
},
|
||||
readOnly: readOnly}, nil
|
||||
readOnly: readOnly,
|
||||
}
|
||||
|
||||
blockPath, err := mapper.GetGlobalMapPath(spec)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get device path: %v", err)
|
||||
}
|
||||
mapper.MetricsProvider = volume.NewMetricsBlock(filepath.Join(blockPath, string(podUID)))
|
||||
|
||||
return mapper, nil
|
||||
}
|
||||
|
||||
func (plugin *cinderPlugin) NewBlockVolumeUnmapper(volName string, podUID types.UID) (volume.BlockVolumeUnmapper, error) {
|
||||
@ -131,6 +140,7 @@ func (plugin *cinderPlugin) newUnmapperInternal(volName string, podUID types.UID
|
||||
|
||||
type cinderPluginUnmapper struct {
|
||||
*cinderVolume
|
||||
volume.MetricsNil
|
||||
}
|
||||
|
||||
var _ volume.BlockVolumeUnmapper = &cinderPluginUnmapper{}
|
||||
@ -159,3 +169,9 @@ func (cd *cinderVolume) GetPodDeviceMapPath() (string, string) {
|
||||
name := cinderVolumePluginName
|
||||
return cd.plugin.host.GetPodVolumeDeviceDir(cd.podUID, utilstrings.EscapeQualifiedName(name)), cd.volName
|
||||
}
|
||||
|
||||
// SupportsMetrics returns true for cinderVolumeMapper as it initializes the
|
||||
// MetricsProvider.
|
||||
func (cvm *cinderVolumeMapper) SupportsMetrics() bool {
|
||||
return true
|
||||
}
|
||||
|
@ -115,6 +115,12 @@ func (m *csiBlockMapper) GetStagingPath() string {
|
||||
return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "staging", m.specName)
|
||||
}
|
||||
|
||||
// SupportsMetrics returns true for csiBlockMapper as it initializes the
|
||||
// MetricsProvider.
|
||||
func (m *csiBlockMapper) SupportsMetrics() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// getPublishDir returns path to a directory, where the volume is published to each pod.
|
||||
// Example: plugins/kubernetes.io/csi/volumeDevices/publish/{specName}
|
||||
func (m *csiBlockMapper) getPublishDir() string {
|
||||
|
@ -171,7 +171,7 @@ func (plugin *fcPlugin) newBlockVolumeMapperInternal(spec *volume.Spec, podUID t
|
||||
return nil, fmt.Errorf("fc: no fc disk information found. failed to make a new mapper")
|
||||
}
|
||||
|
||||
return &fcDiskMapper{
|
||||
mapper := &fcDiskMapper{
|
||||
fcDisk: &fcDisk{
|
||||
podUID: podUID,
|
||||
volName: spec.Name(),
|
||||
@ -184,7 +184,15 @@ func (plugin *fcPlugin) newBlockVolumeMapperInternal(spec *volume.Spec, podUID t
|
||||
readOnly: readOnly,
|
||||
mounter: &mount.SafeFormatAndMount{Interface: mounter, Exec: exec},
|
||||
deviceUtil: util.NewDeviceHandler(util.NewIOHandler()),
|
||||
}, nil
|
||||
}
|
||||
|
||||
blockPath, err := mapper.GetGlobalMapPath(spec)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get device path: %v", err)
|
||||
}
|
||||
mapper.MetricsProvider = volume.NewMetricsBlock(filepath.Join(blockPath, string(podUID)))
|
||||
|
||||
return mapper, nil
|
||||
}
|
||||
|
||||
func (plugin *fcPlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
|
||||
@ -393,6 +401,7 @@ func (c *fcDiskUnmounter) TearDownAt(dir string) error {
|
||||
// Block Volumes Support
|
||||
type fcDiskMapper struct {
|
||||
*fcDisk
|
||||
volume.MetricsProvider
|
||||
readOnly bool
|
||||
mounter mount.Interface
|
||||
deviceUtil util.DeviceUtil
|
||||
|
@ -108,7 +108,7 @@ func (plugin *gcePersistentDiskPlugin) newBlockVolumeMapperInternal(spec *volume
|
||||
partition = strconv.Itoa(int(volumeSource.Partition))
|
||||
}
|
||||
|
||||
return &gcePersistentDiskMapper{
|
||||
mapper := &gcePersistentDiskMapper{
|
||||
gcePersistentDisk: &gcePersistentDisk{
|
||||
volName: spec.Name(),
|
||||
podUID: podUID,
|
||||
@ -118,7 +118,16 @@ func (plugin *gcePersistentDiskPlugin) newBlockVolumeMapperInternal(spec *volume
|
||||
mounter: mounter,
|
||||
plugin: plugin,
|
||||
},
|
||||
readOnly: readOnly}, nil
|
||||
readOnly: readOnly,
|
||||
}
|
||||
|
||||
blockPath, err := mapper.GetGlobalMapPath(spec)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get device path: %v", err)
|
||||
}
|
||||
mapper.MetricsProvider = volume.NewMetricsBlock(filepath.Join(blockPath, string(podUID)))
|
||||
|
||||
return mapper, nil
|
||||
}
|
||||
|
||||
func (plugin *gcePersistentDiskPlugin) NewBlockVolumeUnmapper(volName string, podUID types.UID) (volume.BlockVolumeUnmapper, error) {
|
||||
@ -165,3 +174,9 @@ func (pd *gcePersistentDisk) GetPodDeviceMapPath() (string, string) {
|
||||
name := gcePersistentDiskPluginName
|
||||
return pd.plugin.host.GetPodVolumeDeviceDir(pd.podUID, utilstrings.EscapeQualifiedName(name)), pd.volName
|
||||
}
|
||||
|
||||
// SupportsMetrics returns true for gcePersistentDisk as it initializes the
|
||||
// MetricsProvider.
|
||||
func (pd *gcePersistentDisk) SupportsMetrics() bool {
|
||||
return true
|
||||
}
|
||||
|
@ -161,12 +161,20 @@ func (plugin *iscsiPlugin) newBlockVolumeMapperInternal(spec *volume.Spec, podUI
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &iscsiDiskMapper{
|
||||
mapper := &iscsiDiskMapper{
|
||||
iscsiDisk: iscsiDisk,
|
||||
readOnly: readOnly,
|
||||
exec: exec,
|
||||
deviceUtil: ioutil.NewDeviceHandler(ioutil.NewIOHandler()),
|
||||
}, nil
|
||||
}
|
||||
|
||||
blockPath, err := mapper.GetGlobalMapPath(spec)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get device path: %v", err)
|
||||
}
|
||||
mapper.MetricsProvider = volume.NewMetricsBlock(filepath.Join(blockPath, string(podUID)))
|
||||
|
||||
return mapper, nil
|
||||
}
|
||||
|
||||
func (plugin *iscsiPlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
|
||||
@ -385,6 +393,13 @@ type iscsiDiskUnmapper struct {
|
||||
*iscsiDisk
|
||||
exec utilexec.Interface
|
||||
deviceUtil ioutil.DeviceUtil
|
||||
volume.MetricsNil
|
||||
}
|
||||
|
||||
// SupportsMetrics returns true for SupportsMetrics as it initializes the
|
||||
// MetricsProvider.
|
||||
func (idm *iscsiDiskMapper) SupportsMetrics() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
var _ volume.BlockVolumeUnmapper = &iscsiDiskUnmapper{}
|
||||
|
@ -161,7 +161,7 @@ func (plugin *localVolumePlugin) NewBlockVolumeMapper(spec *volume.Spec, pod *v1
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &localVolumeMapper{
|
||||
mapper := &localVolumeMapper{
|
||||
localVolume: &localVolume{
|
||||
podUID: pod.UID,
|
||||
volName: spec.Name(),
|
||||
@ -169,8 +169,15 @@ func (plugin *localVolumePlugin) NewBlockVolumeMapper(spec *volume.Spec, pod *v1
|
||||
plugin: plugin,
|
||||
},
|
||||
readOnly: readOnly,
|
||||
}, nil
|
||||
}
|
||||
|
||||
blockPath, err := mapper.GetGlobalMapPath(spec)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get device path: %v", err)
|
||||
}
|
||||
mapper.MetricsProvider = volume.NewMetricsBlock(filepath.Join(blockPath, string(pod.UID)))
|
||||
|
||||
return mapper, nil
|
||||
}
|
||||
|
||||
func (plugin *localVolumePlugin) NewBlockVolumeUnmapper(volName string,
|
||||
@ -626,9 +633,16 @@ func (m *localVolumeMapper) GetStagingPath() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
// SupportsMetrics returns true for SupportsMetrics as it initializes the
|
||||
// MetricsProvider.
|
||||
func (m *localVolumeMapper) SupportsMetrics() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// localVolumeUnmapper implements the BlockVolumeUnmapper interface for local volumes.
|
||||
type localVolumeUnmapper struct {
|
||||
*localVolume
|
||||
volume.MetricsNil
|
||||
}
|
||||
|
||||
var _ volume.BlockVolumeUnmapper = &localVolumeUnmapper{}
|
||||
|
87
pkg/volume/metrics_block.go
Normal file
87
pkg/volume/metrics_block.go
Normal file
@ -0,0 +1,87 @@
|
||||
/*
|
||||
Copyright 2021 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package volume
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"runtime"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
var _ MetricsProvider = &metricsBlock{}
|
||||
|
||||
// metricsBlock represents a MetricsProvider that detects the size of the
|
||||
// BlockMode Volume.
|
||||
type metricsBlock struct {
|
||||
// the device node where the volume is attached to.
|
||||
device string
|
||||
}
|
||||
|
||||
// NewMetricsStatfs creates a new metricsBlock with the device node of the
|
||||
// Volume.
|
||||
func NewMetricsBlock(device string) MetricsProvider {
|
||||
return &metricsBlock{device}
|
||||
}
|
||||
|
||||
// See MetricsProvider.GetMetrics
|
||||
// GetMetrics detects the size of the BlockMode volume for the device node
|
||||
// where the Volume is attached.
|
||||
//
|
||||
// Note that only the capacity of the device can be detected with standard
|
||||
// tools. Storage systems may have more information that they can provide by
|
||||
// going through specialized APIs.
|
||||
func (mb *metricsBlock) GetMetrics() (*Metrics, error) {
|
||||
// TODO: Windows does not yet support VolumeMode=Block
|
||||
if runtime.GOOS == "windows" {
|
||||
return nil, NewNotImplementedError("Windows does not support Block volumes")
|
||||
}
|
||||
|
||||
metrics := &Metrics{Time: metav1.Now()}
|
||||
if mb.device == "" {
|
||||
return metrics, NewNoPathDefinedError()
|
||||
}
|
||||
|
||||
err := mb.getBlockInfo(metrics)
|
||||
if err != nil {
|
||||
return metrics, err
|
||||
}
|
||||
|
||||
return metrics, nil
|
||||
}
|
||||
|
||||
// getBlockInfo fetches metrics.Capacity by opening the device and seeking to
|
||||
// the end.
|
||||
func (mb *metricsBlock) getBlockInfo(metrics *Metrics) error {
|
||||
dev, err := os.Open(mb.device)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to open device %q: %w", mb.device, err)
|
||||
}
|
||||
defer dev.Close()
|
||||
|
||||
end, err := dev.Seek(0, io.SeekEnd)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to detect size of %q: %w", mb.device, err)
|
||||
}
|
||||
|
||||
metrics.Capacity = resource.NewQuantity(end, resource.BinarySI)
|
||||
|
||||
return nil
|
||||
}
|
98
pkg/volume/metrics_block_test.go
Normal file
98
pkg/volume/metrics_block_test.go
Normal file
@ -0,0 +1,98 @@
|
||||
/*
|
||||
Copyright 2021 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package volume_test
|
||||
|
||||
import (
|
||||
"io/fs"
|
||||
"os"
|
||||
"runtime"
|
||||
"testing"
|
||||
|
||||
. "k8s.io/kubernetes/pkg/volume"
|
||||
volumetest "k8s.io/kubernetes/pkg/volume/testing"
|
||||
)
|
||||
|
||||
func TestGetMetricsBlockInvalid(t *testing.T) {
|
||||
metrics := NewMetricsBlock("")
|
||||
actual, err := metrics.GetMetrics()
|
||||
expected := &Metrics{}
|
||||
if !volumetest.MetricsEqualIgnoreTimestamp(actual, expected) {
|
||||
t.Errorf("Expected empty Metrics from uninitialized MetricsBlock, actual %v", *actual)
|
||||
}
|
||||
if err == nil {
|
||||
t.Errorf("Expected error when calling GetMetrics on uninitialized MetricsBlock, actual nil")
|
||||
}
|
||||
|
||||
metrics = NewMetricsBlock("/nonexistent/device/node")
|
||||
actual, err = metrics.GetMetrics()
|
||||
if !volumetest.MetricsEqualIgnoreTimestamp(actual, expected) {
|
||||
t.Errorf("Expected empty Metrics from incorrectly initialized MetricsBlock, actual %v", *actual)
|
||||
}
|
||||
if err == nil {
|
||||
t.Errorf("Expected error when calling GetMetrics on incorrectly initialized MetricsBlock, actual nil")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetMetricsBlock(t *testing.T) {
|
||||
// FIXME: this test is Linux specific
|
||||
if runtime.GOOS == "windows" {
|
||||
t.Skip("Block device detection is Linux specific, no Windows support")
|
||||
}
|
||||
|
||||
// find a block device
|
||||
// get all available block devices
|
||||
// - ls /sys/block
|
||||
devices, err := os.ReadDir("/dev")
|
||||
if err != nil {
|
||||
t.Skipf("Could not read devices from /dev: %v", err)
|
||||
} else if len(devices) == 0 {
|
||||
t.Skip("No devices found")
|
||||
}
|
||||
|
||||
// for each device, check if it is available in /dev
|
||||
devNode := ""
|
||||
var stat fs.FileInfo
|
||||
for _, device := range devices {
|
||||
// if the device exists, use it, return
|
||||
devNode = "/dev/" + device.Name()
|
||||
stat, err = os.Stat(devNode)
|
||||
if err == nil {
|
||||
if stat.Mode().Type() == fs.ModeDevice {
|
||||
break
|
||||
}
|
||||
}
|
||||
// set to an empty string, so we can do validation of the last
|
||||
// device too
|
||||
devNode = ""
|
||||
}
|
||||
|
||||
// if no devices are found, or none exists in /dev, skip this part
|
||||
if devNode == "" {
|
||||
t.Skip("Could not find a block device under /dev")
|
||||
}
|
||||
|
||||
// when we get here, devNode points to an existing block device
|
||||
metrics := NewMetricsBlock(devNode)
|
||||
actual, err := metrics.GetMetrics()
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error when calling GetMetrics: %v", err)
|
||||
}
|
||||
|
||||
if a := actual.Capacity.Value(); a <= 0 {
|
||||
t.Errorf("Expected Capacity %d to be greater than 0.", a)
|
||||
}
|
||||
}
|
@ -35,6 +35,14 @@ func NewNotSupportedError() *MetricsError {
|
||||
}
|
||||
}
|
||||
|
||||
// NewNotImplementedError creates a new MetricsError with code NotSupported.
|
||||
func NewNotImplementedError(reason string) *MetricsError {
|
||||
return &MetricsError{
|
||||
Code: ErrCodeNotSupported,
|
||||
Msg: fmt.Sprintf("metrics support is not implemented: %s", reason),
|
||||
}
|
||||
}
|
||||
|
||||
// NewNotSupportedErrorWithDriverName creates a new MetricsError with code NotSupported.
|
||||
// driver name is added to the error message.
|
||||
func NewNotSupportedErrorWithDriverName(name string) *MetricsError {
|
||||
|
@ -23,6 +23,11 @@ var _ MetricsProvider = &MetricsNil{}
|
||||
// metrics.
|
||||
type MetricsNil struct{}
|
||||
|
||||
// SupportsMetrics returns false for the MetricsNil type.
|
||||
func (*MetricsNil) SupportsMetrics() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// GetMetrics returns an empty Metrics and an error.
|
||||
// See MetricsProvider.GetMetrics
|
||||
func (*MetricsNil) GetMetrics() (*Metrics, error) {
|
||||
|
@ -20,6 +20,14 @@ import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestMetricsNilSupportsMetrics(t *testing.T) {
|
||||
metrics := &MetricsNil{}
|
||||
supported := metrics.SupportsMetrics()
|
||||
if supported {
|
||||
t.Error("Expected no support for metrics")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMetricsNilGetCapacity(t *testing.T) {
|
||||
metrics := &MetricsNil{}
|
||||
actual, err := metrics.GetMetrics()
|
||||
|
@ -524,13 +524,21 @@ func (plugin *rbdPlugin) newBlockVolumeMapperInternal(spec *volume.Spec, podUID
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &rbdDiskMapper{
|
||||
mapper := &rbdDiskMapper{
|
||||
rbd: newRBD(podUID, spec.Name(), img, pool, ro, plugin, manager),
|
||||
mon: mon,
|
||||
id: id,
|
||||
keyring: keyring,
|
||||
secret: secret,
|
||||
}, nil
|
||||
}
|
||||
|
||||
blockPath, err := mapper.GetGlobalMapPath(spec)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get device path: %v", err)
|
||||
}
|
||||
mapper.MetricsProvider = volume.NewMetricsBlock(filepath.Join(blockPath, string(podUID)))
|
||||
|
||||
return mapper, nil
|
||||
}
|
||||
|
||||
func (plugin *rbdPlugin) NewBlockVolumeUnmapper(volName string, podUID types.UID) (volume.BlockVolumeUnmapper, error) {
|
||||
@ -930,6 +938,12 @@ func (rbd *rbd) rbdPodDeviceMapPath() (string, string) {
|
||||
return rbd.plugin.host.GetPodVolumeDeviceDir(rbd.podUID, utilstrings.EscapeQualifiedName(name)), rbd.volName
|
||||
}
|
||||
|
||||
// SupportsMetrics returns true for rbdDiskMapper as it initializes the
|
||||
// MetricsProvider.
|
||||
func (rdm *rbdDiskMapper) SupportsMetrics() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
type rbdDiskUnmapper struct {
|
||||
*rbdDiskMapper
|
||||
}
|
||||
|
@ -49,6 +49,10 @@ type BlockVolume interface {
|
||||
// ex. pods/{podUid}/{DefaultKubeletVolumeDevicesDirName}/{escapeQualifiedPluginName}/, {volumeName}
|
||||
GetPodDeviceMapPath() (string, string)
|
||||
|
||||
// SupportsMetrics should return true if the MetricsProvider is
|
||||
// initialized
|
||||
SupportsMetrics() bool
|
||||
|
||||
// MetricsProvider embeds methods for exposing metrics (e.g.
|
||||
// used, available space).
|
||||
MetricsProvider
|
||||
|
@ -97,7 +97,7 @@ func (plugin *vsphereVolumePlugin) newBlockVolumeMapperInternal(spec *volume.Spe
|
||||
return nil, err
|
||||
}
|
||||
volPath := volumeSource.VolumePath
|
||||
return &vsphereBlockVolumeMapper{
|
||||
mapper := &vsphereBlockVolumeMapper{
|
||||
vsphereVolume: &vsphereVolume{
|
||||
volName: spec.Name(),
|
||||
podUID: podUID,
|
||||
@ -107,8 +107,15 @@ func (plugin *vsphereVolumePlugin) newBlockVolumeMapperInternal(spec *volume.Spe
|
||||
plugin: plugin,
|
||||
MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, spec.Name(), plugin.host)),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
blockPath, err := mapper.GetGlobalMapPath(spec)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get device path: %v", err)
|
||||
}
|
||||
mapper.MetricsProvider = volume.NewMetricsBlock(filepath.Join(blockPath, string(podUID)))
|
||||
|
||||
return mapper, nil
|
||||
}
|
||||
|
||||
func (plugin *vsphereVolumePlugin) NewBlockVolumeUnmapper(volName string, podUID types.UID) (volume.BlockVolumeUnmapper, error) {
|
||||
@ -137,6 +144,7 @@ var _ volume.BlockVolumeUnmapper = &vsphereBlockVolumeUnmapper{}
|
||||
|
||||
type vsphereBlockVolumeUnmapper struct {
|
||||
*vsphereVolume
|
||||
volume.MetricsNil
|
||||
}
|
||||
|
||||
// GetGlobalMapPath returns global map path and error
|
||||
@ -152,3 +160,9 @@ func (v *vsphereVolume) GetGlobalMapPath(spec *volume.Spec) (string, error) {
|
||||
func (v *vsphereVolume) GetPodDeviceMapPath() (string, string) {
|
||||
return v.plugin.host.GetPodVolumeDeviceDir(v.podUID, utilstrings.EscapeQualifiedName(vsphereVolumePluginName)), v.volName
|
||||
}
|
||||
|
||||
// SupportsMetrics returns true for vsphereBlockVolumeMapper as it initializes the
|
||||
// MetricsProvider.
|
||||
func (vbvm *vsphereBlockVolumeMapper) SupportsMetrics() bool {
|
||||
return true
|
||||
}
|
||||
|
@ -47,6 +47,7 @@ var _ = utils.SIGDescribe("[Serial] Volume metrics", func() {
|
||||
c clientset.Interface
|
||||
ns string
|
||||
pvc *v1.PersistentVolumeClaim
|
||||
pvcBlock *v1.PersistentVolumeClaim
|
||||
metricsGrabber *e2emetrics.Grabber
|
||||
invalidSc *storagev1.StorageClass
|
||||
defaultScName string
|
||||
@ -67,9 +68,17 @@ var _ = utils.SIGDescribe("[Serial] Volume metrics", func() {
|
||||
ClaimSize: "2Gi",
|
||||
}
|
||||
|
||||
fsMode := v1.PersistentVolumeFilesystem
|
||||
pvc = e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
|
||||
ClaimSize: test.ClaimSize,
|
||||
VolumeMode: &test.VolumeMode,
|
||||
VolumeMode: &fsMode,
|
||||
}, ns)
|
||||
|
||||
// selected providers all support PersistentVolumeBlock
|
||||
blockMode := v1.PersistentVolumeBlock
|
||||
pvcBlock = e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
|
||||
ClaimSize: test.ClaimSize,
|
||||
VolumeMode: &blockMode,
|
||||
}, ns)
|
||||
|
||||
metricsGrabber, err = e2emetrics.NewMetricsGrabber(c, nil, true, false, true, false, false, false)
|
||||
@ -201,7 +210,7 @@ var _ = utils.SIGDescribe("[Serial] Volume metrics", func() {
|
||||
verifyMetricCount(storageOpMetrics, updatedStorageMetrics, "volume_provision", true)
|
||||
})
|
||||
|
||||
ginkgo.It("should create volume metrics with the correct PVC ref", func() {
|
||||
ginkgo.It("should create volume metrics with the correct FilesystemMode PVC ref", func() {
|
||||
var err error
|
||||
pvc, err = c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(context.TODO(), pvc, metav1.CreateOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
@ -258,6 +267,71 @@ var _ = utils.SIGDescribe("[Serial] Volume metrics", func() {
|
||||
framework.ExpectNoError(e2epod.DeletePodWithWait(c, pod))
|
||||
})
|
||||
|
||||
ginkgo.It("should create volume metrics with the correct BlockMode PVC ref", func() {
|
||||
var err error
|
||||
pvcBlock, err = c.CoreV1().PersistentVolumeClaims(pvcBlock.Namespace).Create(context.TODO(), pvcBlock, metav1.CreateOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
framework.ExpectNotEqual(pvcBlock, nil)
|
||||
|
||||
pod := e2epod.MakePod(ns, nil, nil, false, "")
|
||||
pod.Spec.Containers[0].VolumeDevices = []v1.VolumeDevice{{
|
||||
Name: pvcBlock.Name,
|
||||
DevicePath: "/mnt/" + pvcBlock.Name,
|
||||
}}
|
||||
pod.Spec.Volumes = []v1.Volume{{
|
||||
Name: pvcBlock.Name,
|
||||
VolumeSource: v1.VolumeSource{
|
||||
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
|
||||
ClaimName: pvcBlock.Name,
|
||||
ReadOnly: false,
|
||||
},
|
||||
},
|
||||
}}
|
||||
pod, err = c.CoreV1().Pods(ns).Create(context.TODO(), pod, metav1.CreateOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
err = e2epod.WaitTimeoutForPodRunningInNamespace(c, pod.Name, pod.Namespace, f.Timeouts.PodStart)
|
||||
framework.ExpectNoError(err, "Error starting pod ", pod.Name)
|
||||
|
||||
pod, err = c.CoreV1().Pods(ns).Get(context.TODO(), pod.Name, metav1.GetOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
// Verify volume stat metrics were collected for the referenced PVC
|
||||
volumeStatKeys := []string{
|
||||
// BlockMode PVCs only support capacity (for now)
|
||||
kubeletmetrics.VolumeStatsCapacityBytesKey,
|
||||
}
|
||||
key := volumeStatKeys[0]
|
||||
kubeletKeyName := fmt.Sprintf("%s_%s", kubeletmetrics.KubeletSubsystem, key)
|
||||
// Poll kubelet metrics waiting for the volume to be picked up
|
||||
// by the volume stats collector
|
||||
var kubeMetrics e2emetrics.KubeletMetrics
|
||||
waitErr := wait.Poll(30*time.Second, 5*time.Minute, func() (bool, error) {
|
||||
framework.Logf("Grabbing Kubelet metrics")
|
||||
// Grab kubelet metrics from the node the pod was scheduled on
|
||||
var err error
|
||||
kubeMetrics, err = metricsGrabber.GrabFromKubelet(pod.Spec.NodeName)
|
||||
if err != nil {
|
||||
framework.Logf("Error fetching kubelet metrics")
|
||||
return false, err
|
||||
}
|
||||
if !findVolumeStatMetric(kubeletKeyName, pvcBlock.Namespace, pvcBlock.Name, kubeMetrics) {
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
framework.ExpectNoError(waitErr, "Unable to find metric %s for PVC %s/%s", kubeletKeyName, pvcBlock.Namespace, pvcBlock.Name)
|
||||
|
||||
for _, key := range volumeStatKeys {
|
||||
kubeletKeyName := fmt.Sprintf("%s_%s", kubeletmetrics.KubeletSubsystem, key)
|
||||
found := findVolumeStatMetric(kubeletKeyName, pvcBlock.Namespace, pvcBlock.Name, kubeMetrics)
|
||||
framework.ExpectEqual(found, true, "PVC %s, Namespace %s not found for %s", pvcBlock.Name, pvcBlock.Namespace, kubeletKeyName)
|
||||
}
|
||||
|
||||
framework.Logf("Deleting pod %q/%q", pod.Namespace, pod.Name)
|
||||
framework.ExpectNoError(e2epod.DeletePodWithWait(c, pod))
|
||||
})
|
||||
|
||||
ginkgo.It("should create metrics for total time taken in volume operations in P/V Controller", func() {
|
||||
var err error
|
||||
pvc, err = c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(context.TODO(), pvc, metav1.CreateOptions{})
|
||||
|
Loading…
Reference in New Issue
Block a user