diff --git a/pkg/controller/volume/ephemeral/controller.go b/pkg/controller/volume/ephemeral/controller.go index 59d008a0fff..0f474e9470f 100644 --- a/pkg/controller/volume/ephemeral/controller.go +++ b/pkg/controller/volume/ephemeral/controller.go @@ -38,6 +38,7 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/controller/volume/common" + ephemeralvolumemetrics "k8s.io/kubernetes/pkg/controller/volume/ephemeral/metrics" "k8s.io/kubernetes/pkg/controller/volume/events" "k8s.io/kubernetes/pkg/volume/util" ) @@ -90,6 +91,8 @@ func NewController( queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ephemeral_volume"), } + ephemeralvolumemetrics.RegisterMetrics() + eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) @@ -278,8 +281,10 @@ func (ec *ephemeralController) handleVolume(pod *v1.Pod, vol v1.Volume) error { }, Spec: ephemeral.VolumeClaimTemplate.Spec, } + ephemeralvolumemetrics.EphemeralVolumeCreateAttempts.Inc() _, err = ec.kubeClient.CoreV1().PersistentVolumeClaims(pod.Namespace).Create(context.TODO(), pvc, metav1.CreateOptions{}) if err != nil { + ephemeralvolumemetrics.EphemeralVolumeCreateFailures.Inc() return fmt.Errorf("create PVC %s: %v", pvcName, err) } return nil diff --git a/pkg/controller/volume/ephemeral/controller_test.go b/pkg/controller/volume/ephemeral/controller_test.go index 370c50d2187..7b1c29ea5a7 100644 --- a/pkg/controller/volume/ephemeral/controller_test.go +++ b/pkg/controller/volume/ephemeral/controller_test.go @@ -18,21 +18,26 @@ package ephemeral import ( "context" + "errors" "sort" "testing" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" // storagev1 "k8s.io/api/storage/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" // "k8s.io/apimachinery/pkg/types" + apierrors "k8s.io/apimachinery/pkg/api/errors" 
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" + k8stesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" kcache "k8s.io/client-go/tools/cache" + "k8s.io/component-base/metrics/testutil" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller" + ephemeralvolumemetrics "k8s.io/kubernetes/pkg/controller/volume/ephemeral/metrics" "github.com/stretchr/testify/assert" ) @@ -57,18 +62,20 @@ func init() { func TestSyncHandler(t *testing.T) { tests := []struct { - name string - podKey string - pvcs []*v1.PersistentVolumeClaim - pods []*v1.Pod - expectedPVCs []v1.PersistentVolumeClaim - expectedError bool + name string + podKey string + pvcs []*v1.PersistentVolumeClaim + pods []*v1.Pod + expectedPVCs []v1.PersistentVolumeClaim + expectedError bool + expectedMetrics expectedMetrics }{ { - name: "create", - pods: []*v1.Pod{testPodWithEphemeral}, - podKey: podKey(testPodWithEphemeral), - expectedPVCs: []v1.PersistentVolumeClaim{*testPodEphemeralClaim}, + name: "create", + pods: []*v1.Pod{testPodWithEphemeral}, + podKey: podKey(testPodWithEphemeral), + expectedPVCs: []v1.PersistentVolumeClaim{*testPodEphemeralClaim}, + expectedMetrics: expectedMetrics{1, 0}, }, { name: "no-such-pod", @@ -90,11 +97,12 @@ func TestSyncHandler(t *testing.T) { podKey: podKey(testPod), }, { - name: "create-with-other-PVC", - pods: []*v1.Pod{testPodWithEphemeral}, - podKey: podKey(testPodWithEphemeral), - pvcs: []*v1.PersistentVolumeClaim{otherNamespaceClaim}, - expectedPVCs: []v1.PersistentVolumeClaim{*otherNamespaceClaim, *testPodEphemeralClaim}, + name: "create-with-other-PVC", + pods: []*v1.Pod{testPodWithEphemeral}, + podKey: podKey(testPodWithEphemeral), + pvcs: []*v1.PersistentVolumeClaim{otherNamespaceClaim}, + expectedPVCs: []v1.PersistentVolumeClaim{*otherNamespaceClaim, *testPodEphemeralClaim}, + expectedMetrics: expectedMetrics{1, 0}, }, { name: "wrong-PVC-owner", @@ -104,10 +112,17 @@ func 
TestSyncHandler(t *testing.T) { expectedPVCs: []v1.PersistentVolumeClaim{*conflictingClaim}, expectedError: true, }, + { + name: "create-conflict", + pods: []*v1.Pod{testPodWithEphemeral}, + podKey: podKey(testPodWithEphemeral), + expectedMetrics: expectedMetrics{1, 1}, + expectedError: true, + }, } for _, tc := range tests { - // Run sequentially because of global logging. + // Run sequentially because of global logging and global metrics. t.Run(tc.name, func(t *testing.T) { // There is no good way to shut down the informers. They spawn // various goroutines and some of them (in particular shared informer) @@ -124,6 +139,12 @@ func TestSyncHandler(t *testing.T) { } fakeKubeClient := createTestClient(objects...) + if tc.expectedMetrics.numFailures > 0 { + fakeKubeClient.PrependReactor("create", "persistentvolumeclaims", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, apierrors.NewConflict(action.GetResource().GroupResource(), "fake name", errors.New("fake conflict")) + }) + } + setupMetrics() informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) podInformer := informerFactory.Core().V1().Pods() pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims() @@ -152,6 +173,7 @@ func TestSyncHandler(t *testing.T) { t.Fatalf("unexpected error while listing PVCs: %v", err) } assert.Equal(t, sortPVCs(tc.expectedPVCs), sortPVCs(pvcs.Items)) + expectMetrics(t, tc.expectedMetrics) }) } } @@ -219,3 +241,37 @@ func createTestClient(objects ...runtime.Object) *fake.Clientset { fakeClient := fake.NewSimpleClientset(objects...) 
return fakeClient } + +// Metrics helpers: expected values for, and per-test reset of, the package-level ephemeral volume counters. + +type expectedMetrics struct { + numCreated int // expected value of the EphemeralVolumeCreateAttempts counter + numFailures int // expected value of the EphemeralVolumeCreateFailures counter +} + +func expectMetrics(t *testing.T, em expectedMetrics) { + t.Helper() + + actualCreated, err := testutil.GetCounterMetricValue(ephemeralvolumemetrics.EphemeralVolumeCreateAttempts) + handleErr(t, err, "ephemeralVolumeCreate") // read failures are reported but the comparison below still runs + if actualCreated != float64(em.numCreated) { + t.Errorf("Expected PVCs to be created %d, got %v", em.numCreated, actualCreated) + } + actualConflicts, err := testutil.GetCounterMetricValue(ephemeralvolumemetrics.EphemeralVolumeCreateFailures) + handleErr(t, err, "ephemeralVolumeCreate/Conflict") // the label is only used in the failure message + if actualConflicts != float64(em.numFailures) { + t.Errorf("Expected PVCs to have conflicts %d, got %v", em.numFailures, actualConflicts) + } +} + +func handleErr(t *testing.T, err error, metricName string) { + if err != nil { + t.Errorf("Failed to get %s value, err: %v", metricName, err) // non-fatal; NOTE(review): no t.Helper() here, so the failure is attributed to this line rather than the caller + } +} + +func setupMetrics() { + ephemeralvolumemetrics.RegisterMetrics() // repeated calls are fine: registration is guarded by a sync.Once + ephemeralvolumemetrics.EphemeralVolumeCreateAttempts.Reset() // counters are package-level, so zero them before each test case + ephemeralvolumemetrics.EphemeralVolumeCreateFailures.Reset() +} diff --git a/pkg/controller/volume/ephemeral/metrics/metrics.go b/pkg/controller/volume/ephemeral/metrics/metrics.go new file mode 100644 index 00000000000..20eb9f1f6e4 --- /dev/null +++ b/pkg/controller/volume/ephemeral/metrics/metrics.go @@ -0,0 +1,58 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package metrics + +import ( + "sync" + + "k8s.io/component-base/metrics" + "k8s.io/component-base/metrics/legacyregistry" +) + +// EphemeralVolumeSubsystem - subsystem name used for ephemeral volume controller metrics. +const EphemeralVolumeSubsystem = "ephemeral_volume_controller" + +var ( + // EphemeralVolumeCreateAttempts tracks the number of + // PersistentVolumeClaims().Create calls (both successful and unsuccessful) + EphemeralVolumeCreateAttempts = metrics.NewCounter( + &metrics.CounterOpts{ + Subsystem: EphemeralVolumeSubsystem, + Name: "create_total", + Help: "Number of PersistentVolumeClaims creation requests", + StabilityLevel: metrics.ALPHA, + }) + // EphemeralVolumeCreateFailures tracks the number of unsuccessful + // PersistentVolumeClaims().Create calls + EphemeralVolumeCreateFailures = metrics.NewCounter( + &metrics.CounterOpts{ + Subsystem: EphemeralVolumeSubsystem, + Name: "create_failures_total", + Help: "Number of failed PersistentVolumeClaims creation requests", + StabilityLevel: metrics.ALPHA, + }) +) + +var registerMetrics sync.Once // guards the one-time registration in RegisterMetrics + +// RegisterMetrics registers EphemeralVolume metrics with the legacy registry; registration runs at most once. +func RegisterMetrics() { + registerMetrics.Do(func() { + legacyregistry.MustRegister(EphemeralVolumeCreateAttempts) + legacyregistry.MustRegister(EphemeralVolumeCreateFailures) + }) +} diff --git a/staging/src/k8s.io/component-base/metrics/counter.go b/staging/src/k8s.io/component-base/metrics/counter.go index 7344945e177..34c177fe1ac 100644 --- a/staging/src/k8s.io/component-base/metrics/counter.go +++ b/staging/src/k8s.io/component-base/metrics/counter.go @@ -20,6 +20,7 @@ import ( "context" "github.com/blang/semver" "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" ) // Counter is our internal representation for our wrapping struct around prometheus @@ -31,6 +32,9 @@ type Counter struct { selfCollector } +// The implementation of the Metric interface is expected by testutil.GetCounterMetricValue. 
+var _ Metric = &Counter{} // compile-time check that *Counter implements Metric + // NewCounter returns an object which satisfies the kubeCollector and CounterMetric interfaces. // However, the object returned will not measure anything unless the collector is first // registered, since the metric is lazily instantiated. @@ -46,6 +50,14 @@ func NewCounter(opts *CounterOpts) *Counter { return kc } +func (c *Counter) Desc() *prometheus.Desc { + return c.metric.Desc() // forwards to the wrapped metric; NOTE(review): c.metric is presumably unset until the lazily instantiated counter is created — confirm callers only use this after registration +} + +func (c *Counter) Write(to *dto.Metric) error { + return c.metric.Write(to) // forwards to the wrapped metric's Write and returns its error unchanged +} + // Reset resets the underlying prometheus Counter to start counting from 0 again func (c *Counter) Reset() { if !c.IsCreated() {