mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Merge pull request #99284 from fengzixu/support-external-health-monitor
feature: add CSIVolumeHealth feature and gate
This commit is contained in:
commit
dcb3c56957
@ -741,6 +741,12 @@ const (
|
||||
//
|
||||
// Labels all namespaces with a default label "kubernetes.io/metadata.name: <namespaceName>"
|
||||
NamespaceDefaultLabelName featuregate.Feature = "NamespaceDefaultLabelName"
|
||||
|
||||
// owner: @fengzixu
|
||||
// alpha: v1.21
|
||||
//
|
||||
// Enables kubelet to detect CSI volume condition and send the event of the abnormal volume to the corresponding pod that is using it.
|
||||
CSIVolumeHealth featuregate.Feature = "CSIVolumeHealth"
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -853,6 +859,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
|
||||
SuspendJob: {Default: false, PreRelease: featuregate.Alpha},
|
||||
KubeletPodResourcesGetAllocatable: {Default: false, PreRelease: featuregate.Alpha},
|
||||
NamespaceDefaultLabelName: {Default: true, PreRelease: featuregate.Beta}, // graduate to GA and lock to default in 1.22, remove in 1.24
|
||||
CSIVolumeHealth: {Default: false, PreRelease: featuregate.Alpha},
|
||||
|
||||
// inherited features from generic apiserver, relisted here to get a conflict if it is changed
|
||||
// unintentionally on either side:
|
||||
|
@ -601,7 +601,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
|
||||
|
||||
klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)
|
||||
|
||||
klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration)
|
||||
klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration, kubeDeps.Recorder)
|
||||
|
||||
klet.dockerLegacyService = kubeDeps.dockerLegacyService
|
||||
klet.runtimeService = kubeDeps.RemoteRuntimeService
|
||||
|
@ -255,7 +255,7 @@ func newTestKubeletWithImageList(
|
||||
}
|
||||
|
||||
volumeStatsAggPeriod := time.Second * 10
|
||||
kubelet.resourceAnalyzer = serverstats.NewResourceAnalyzer(kubelet, volumeStatsAggPeriod)
|
||||
kubelet.resourceAnalyzer = serverstats.NewResourceAnalyzer(kubelet, volumeStatsAggPeriod, kubelet.recorder)
|
||||
|
||||
fakeHostStatsProvider := stats.NewFakeHostStatsProvider()
|
||||
|
||||
|
@ -115,7 +115,7 @@ func TestRunOnce(t *testing.T) {
|
||||
|
||||
// TODO: Factor out "stats.Provider" from Kubelet so we don't have a cyclic dependency
|
||||
volumeStatsAggPeriod := time.Second * 10
|
||||
kb.resourceAnalyzer = stats.NewResourceAnalyzer(kb, volumeStatsAggPeriod)
|
||||
kb.resourceAnalyzer = stats.NewResourceAnalyzer(kb, volumeStatsAggPeriod, kb.recorder)
|
||||
nodeRef := &v1.ObjectReference{
|
||||
Kind: "Node",
|
||||
Name: string(kb.nodeName),
|
||||
|
@ -45,6 +45,7 @@ import (
|
||||
"k8s.io/apiserver/pkg/authentication/authenticator"
|
||||
"k8s.io/apiserver/pkg/authentication/user"
|
||||
"k8s.io/apiserver/pkg/authorization/authorizer"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/client-go/tools/remotecommand"
|
||||
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
|
||||
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
|
||||
@ -343,7 +344,7 @@ func newServerTestWithDebuggingHandlers(kubeCfg *kubeletconfiginternal.KubeletCo
|
||||
}
|
||||
server := NewServer(
|
||||
fw.fakeKubelet,
|
||||
stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute),
|
||||
stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute, &record.FakeRecorder{}),
|
||||
fw.fakeAuth,
|
||||
kubeCfg)
|
||||
fw.serverUnderTest = &server
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/tools/record"
|
||||
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
@ -40,15 +41,17 @@ type fsResourceAnalyzer struct {
|
||||
calcPeriod time.Duration
|
||||
cachedVolumeStats atomic.Value
|
||||
startOnce sync.Once
|
||||
eventRecorder record.EventRecorder
|
||||
}
|
||||
|
||||
var _ fsResourceAnalyzerInterface = &fsResourceAnalyzer{}
|
||||
|
||||
// newFsResourceAnalyzer returns a new fsResourceAnalyzer implementation
|
||||
func newFsResourceAnalyzer(statsProvider Provider, calcVolumePeriod time.Duration) *fsResourceAnalyzer {
|
||||
func newFsResourceAnalyzer(statsProvider Provider, calcVolumePeriod time.Duration, eventRecorder record.EventRecorder) *fsResourceAnalyzer {
|
||||
r := &fsResourceAnalyzer{
|
||||
statsProvider: statsProvider,
|
||||
calcPeriod: calcVolumePeriod,
|
||||
eventRecorder: eventRecorder,
|
||||
}
|
||||
r.cachedVolumeStats.Store(make(statCache))
|
||||
return r
|
||||
@ -74,7 +77,7 @@ func (s *fsResourceAnalyzer) updateCachedPodVolumeStats() {
|
||||
// Copy existing entries to new map, creating/starting new entries for pods missing from the cache
|
||||
for _, pod := range s.statsProvider.GetPods() {
|
||||
if value, found := oldCache[pod.GetUID()]; !found {
|
||||
newCache[pod.GetUID()] = newVolumeStatCalculator(s.statsProvider, s.calcPeriod, pod).StartOnce()
|
||||
newCache[pod.GetUID()] = newVolumeStatCalculator(s.statsProvider, s.calcPeriod, pod, s.eventRecorder).StartOnce()
|
||||
} else {
|
||||
newCache[pod.GetUID()] = value
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
||||
package stats
|
||||
|
||||
import (
|
||||
"k8s.io/client-go/tools/record"
|
||||
"time"
|
||||
)
|
||||
|
||||
@ -37,8 +38,8 @@ type resourceAnalyzer struct {
|
||||
var _ ResourceAnalyzer = &resourceAnalyzer{}
|
||||
|
||||
// NewResourceAnalyzer returns a new ResourceAnalyzer
|
||||
func NewResourceAnalyzer(statsProvider Provider, calVolumeFrequency time.Duration) ResourceAnalyzer {
|
||||
fsAnalyzer := newFsResourceAnalyzer(statsProvider, calVolumeFrequency)
|
||||
func NewResourceAnalyzer(statsProvider Provider, calVolumeFrequency time.Duration, eventRecorder record.EventRecorder) ResourceAnalyzer {
|
||||
fsAnalyzer := newFsResourceAnalyzer(statsProvider, calVolumeFrequency, eventRecorder)
|
||||
summaryProvider := NewSummaryProvider(statsProvider)
|
||||
return &resourceAnalyzer{fsAnalyzer, summaryProvider}
|
||||
}
|
||||
|
@ -17,17 +17,20 @@ limitations under the License.
|
||||
package stats
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/klog/v2"
|
||||
stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
"k8s.io/kubernetes/pkg/volume/util"
|
||||
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
// volumeStatCalculator calculates volume metrics for a given pod periodically in the background and caches the result
|
||||
@ -39,6 +42,7 @@ type volumeStatCalculator struct {
|
||||
startO sync.Once
|
||||
stopO sync.Once
|
||||
latest atomic.Value
|
||||
eventRecorder record.EventRecorder
|
||||
}
|
||||
|
||||
// PodVolumeStats encapsulates the VolumeStats for a pod.
|
||||
@ -49,12 +53,13 @@ type PodVolumeStats struct {
|
||||
}
|
||||
|
||||
// newVolumeStatCalculator creates a new VolumeStatCalculator
|
||||
func newVolumeStatCalculator(statsProvider Provider, jitterPeriod time.Duration, pod *v1.Pod) *volumeStatCalculator {
|
||||
func newVolumeStatCalculator(statsProvider Provider, jitterPeriod time.Duration, pod *v1.Pod, eventRecorder record.EventRecorder) *volumeStatCalculator {
|
||||
return &volumeStatCalculator{
|
||||
statsProvider: statsProvider,
|
||||
jitterPeriod: jitterPeriod,
|
||||
pod: pod,
|
||||
stopChannel: make(chan struct{}),
|
||||
eventRecorder: eventRecorder,
|
||||
}
|
||||
}
|
||||
|
||||
@ -129,6 +134,11 @@ func (s *volumeStatCalculator) calcAndStoreStats() {
|
||||
persistentStats = append(persistentStats, volumeStats)
|
||||
}
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.CSIVolumeHealth) {
|
||||
if metric.Abnormal != nil && metric.Message != nil && (*metric.Abnormal) {
|
||||
s.eventRecorder.Event(s.pod, v1.EventTypeWarning, "VolumeConditionAbnormal", fmt.Sprintf("Volume %s: %s", name, *metric.Message))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Store the new stats
|
||||
|
@ -17,15 +17,22 @@ limitations under the License.
|
||||
package stats
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
|
||||
k8sv1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/tools/record"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
kubestats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
statstest "k8s.io/kubernetes/pkg/kubelet/server/stats/testing"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
)
|
||||
@ -43,9 +50,10 @@ const (
|
||||
pvcClaimName = "pvc-fake"
|
||||
)
|
||||
|
||||
func TestPVCRef(t *testing.T) {
|
||||
var (
|
||||
ErrorWatchTimeout = errors.New("watch event timeout")
|
||||
// Create pod spec to test against
|
||||
podVolumes := []k8sv1.Volume{
|
||||
podVolumes = []k8sv1.Volume{
|
||||
{
|
||||
Name: vol0,
|
||||
VolumeSource: k8sv1.VolumeSource{
|
||||
@ -64,7 +72,7 @@ func TestPVCRef(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
fakePod := &k8sv1.Pod{
|
||||
fakePod = &k8sv1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: pName0,
|
||||
Namespace: namespace0,
|
||||
@ -75,13 +83,22 @@ func TestPVCRef(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
volumeCondition = &csipbv1.VolumeCondition{}
|
||||
)
|
||||
|
||||
func TestPVCRef(t *testing.T) {
|
||||
// Setup mock stats provider
|
||||
mockStats := new(statstest.StatsProvider)
|
||||
volumes := map[string]volume.Volume{vol0: &fakeVolume{}, vol1: &fakeVolume{}}
|
||||
mockStats.On("ListVolumesForPod", fakePod.UID).Return(volumes, true)
|
||||
|
||||
eventStore := make(chan string, 1)
|
||||
fakeEventRecorder := record.FakeRecorder{
|
||||
Events: eventStore,
|
||||
}
|
||||
|
||||
// Calculate stats for pod
|
||||
statsCalculator := newVolumeStatCalculator(mockStats, time.Minute, fakePod)
|
||||
statsCalculator := newVolumeStatCalculator(mockStats, time.Minute, fakePod, &fakeEventRecorder)
|
||||
statsCalculator.calcAndStoreStats()
|
||||
vs, _ := statsCalculator.GetLatest()
|
||||
|
||||
@ -102,6 +119,57 @@ func TestPVCRef(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestNormalVolumeEvent(t *testing.T) {
|
||||
mockStats := new(statstest.StatsProvider)
|
||||
volumes := map[string]volume.Volume{vol0: &fakeVolume{}, vol1: &fakeVolume{}}
|
||||
mockStats.On("ListVolumesForPod", fakePod.UID).Return(volumes, true)
|
||||
|
||||
eventStore := make(chan string, 2)
|
||||
fakeEventRecorder := record.FakeRecorder{
|
||||
Events: eventStore,
|
||||
}
|
||||
|
||||
// Calculate stats for pod
|
||||
statsCalculator := newVolumeStatCalculator(mockStats, time.Minute, fakePod, &fakeEventRecorder)
|
||||
statsCalculator.calcAndStoreStats()
|
||||
|
||||
event, err := WatchEvent(eventStore)
|
||||
assert.NotNil(t, err)
|
||||
assert.Equal(t, "", event)
|
||||
}
|
||||
|
||||
func TestAbnormalVolumeEvent(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIVolumeHealth, true)()
|
||||
// Setup mock stats provider
|
||||
mockStats := new(statstest.StatsProvider)
|
||||
volumes := map[string]volume.Volume{vol0: &fakeVolume{}}
|
||||
mockStats.On("ListVolumesForPod", fakePod.UID).Return(volumes, true)
|
||||
|
||||
eventStore := make(chan string, 2)
|
||||
fakeEventRecorder := record.FakeRecorder{
|
||||
Events: eventStore,
|
||||
}
|
||||
|
||||
// Calculate stats for pod
|
||||
volumeCondition.Message = "The target path of the volume doesn't exist"
|
||||
volumeCondition.Abnormal = true
|
||||
statsCalculator := newVolumeStatCalculator(mockStats, time.Minute, fakePod, &fakeEventRecorder)
|
||||
statsCalculator.calcAndStoreStats()
|
||||
|
||||
event, err := WatchEvent(eventStore)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, fmt.Sprintf("Warning VolumeConditionAbnormal Volume %s: The target path of the volume doesn't exist", "vol0"), event)
|
||||
}
|
||||
|
||||
func WatchEvent(eventChan <-chan string) (string, error) {
|
||||
select {
|
||||
case event := <-eventChan:
|
||||
return event, nil
|
||||
case <-time.After(5 * time.Second):
|
||||
return "", ErrorWatchTimeout
|
||||
}
|
||||
}
|
||||
|
||||
// Fake volume/metrics provider
|
||||
var _ volume.Volume = &fakeVolume{}
|
||||
|
||||
@ -121,6 +189,8 @@ func expectedMetrics() *volume.Metrics {
|
||||
Inodes: resource.NewQuantity(inodesTotal, resource.BinarySI),
|
||||
InodesFree: resource.NewQuantity(inodesFree, resource.BinarySI),
|
||||
InodesUsed: resource.NewQuantity(inodesTotal-inodesFree, resource.BinarySI),
|
||||
Message: &volumeCondition.Message,
|
||||
Abnormal: &volumeCondition.Abnormal,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -30,7 +30,9 @@ import (
|
||||
"google.golang.org/grpc/status"
|
||||
api "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
|
||||
)
|
||||
@ -624,6 +626,19 @@ func (c *csiDriverClient) NodeGetVolumeStats(ctx context.Context, volID string,
|
||||
Inodes: resource.NewQuantity(int64(0), resource.BinarySI),
|
||||
InodesFree: resource.NewQuantity(int64(0), resource.BinarySI),
|
||||
}
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.CSIVolumeHealth) {
|
||||
isSupportNodeVolumeCondition, err := supportNodeGetVolumeCondition(ctx, nodeClient)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if isSupportNodeVolumeCondition {
|
||||
abnormal, message := resp.VolumeCondition.GetAbnormal(), resp.VolumeCondition.GetMessage()
|
||||
metrics.Abnormal, metrics.Message = &abnormal, &message
|
||||
}
|
||||
}
|
||||
|
||||
for _, usage := range usages {
|
||||
if usage == nil {
|
||||
continue
|
||||
@ -646,6 +661,30 @@ func (c *csiDriverClient) NodeGetVolumeStats(ctx context.Context, volID string,
|
||||
return metrics, nil
|
||||
}
|
||||
|
||||
func supportNodeGetVolumeCondition(ctx context.Context, nodeClient csipbv1.NodeClient) (supportNodeGetVolumeCondition bool, err error) {
|
||||
req := csipbv1.NodeGetCapabilitiesRequest{}
|
||||
rsp, err := nodeClient.NodeGetCapabilities(ctx, &req)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
for _, cap := range rsp.GetCapabilities() {
|
||||
if cap == nil {
|
||||
continue
|
||||
}
|
||||
rpc := cap.GetRpc()
|
||||
if rpc == nil {
|
||||
continue
|
||||
}
|
||||
t := rpc.GetType()
|
||||
if t == csipbv1.NodeServiceCapability_RPC_VOLUME_CONDITION {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func isFinalError(err error) bool {
|
||||
// Sources:
|
||||
// https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
|
||||
|
@ -26,9 +26,14 @@ import (
|
||||
"testing"
|
||||
|
||||
csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
api "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
utiltesting "k8s.io/client-go/util/testing"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
"k8s.io/kubernetes/pkg/volume/csi/fake"
|
||||
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
|
||||
@ -60,6 +65,13 @@ func newFakeCsiDriverClientWithVolumeStats(t *testing.T, volumeStatsSet bool) *f
|
||||
}
|
||||
}
|
||||
|
||||
func newFakeCsiDriverClientWithVolumeStatsAndCondition(t *testing.T, volumeStatsSet, volumeConditionSet bool) *fakeCsiDriverClient {
|
||||
return &fakeCsiDriverClient{
|
||||
t: t,
|
||||
nodeClient: fake.NewNodeClientWithVolumeStatsAndCondition(volumeStatsSet, volumeConditionSet),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *fakeCsiDriverClient) NodeGetInfo(ctx context.Context) (
|
||||
nodeID string,
|
||||
maxVolumePerNode int64,
|
||||
@ -80,15 +92,30 @@ func (c *fakeCsiDriverClient) NodeGetVolumeStats(ctx context.Context, volID stri
|
||||
VolumeId: volID,
|
||||
VolumePath: targetPath,
|
||||
}
|
||||
|
||||
c.nodeClient.SetNodeVolumeStatsResp(getRawVolumeInfo())
|
||||
resp, err := c.nodeClient.NodeGetVolumeStats(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
usages := resp.GetUsage()
|
||||
metrics := &volume.Metrics{}
|
||||
if usages == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
metrics := &volume.Metrics{}
|
||||
|
||||
isSupportNodeVolumeCondition, err := supportNodeGetVolumeCondition(ctx, c.nodeClient)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.CSIVolumeHealth) && isSupportNodeVolumeCondition {
|
||||
abnormal, message := resp.VolumeCondition.GetAbnormal(), resp.VolumeCondition.GetMessage()
|
||||
metrics.Abnormal, metrics.Message = &abnormal, &message
|
||||
}
|
||||
|
||||
for _, usage := range usages {
|
||||
if usage == nil {
|
||||
continue
|
||||
@ -325,6 +352,10 @@ func setupClientWithExpansion(t *testing.T, stageUnstageSet bool, expansionSet b
|
||||
return newFakeCsiDriverClientWithExpansion(t, stageUnstageSet, expansionSet)
|
||||
}
|
||||
|
||||
func setupClientWithVolumeStatsAndCondition(t *testing.T, volumeStatsSet, volumeConditionSet bool) csiClient {
|
||||
return newFakeCsiDriverClientWithVolumeStatsAndCondition(t, volumeStatsSet, volumeConditionSet)
|
||||
}
|
||||
|
||||
func setupClientWithVolumeStats(t *testing.T, volumeStatsSet bool) csiClient {
|
||||
return newFakeCsiDriverClientWithVolumeStats(t, volumeStatsSet)
|
||||
}
|
||||
@ -674,13 +705,108 @@ type VolumeStatsOptions struct {
|
||||
DeviceMountPath string
|
||||
}
|
||||
|
||||
func TestVolumeStats(t *testing.T) {
|
||||
func TestVolumeHealthEnable(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIVolumeHealth, true)()
|
||||
spec := volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "metrics", "test-vol"), false)
|
||||
tests := []struct {
|
||||
name string
|
||||
volumeStatsSet bool
|
||||
volumeConditionSet bool
|
||||
volumeData VolumeStatsOptions
|
||||
success bool
|
||||
}{
|
||||
{
|
||||
name: "when nodeVolumeStats=on, VolumeID=on, DeviceMountPath=on, VolumeCondition=on",
|
||||
volumeStatsSet: true,
|
||||
volumeConditionSet: true,
|
||||
volumeData: VolumeStatsOptions{
|
||||
VolumeSpec: spec,
|
||||
VolumeID: "volume1",
|
||||
DeviceMountPath: "/foo/bar",
|
||||
},
|
||||
success: true,
|
||||
},
|
||||
{
|
||||
name: "when nodeVolumeStats=on, VolumeID=on, DeviceMountPath=on, VolumeCondition=off",
|
||||
volumeStatsSet: true,
|
||||
volumeConditionSet: false,
|
||||
volumeData: VolumeStatsOptions{
|
||||
VolumeSpec: spec,
|
||||
VolumeID: "volume1",
|
||||
DeviceMountPath: "/foo/bar",
|
||||
},
|
||||
success: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
|
||||
defer cancel()
|
||||
csiSource, _ := getCSISourceFromSpec(tc.volumeData.VolumeSpec)
|
||||
csClient := setupClientWithVolumeStatsAndCondition(t, tc.volumeStatsSet, tc.volumeConditionSet)
|
||||
metrics, err := csClient.NodeGetVolumeStats(ctx, csiSource.VolumeHandle, tc.volumeData.DeviceMountPath)
|
||||
if tc.success {
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
if tc.volumeConditionSet {
|
||||
assert.NotNil(t, metrics.Abnormal)
|
||||
assert.NotNil(t, metrics.Message)
|
||||
} else {
|
||||
assert.Nil(t, metrics.Abnormal)
|
||||
assert.Nil(t, metrics.Message)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestVolumeHealthDisable(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIVolumeHealth, false)()
|
||||
spec := volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "metrics", "test-vol"), false)
|
||||
tests := []struct {
|
||||
name string
|
||||
volumeStatsSet bool
|
||||
volumeData VolumeStatsOptions
|
||||
success bool
|
||||
}{
|
||||
{
|
||||
name: "when nodeVolumeStats=on, VolumeID=on, DeviceMountPath=on, VolumeCondition=off",
|
||||
volumeStatsSet: true,
|
||||
volumeData: VolumeStatsOptions{
|
||||
VolumeSpec: spec,
|
||||
VolumeID: "volume1",
|
||||
DeviceMountPath: "/foo/bar",
|
||||
},
|
||||
success: true,
|
||||
},
|
||||
}
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
|
||||
defer cancel()
|
||||
csiSource, _ := getCSISourceFromSpec(tc.volumeData.VolumeSpec)
|
||||
csClient := setupClientWithVolumeStatsAndCondition(t, tc.volumeStatsSet, false)
|
||||
metrics, err := csClient.NodeGetVolumeStats(ctx, csiSource.VolumeHandle, tc.volumeData.DeviceMountPath)
|
||||
if tc.success {
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
assert.Nil(t, metrics.Abnormal)
|
||||
assert.Nil(t, metrics.Message)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestVolumeStats(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIVolumeHealth, true)()
|
||||
spec := volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "metrics", "test-vol"), false)
|
||||
tests := []struct {
|
||||
name string
|
||||
volumeStatsSet bool
|
||||
volumeConditionSet bool
|
||||
volumeData VolumeStatsOptions
|
||||
success bool
|
||||
}{
|
||||
{
|
||||
name: "when nodeVolumeStats=on, VolumeID=on, DeviceMountPath=on",
|
||||
|
@ -84,6 +84,7 @@ type NodeClient struct {
|
||||
stageUnstageSet bool
|
||||
expansionSet bool
|
||||
volumeStatsSet bool
|
||||
volumeConditionSet bool
|
||||
nodeGetInfoResp *csipb.NodeGetInfoResponse
|
||||
nodeVolumeStatsResp *csipb.NodeGetVolumeStatsResponse
|
||||
FakeNodeExpansionRequest *csipb.NodeExpandVolumeRequest
|
||||
@ -115,6 +116,13 @@ func NewNodeClientWithVolumeStats(volumeStatsSet bool) *NodeClient {
|
||||
}
|
||||
}
|
||||
|
||||
func NewNodeClientWithVolumeStatsAndCondition(volumeStatsSet, volumeConditionSet bool) *NodeClient {
|
||||
return &NodeClient{
|
||||
volumeStatsSet: volumeStatsSet,
|
||||
volumeConditionSet: volumeConditionSet,
|
||||
}
|
||||
}
|
||||
|
||||
// SetNextError injects next expected error
|
||||
func (f *NodeClient) SetNextError(err error) {
|
||||
f.nextErr = err
|
||||
@ -346,6 +354,16 @@ func (f *NodeClient) NodeGetCapabilities(ctx context.Context, in *csipb.NodeGetC
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
if f.volumeConditionSet {
|
||||
resp.Capabilities = append(resp.Capabilities, &csipb.NodeServiceCapability{
|
||||
Type: &csipb.NodeServiceCapability_Rpc{
|
||||
Rpc: &csipb.NodeServiceCapability_RPC{
|
||||
Type: csipb.NodeServiceCapability_RPC_VOLUME_CONDITION,
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
|
@ -92,6 +92,17 @@ type Metrics struct {
|
||||
// a filesystem with the host (e.g. emptydir, hostpath), this is the free inodes
|
||||
// on the underlying storage, and is shared with host processes and other volumes
|
||||
InodesFree *resource.Quantity
|
||||
|
||||
// Normal volumes are available for use and operating optimally.
|
||||
// An abnormal volume does not meet these criteria.
|
||||
// This field is OPTIONAL. Only some csi drivers which support NodeServiceCapability_RPC_VOLUME_CONDITION
|
||||
// need to fill it.
|
||||
Abnormal *bool
|
||||
|
||||
// The message describing the condition of the volume.
|
||||
// This field is OPTIONAL. Only some csi drivers which support capability_RPC_VOLUME_CONDITION
|
||||
// need to fill it.
|
||||
Message *string
|
||||
}
|
||||
|
||||
// Attributes represents the attributes of this mounter.
|
||||
|
Loading…
Reference in New Issue
Block a user