feature: add CSIVolumeHealth feature and gate

1. add EventRecorder to ResourceAnalyzer
2. add CSIVolumeHealth feature and gate
This commit is contained in:
fengzixu
2021-03-10 01:16:37 +09:00
parent 8a8c267e58
commit edc1c62471
13 changed files with 304 additions and 18 deletions

View File

@@ -45,6 +45,7 @@ import (
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/remotecommand"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
@@ -343,7 +344,7 @@ func newServerTestWithDebuggingHandlers(kubeCfg *kubeletconfiginternal.KubeletCo
}
server := NewServer(
fw.fakeKubelet,
stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute),
stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute, &record.FakeRecorder{}),
fw.fakeAuth,
kubeCfg)
fw.serverUnderTest = &server

View File

@@ -23,6 +23,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
)
@@ -40,15 +41,17 @@ type fsResourceAnalyzer struct {
calcPeriod time.Duration
cachedVolumeStats atomic.Value
startOnce sync.Once
eventRecorder record.EventRecorder
}
var _ fsResourceAnalyzerInterface = &fsResourceAnalyzer{}
// newFsResourceAnalyzer returns a new fsResourceAnalyzer implementation
func newFsResourceAnalyzer(statsProvider Provider, calcVolumePeriod time.Duration) *fsResourceAnalyzer {
func newFsResourceAnalyzer(statsProvider Provider, calcVolumePeriod time.Duration, eventRecorder record.EventRecorder) *fsResourceAnalyzer {
r := &fsResourceAnalyzer{
statsProvider: statsProvider,
calcPeriod: calcVolumePeriod,
eventRecorder: eventRecorder,
}
r.cachedVolumeStats.Store(make(statCache))
return r
@@ -74,7 +77,7 @@ func (s *fsResourceAnalyzer) updateCachedPodVolumeStats() {
// Copy existing entries to new map, creating/starting new entries for pods missing from the cache
for _, pod := range s.statsProvider.GetPods() {
if value, found := oldCache[pod.GetUID()]; !found {
newCache[pod.GetUID()] = newVolumeStatCalculator(s.statsProvider, s.calcPeriod, pod).StartOnce()
newCache[pod.GetUID()] = newVolumeStatCalculator(s.statsProvider, s.calcPeriod, pod, s.eventRecorder).StartOnce()
} else {
newCache[pod.GetUID()] = value
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package stats
import (
"k8s.io/client-go/tools/record"
"time"
)
@@ -37,8 +38,8 @@ type resourceAnalyzer struct {
var _ ResourceAnalyzer = &resourceAnalyzer{}
// NewResourceAnalyzer returns a new ResourceAnalyzer
func NewResourceAnalyzer(statsProvider Provider, calVolumeFrequency time.Duration) ResourceAnalyzer {
fsAnalyzer := newFsResourceAnalyzer(statsProvider, calVolumeFrequency)
func NewResourceAnalyzer(statsProvider Provider, calVolumeFrequency time.Duration, eventRecorder record.EventRecorder) ResourceAnalyzer {
fsAnalyzer := newFsResourceAnalyzer(statsProvider, calVolumeFrequency, eventRecorder)
summaryProvider := NewSummaryProvider(statsProvider)
return &resourceAnalyzer{fsAnalyzer, summaryProvider}
}

View File

@@ -17,17 +17,20 @@ limitations under the License.
package stats
import (
"fmt"
"sync"
"sync/atomic"
"time"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/klog/v2"
)
// volumeStatCalculator calculates volume metrics for a given pod periodically in the background and caches the result
@@ -39,6 +42,7 @@ type volumeStatCalculator struct {
startO sync.Once
stopO sync.Once
latest atomic.Value
eventRecorder record.EventRecorder
}
// PodVolumeStats encapsulates the VolumeStats for a pod.
@@ -49,12 +53,13 @@ type PodVolumeStats struct {
}
// newVolumeStatCalculator creates a new VolumeStatCalculator
func newVolumeStatCalculator(statsProvider Provider, jitterPeriod time.Duration, pod *v1.Pod) *volumeStatCalculator {
func newVolumeStatCalculator(statsProvider Provider, jitterPeriod time.Duration, pod *v1.Pod, eventRecorder record.EventRecorder) *volumeStatCalculator {
return &volumeStatCalculator{
statsProvider: statsProvider,
jitterPeriod: jitterPeriod,
pod: pod,
stopChannel: make(chan struct{}),
eventRecorder: eventRecorder,
}
}
@@ -129,6 +134,11 @@ func (s *volumeStatCalculator) calcAndStoreStats() {
persistentStats = append(persistentStats, volumeStats)
}
if utilfeature.DefaultFeatureGate.Enabled(features.CSIVolumeHealth) {
if metric.Abnormal != nil && metric.Message != nil && (*metric.Abnormal) {
s.eventRecorder.Event(s.pod, v1.EventTypeWarning, "VolumeConditionAbnormal", fmt.Sprintf("Volume %s: %s", name, *metric.Message))
}
}
}
// Store the new stats

View File

@@ -17,15 +17,22 @@ limitations under the License.
package stats
import (
"errors"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
k8sv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
featuregatetesting "k8s.io/component-base/featuregate/testing"
kubestats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
"k8s.io/kubernetes/pkg/features"
statstest "k8s.io/kubernetes/pkg/kubelet/server/stats/testing"
"k8s.io/kubernetes/pkg/volume"
)
@@ -43,9 +50,10 @@ const (
pvcClaimName = "pvc-fake"
)
func TestPVCRef(t *testing.T) {
var (
ErrorWatchTimeout = errors.New("watch event timeout")
// Create pod spec to test against
podVolumes := []k8sv1.Volume{
podVolumes = []k8sv1.Volume{
{
Name: vol0,
VolumeSource: k8sv1.VolumeSource{
@@ -64,7 +72,7 @@ func TestPVCRef(t *testing.T) {
},
}
fakePod := &k8sv1.Pod{
fakePod = &k8sv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: pName0,
Namespace: namespace0,
@@ -75,13 +83,22 @@ func TestPVCRef(t *testing.T) {
},
}
volumeCondition = &csipbv1.VolumeCondition{}
)
func TestPVCRef(t *testing.T) {
// Setup mock stats provider
mockStats := new(statstest.StatsProvider)
volumes := map[string]volume.Volume{vol0: &fakeVolume{}, vol1: &fakeVolume{}}
mockStats.On("ListVolumesForPod", fakePod.UID).Return(volumes, true)
eventStore := make(chan string, 1)
fakeEventRecorder := record.FakeRecorder{
Events: eventStore,
}
// Calculate stats for pod
statsCalculator := newVolumeStatCalculator(mockStats, time.Minute, fakePod)
statsCalculator := newVolumeStatCalculator(mockStats, time.Minute, fakePod, &fakeEventRecorder)
statsCalculator.calcAndStoreStats()
vs, _ := statsCalculator.GetLatest()
@@ -102,6 +119,57 @@ func TestPVCRef(t *testing.T) {
})
}
func TestNormalVolumeEvent(t *testing.T) {
mockStats := new(statstest.StatsProvider)
volumes := map[string]volume.Volume{vol0: &fakeVolume{}, vol1: &fakeVolume{}}
mockStats.On("ListVolumesForPod", fakePod.UID).Return(volumes, true)
eventStore := make(chan string, 2)
fakeEventRecorder := record.FakeRecorder{
Events: eventStore,
}
// Calculate stats for pod
statsCalculator := newVolumeStatCalculator(mockStats, time.Minute, fakePod, &fakeEventRecorder)
statsCalculator.calcAndStoreStats()
event, err := WatchEvent(eventStore)
assert.NotNil(t, err)
assert.Equal(t, "", event)
}
func TestAbnormalVolumeEvent(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIVolumeHealth, true)()
// Setup mock stats provider
mockStats := new(statstest.StatsProvider)
volumes := map[string]volume.Volume{vol0: &fakeVolume{}}
mockStats.On("ListVolumesForPod", fakePod.UID).Return(volumes, true)
eventStore := make(chan string, 2)
fakeEventRecorder := record.FakeRecorder{
Events: eventStore,
}
// Calculate stats for pod
volumeCondition.Message = "The target path of the volume doesn't exist"
volumeCondition.Abnormal = true
statsCalculator := newVolumeStatCalculator(mockStats, time.Minute, fakePod, &fakeEventRecorder)
statsCalculator.calcAndStoreStats()
event, err := WatchEvent(eventStore)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("Warning VolumeConditionAbnormal Volume %s: The target path of the volume doesn't exist", "vol0"), event)
}
func WatchEvent(eventChan <-chan string) (string, error) {
select {
case event := <-eventChan:
return event, nil
case <-time.After(5 * time.Second):
return "", ErrorWatchTimeout
}
}
// Fake volume/metrics provider
var _ volume.Volume = &fakeVolume{}
@@ -121,6 +189,8 @@ func expectedMetrics() *volume.Metrics {
Inodes: resource.NewQuantity(inodesTotal, resource.BinarySI),
InodesFree: resource.NewQuantity(inodesFree, resource.BinarySI),
InodesUsed: resource.NewQuantity(inodesTotal-inodesFree, resource.BinarySI),
Message: &volumeCondition.Message,
Abnormal: &volumeCondition.Abnormal,
}
}