From e99ced311494781fd5cdffbff40830099e5f3ae6 Mon Sep 17 00:00:00 2001 From: Niels de Vos Date: Tue, 12 Jan 2021 16:53:30 +0100 Subject: [PATCH 1/4] Add support for gethering metrics from CSI block-mode volumes --- pkg/volume/csi/csi_block.go | 1 + pkg/volume/csi/csi_plugin.go | 7 +++++++ 2 files changed, 8 insertions(+) diff --git a/pkg/volume/csi/csi_block.go b/pkg/volume/csi/csi_block.go index cfda3e69f8a..687f2bc4385 100644 --- a/pkg/volume/csi/csi_block.go +++ b/pkg/volume/csi/csi_block.go @@ -95,6 +95,7 @@ type csiBlockMapper struct { spec *volume.Spec pod *v1.Pod podUID types.UID + volume.MetricsProvider } var _ volume.BlockVolumeMapper = &csiBlockMapper{} diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index 7609d0a3c47..3c6a68d2f5d 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -698,6 +698,13 @@ func (p *csiPlugin) NewBlockVolumeMapper(spec *volume.Spec, podRef *api.Pod, opt } klog.V(4).Info(log("created path successfully [%s]", dataDir)) + blockPath, err := mapper.GetGlobalMapPath(spec) + if err != nil { + return nil, errors.New(log("failed to get device path: %v", err)) + } + + mapper.MetricsProvider = NewMetricsCsi(pvSource.VolumeHandle, blockPath, csiDriverName(pvSource.Driver)) + // persist volume info data for teardown node := string(p.host.GetNodeName()) attachID := getAttachmentName(pvSource.VolumeHandle, pvSource.Driver, node) From e22012950b406ada696f61e1cc9e031f5f3c9ae4 Mon Sep 17 00:00:00 2001 From: Niels de Vos Date: Wed, 20 Jan 2021 09:17:08 +0100 Subject: [PATCH 2/4] Add Kubelet.ListBlockVolumesForPod() --- pkg/kubelet/kubelet_volumes.go | 20 ++++++++++++++++ pkg/kubelet/server/server_test.go | 4 +++- pkg/kubelet/server/stats/handler.go | 3 +++ .../stats/testing/mock_stats_provider.go | 23 +++++++++++++++++++ 4 files changed, 49 insertions(+), 1 deletion(-) diff --git a/pkg/kubelet/kubelet_volumes.go b/pkg/kubelet/kubelet_volumes.go index e2b11841f61..96e81ce00a5 100644 --- a/pkg/kubelet/kubelet_volumes.go +++ b/pkg/kubelet/kubelet_volumes.go @@ -50,6 +50,26 @@ func (kl *Kubelet) ListVolumesForPod(podUID types.UID) (map[string]volume.Volume return volumesToReturn, len(volumesToReturn) > 0 } +// ListBlockVolumesForPod returns a map of the mounted volumes for the given +// pod. The key in the map is the OuterVolumeSpecName (i.e. +// pod.Spec.Volumes[x].Name) +func (kl *Kubelet) ListBlockVolumesForPod(podUID types.UID) (map[string]volume.BlockVolume, bool) { + volumesToReturn := make(map[string]volume.BlockVolume) + podVolumes := kl.volumeManager.GetMountedVolumesForPod( + volumetypes.UniquePodName(podUID)) + for outerVolumeSpecName, volume := range podVolumes { + // TODO: volume.Mounter could be nil if volume object is recovered + // from reconciler's sync state process. PR 33616 will fix this problem + // to create Mounter object when recovering volume state. + if volume.BlockVolumeMapper == nil { + continue + } + volumesToReturn[outerVolumeSpecName] = volume.BlockVolumeMapper + } + + return volumesToReturn, len(volumesToReturn) > 0 +} + // podVolumesExist checks with the volume manager and returns true any of the // pods for the specified volume are mounted. func (kl *Kubelet) podVolumesExist(podUID types.UID) bool { diff --git a/pkg/kubelet/server/server_test.go b/pkg/kubelet/server/server_test.go index 99974c1d7d3..8a53f39584b 100644 --- a/pkg/kubelet/server/server_test.go +++ b/pkg/kubelet/server/server_test.go @@ -259,7 +259,9 @@ func (*fakeKubelet) GetPodByCgroupfs(cgroupfs string) (*v1.Pod, bool) { return n func (fk *fakeKubelet) ListVolumesForPod(podUID types.UID) (map[string]volume.Volume, bool) { return map[string]volume.Volume{}, true } - +func (*fakeKubelet) ListBlockVolumesForPod(podUID types.UID) (map[string]volume.BlockVolume, bool) { + return map[string]volume.BlockVolume{}, true +} func (*fakeKubelet) RootFsStats() (*statsapi.FsStats, error) { return nil, nil } func (*fakeKubelet) ListPodStats() ([]statsapi.PodStats, error) { return nil, nil } func (*fakeKubelet) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error) { diff --git a/pkg/kubelet/server/stats/handler.go b/pkg/kubelet/server/stats/handler.go index c3781cf9b21..77e4fe7268c 100644 --- a/pkg/kubelet/server/stats/handler.go +++ b/pkg/kubelet/server/stats/handler.go @@ -88,6 +88,9 @@ type Provider interface { // ListVolumesForPod returns the stats of the volume used by the pod with // the podUID. ListVolumesForPod(podUID types.UID) (map[string]volume.Volume, bool) + // ListBlockVolumesForPod returns the stats of the volume used by the + // pod with the podUID. + ListBlockVolumesForPod(podUID types.UID) (map[string]volume.BlockVolume, bool) // GetPods returns the specs of all the pods running on this node. GetPods() []*v1.Pod diff --git a/pkg/kubelet/server/stats/testing/mock_stats_provider.go b/pkg/kubelet/server/stats/testing/mock_stats_provider.go index 73011bd9eb4..a5649928218 100644 --- a/pkg/kubelet/server/stats/testing/mock_stats_provider.go +++ b/pkg/kubelet/server/stats/testing/mock_stats_provider.go @@ -351,6 +351,29 @@ func (_m *StatsProvider) ListVolumesForPod(podUID types.UID) (map[string]volume. return r0, r1 } +// ListBlockVolumesForPod provides a mock function with given fields: podUID +func (_m *StatsProvider) ListBlockVolumesForPod(podUID types.UID) (map[string]volume.BlockVolume, bool) { + ret := _m.Called(podUID) + + var r0 map[string]volume.BlockVolume + if rf, ok := ret.Get(0).(func(types.UID) map[string]volume.BlockVolume); ok { + r0 = rf(podUID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[string]volume.BlockVolume) + } + } + + var r1 bool + if rf, ok := ret.Get(1).(func(types.UID) bool); ok { + r1 = rf(podUID) + } else { + r1 = ret.Get(1).(bool) + } + + return r0, r1 +} + // RootFsStats provides a mock function with given fields: func (_m *StatsProvider) RootFsStats() (*v1alpha1.FsStats, error) { ret := _m.Called() From de3a4429d948b16fa2c5689db16f85f8cc84fdf0 Mon Sep 17 00:00:00 2001 From: Niels de Vos Date: Wed, 20 Jan 2021 09:18:25 +0100 Subject: [PATCH 3/4] Make type BlockVolume a MetricsProvider --- pkg/volume/csi/csi_plugin.go | 2 +- pkg/volume/volume.go | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index 3c6a68d2f5d..207b6e587dc 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -703,7 +703,7 @@ func (p *csiPlugin) NewBlockVolumeMapper(spec *volume.Spec, podRef *api.Pod, opt return nil, errors.New(log("failed to get device path: %v", err)) } - mapper.MetricsProvider = NewMetricsCsi(pvSource.VolumeHandle, blockPath, csiDriverName(pvSource.Driver)) + mapper.MetricsProvider = NewMetricsCsi(pvSource.VolumeHandle, blockPath+"/"+string(podRef.UID), csiDriverName(pvSource.Driver)) // persist volume info data for teardown node := string(p.host.GetNodeName()) diff --git a/pkg/volume/volume.go b/pkg/volume/volume.go index 2ec1e5233ba..642f72645f8 100644 --- a/pkg/volume/volume.go +++ b/pkg/volume/volume.go @@ -48,6 +48,10 @@ type BlockVolume interface { // and name of a symbolic link associated to a block device. // ex. pods/{podUid}/{DefaultKubeletVolumeDevicesDirName}/{escapeQualifiedPluginName}/, {volumeName} GetPodDeviceMapPath() (string, string) + + // MetricsProvider embeds methods for exposing metrics (e.g. + // used, available space). + MetricsProvider } // MetricsProvider exposes metrics (e.g. used,available space) related to a From fb703b4cc1a9a7f3c56660a2b665a429dcc11fa4 Mon Sep 17 00:00:00 2001 From: Niels de Vos Date: Wed, 20 Jan 2021 09:19:36 +0100 Subject: [PATCH 4/4] Include metrics of BlockVolumes in volumeStatCalculator --- pkg/kubelet/kubelet_volumes_test.go | 4 + .../server/stats/volume_stat_calculator.go | 21 +++++- .../stats/volume_stat_calculator_test.go | 74 +++++++++++++++++-- 3 files changed, 91 insertions(+), 8 deletions(-) diff --git a/pkg/kubelet/kubelet_volumes_test.go b/pkg/kubelet/kubelet_volumes_test.go index bfc5c8b7c22..9c8f61723f0 100644 --- a/pkg/kubelet/kubelet_volumes_test.go +++ b/pkg/kubelet/kubelet_volumes_test.go @@ -566,3 +566,7 @@ func (f *stubBlockVolume) TearDownDevice(mapPath string, devicePath string) erro func (f *stubBlockVolume) UnmapPodDevice() error { return nil } + +func (f *stubBlockVolume) GetMetrics() (*volume.Metrics, error) { + return nil, nil +} diff --git a/pkg/kubelet/server/stats/volume_stat_calculator.go b/pkg/kubelet/server/stats/volume_stat_calculator.go index b12349a1e5a..39396d2f5d9 100644 --- a/pkg/kubelet/server/stats/volume_stat_calculator.go +++ b/pkg/kubelet/server/stats/volume_stat_calculator.go @@ -96,10 +96,27 @@ func (s *volumeStatCalculator) GetLatest() (PodVolumeStats, bool) { func (s *volumeStatCalculator) calcAndStoreStats() { // Find all Volumes for the Pod volumes, found := s.statsProvider.ListVolumesForPod(s.pod.UID) - if !found { + blockVolumes, bvFound := s.statsProvider.ListBlockVolumesForPod(s.pod.UID) + if !found && !bvFound { return } + metricVolumes := make(map[string]volume.MetricsProvider) + + if found { + for name, v := range volumes { + metricVolumes[name] = v + } + } + if bvFound { + for name, v := range blockVolumes { + // Only add the blockVolume if it implements the MetricsProvider interface + if _, ok := v.(volume.MetricsProvider); ok { + metricVolumes[name] = v + } + } + } + // Get volume specs for the pod - key'd by volume name volumesSpec := make(map[string]v1.Volume) for _, v := range s.pod.Spec.Volumes { @@ -109,7 +126,7 @@ func (s *volumeStatCalculator) calcAndStoreStats() { // Call GetMetrics on each Volume and copy the result to a new VolumeStats.FsStats var ephemeralStats []stats.VolumeStats var persistentStats []stats.VolumeStats - for name, v := range volumes { + for name, v := range metricVolumes { metric, err := v.GetMetrics() if err != nil { // Expected for Volumes that don't support Metrics diff --git a/pkg/kubelet/server/stats/volume_stat_calculator_test.go b/pkg/kubelet/server/stats/volume_stat_calculator_test.go index 687586b7f91..8be5548e666 100644 --- a/pkg/kubelet/server/stats/volume_stat_calculator_test.go +++ b/pkg/kubelet/server/stats/volume_stat_calculator_test.go @@ -45,9 +45,11 @@ const ( inodesTotal = int64(2000) inodesFree = int64(1000) - vol0 = "vol0" - vol1 = "vol1" - pvcClaimName = "pvc-fake" + vol0 = "vol0" + vol1 = "vol1" + vol2 = "vol2" + pvcClaimName0 = "pvc-fake0" + pvcClaimName1 = "pvc-fake1" ) var ( @@ -66,7 +68,15 @@ var ( Name: vol1, VolumeSource: k8sv1.VolumeSource{ PersistentVolumeClaim: &k8sv1.PersistentVolumeClaimVolumeSource{ - ClaimName: pvcClaimName, + ClaimName: pvcClaimName0, + }, + }, + }, + { + Name: vol2, + VolumeSource: k8sv1.VolumeSource{ + PersistentVolumeClaim: &k8sv1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvcClaimName1, }, }, }, @@ -91,6 +101,8 @@ func TestPVCRef(t *testing.T) { mockStats := new(statstest.StatsProvider) volumes := map[string]volume.Volume{vol0: &fakeVolume{}, vol1: &fakeVolume{}} mockStats.On("ListVolumesForPod", fakePod.UID).Return(volumes, true) + blockVolumes := map[string]volume.BlockVolume{vol2: &fakeBlockVolume{}} + mockStats.On("ListBlockVolumesForPod", fakePod.UID).Return(blockVolumes, true) eventStore := make(chan string, 1) fakeEventRecorder := record.FakeRecorder{ @@ -102,7 +114,7 @@ func TestPVCRef(t *testing.T) { statsCalculator.calcAndStoreStats() vs, _ := statsCalculator.GetLatest() - assert.Len(t, append(vs.EphemeralVolumes, vs.PersistentVolumes...), 2) + assert.Len(t, append(vs.EphemeralVolumes, vs.PersistentVolumes...), 3) // Verify 'vol0' doesn't have a PVC reference assert.Contains(t, append(vs.EphemeralVolumes, vs.PersistentVolumes...), kubestats.VolumeStats{ Name: vol0, @@ -112,17 +124,28 @@ func TestPVCRef(t *testing.T) { assert.Contains(t, append(vs.EphemeralVolumes, vs.PersistentVolumes...), kubestats.VolumeStats{ Name: vol1, PVCRef: &kubestats.PVCReference{ - Name: pvcClaimName, + Name: pvcClaimName0, Namespace: namespace0, }, FsStats: expectedFSStats(), }) + // Verify 'vol2' has a PVC reference + assert.Contains(t, append(vs.EphemeralVolumes, vs.PersistentVolumes...), kubestats.VolumeStats{ + Name: vol2, + PVCRef: &kubestats.PVCReference{ + Name: pvcClaimName1, + Namespace: namespace0, + }, + FsStats: expectedBlockStats(), + }) } func TestNormalVolumeEvent(t *testing.T) { mockStats := new(statstest.StatsProvider) volumes := map[string]volume.Volume{vol0: &fakeVolume{}, vol1: &fakeVolume{}} mockStats.On("ListVolumesForPod", fakePod.UID).Return(volumes, true) + blockVolumes := map[string]volume.BlockVolume{vol2: &fakeBlockVolume{}} + mockStats.On("ListBlockVolumesForPod", fakePod.UID).Return(blockVolumes, true) eventStore := make(chan string, 2) fakeEventRecorder := record.FakeRecorder{ @@ -144,6 +167,8 @@ func TestAbnormalVolumeEvent(t *testing.T) { mockStats := new(statstest.StatsProvider) volumes := map[string]volume.Volume{vol0: &fakeVolume{}} mockStats.On("ListVolumesForPod", fakePod.UID).Return(volumes, true) + blockVolumes := map[string]volume.BlockVolume{vol1: &fakeBlockVolume{}} + mockStats.On("ListBlockVolumesForPod", fakePod.UID).Return(blockVolumes, true) eventStore := make(chan string, 2) fakeEventRecorder := record.FakeRecorder{ @@ -211,3 +236,40 @@ func expectedFSStats() kubestats.FsStats { InodesUsed: &inodesUsed, } } + +// Fake block-volume/metrics provider, block-devices have no inodes +var _ volume.BlockVolume = &fakeBlockVolume{} + +type fakeBlockVolume struct{} + +func (v *fakeBlockVolume) GetGlobalMapPath(*volume.Spec) (string, error) { return "", nil } + +func (v *fakeBlockVolume) GetPodDeviceMapPath() (string, string) { return "", "" } + +func (v *fakeBlockVolume) GetMetrics() (*volume.Metrics, error) { + return expectedBlockMetrics(), nil +} + +func expectedBlockMetrics() *volume.Metrics { + return &volume.Metrics{ + Available: resource.NewQuantity(available, resource.BinarySI), + Capacity: resource.NewQuantity(capacity, resource.BinarySI), + Used: resource.NewQuantity(available-capacity, resource.BinarySI), + } +} + +func expectedBlockStats() kubestats.FsStats { + metric := expectedBlockMetrics() + available := uint64(metric.Available.Value()) + capacity := uint64(metric.Capacity.Value()) + used := uint64(metric.Used.Value()) + null := uint64(0) + return kubestats.FsStats{ + AvailableBytes: &available, + CapacityBytes: &capacity, + UsedBytes: &used, + Inodes: &null, + InodesFree: &null, + InodesUsed: &null, + } +}