From c2524cbf9b49f034053f758401ec3b08a4504e0e Mon Sep 17 00:00:00 2001
From: Patrick Ohly
Date: Thu, 26 Sep 2024 14:43:12 +0200
Subject: [PATCH] DRA resourceclaims: maintain metric of total and allocated
 claims

These metrics can provide insights into ResourceClaim usage. The total
count is redundant because the apiserver already provides counts of
resources, but keeping it in the same sub-system, next to the count of
allocated claims, makes it more discoverable and helps with monitoring
the controller itself.
---
 pkg/controller/resourceclaim/controller.go    |  49 +++++-
 .../resourceclaim/controller_test.go          | 159 +++++++++++++++---
 .../resourceclaim/metrics/metrics.go          |  18 ++
 3 files changed, 197 insertions(+), 29 deletions(-)

diff --git a/pkg/controller/resourceclaim/controller.go b/pkg/controller/resourceclaim/controller.go
index 857b751bed0..e2f3aa3dbbc 100644
--- a/pkg/controller/resourceclaim/controller.go
+++ b/pkg/controller/resourceclaim/controller.go
@@ -157,15 +157,15 @@ func NewController(
 	if _, err := claimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
 			logger.V(6).Info("new claim", "claimDump", obj)
-			ec.enqueueResourceClaim(logger, obj, false)
+			ec.enqueueResourceClaim(logger, nil, obj)
 		},
 		UpdateFunc: func(old, updated interface{}) {
 			logger.V(6).Info("updated claim", "claimDump", updated)
-			ec.enqueueResourceClaim(logger, updated, false)
+			ec.enqueueResourceClaim(logger, old, updated)
 		},
 		DeleteFunc: func(obj interface{}) {
 			logger.V(6).Info("deleted claim", "claimDump", obj)
-			ec.enqueueResourceClaim(logger, obj, true)
+			ec.enqueueResourceClaim(logger, obj, nil)
 		},
 	}); err != nil {
 		return nil, err
@@ -326,15 +326,48 @@ func (ec *Controller) podNeedsWork(pod *v1.Pod) (bool, string) {
 	return false, "nothing to do"
 }
 
-func (ec *Controller) enqueueResourceClaim(logger klog.Logger, obj interface{}, deleted bool) {
-	if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
-		obj = d.Obj
+func (ec *Controller) enqueueResourceClaim(logger klog.Logger, oldObj, newObj interface{}) {
+	deleted := newObj == nil
+	if d, ok := oldObj.(cache.DeletedFinalStateUnknown); ok {
+		oldObj = d.Obj
 	}
-	claim, ok := obj.(*resourceapi.ResourceClaim)
-	if !ok {
+	oldClaim, ok := oldObj.(*resourceapi.ResourceClaim)
+	if oldObj != nil && !ok {
+		return
+	}
+	newClaim, ok := newObj.(*resourceapi.ResourceClaim)
+	if newObj != nil && !ok {
 		return
 	}
 
+	// Maintain metrics based on what was observed.
+	switch {
+	case oldClaim == nil:
+		// Added.
+		metrics.NumResourceClaims.Inc()
+		if newClaim.Status.Allocation != nil {
+			metrics.NumAllocatedResourceClaims.Inc()
+		}
+	case newClaim == nil:
+		// Deleted.
+		metrics.NumResourceClaims.Dec()
+		if oldClaim.Status.Allocation != nil {
+			metrics.NumAllocatedResourceClaims.Dec()
+		}
+	default:
+		// Updated.
+		switch {
+		case oldClaim.Status.Allocation == nil && newClaim.Status.Allocation != nil:
+			metrics.NumAllocatedResourceClaims.Inc()
+		case oldClaim.Status.Allocation != nil && newClaim.Status.Allocation == nil:
+			metrics.NumAllocatedResourceClaims.Dec()
+		}
+	}
+
+	claim := newClaim
+	if claim == nil {
+		claim = oldClaim
+	}
 	if !deleted {
 		// When starting up, we have to check all claims to find those with
 		// stale pods in ReservedFor. During an update, a pod might get added
diff --git a/pkg/controller/resourceclaim/controller_test.go b/pkg/controller/resourceclaim/controller_test.go
index 28eb2dc3915..e76497dcfee 100644
--- a/pkg/controller/resourceclaim/controller_test.go
+++ b/pkg/controller/resourceclaim/controller_test.go
@@ -17,13 +17,14 @@ limitations under the License.
 package resourceclaim
 
 import (
-	"context"
 	"errors"
 	"fmt"
 	"sort"
 	"sync"
 	"testing"
+	"time"
 
+	"github.com/onsi/gomega"
 	"github.com/stretchr/testify/assert"
 
 	v1 "k8s.io/api/core/v1"
@@ -38,7 +39,8 @@ import (
 	"k8s.io/component-base/metrics/testutil"
 	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/controller"
-	ephemeralvolumemetrics "k8s.io/kubernetes/pkg/controller/resourceclaim/metrics"
+	"k8s.io/kubernetes/pkg/controller/resourceclaim/metrics"
+	"k8s.io/kubernetes/test/utils/ktesting"
 )
 
 var (
@@ -79,10 +81,6 @@
 	}()
 )
 
-func init() {
-	klog.InitFlags(nil)
-}
-
 func TestSyncHandler(t *testing.T) {
 	tests := []struct {
 		name string
@@ -366,8 +364,8 @@
 	for _, tc := range tests {
 		// Run sequentially because of global logging and global metrics.
 		t.Run(tc.name, func(t *testing.T) {
-			ctx, cancel := context.WithCancel(context.Background())
-			defer cancel()
+			tCtx := ktesting.Init(t)
+			tCtx = ktesting.WithCancel(tCtx)
 
 			var objects []runtime.Object
 			for _, pod := range tc.pods {
@@ -392,19 +390,19 @@
 			claimInformer := informerFactory.Resource().V1alpha3().ResourceClaims()
 			templateInformer := informerFactory.Resource().V1alpha3().ResourceClaimTemplates()
 
-			ec, err := NewController(klog.FromContext(ctx), fakeKubeClient, podInformer, claimInformer, templateInformer)
+			ec, err := NewController(klog.FromContext(tCtx), fakeKubeClient, podInformer, claimInformer, templateInformer)
 			if err != nil {
 				t.Fatalf("error creating ephemeral controller : %v", err)
 			}
 
 			// Ensure informers are up-to-date.
-			informerFactory.Start(ctx.Done())
+			informerFactory.Start(tCtx.Done())
 			stopInformers := func() {
-				cancel()
+				tCtx.Cancel("stopping informers")
 				informerFactory.Shutdown()
 			}
 			defer stopInformers()
-			informerFactory.WaitForCacheSync(ctx.Done())
+			informerFactory.WaitForCacheSync(tCtx.Done())
 
 			// Add claims that only exist in the mutation cache.
 			for _, claim := range tc.claimsInCache {
@@ -414,13 +412,13 @@
 			// Simulate race: stop informers, add more pods that the controller doesn't know about.
 			stopInformers()
 			for _, pod := range tc.podsLater {
-				_, err := fakeKubeClient.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
+				_, err := fakeKubeClient.CoreV1().Pods(pod.Namespace).Create(tCtx, pod, metav1.CreateOptions{})
 				if err != nil {
 					t.Fatalf("unexpected error while creating pod: %v", err)
 				}
 			}
 
-			err = ec.syncHandler(ctx, tc.key)
+			err = ec.syncHandler(tCtx, tc.key)
 			if err != nil && !tc.expectedError {
 				t.Fatalf("unexpected error while running handler: %v", err)
 			}
@@ -428,13 +426,13 @@
 				t.Fatalf("unexpected success")
 			}
 
-			claims, err := fakeKubeClient.ResourceV1alpha3().ResourceClaims("").List(ctx, metav1.ListOptions{})
+			claims, err := fakeKubeClient.ResourceV1alpha3().ResourceClaims("").List(tCtx, metav1.ListOptions{})
 			if err != nil {
 				t.Fatalf("unexpected error while listing claims: %v", err)
 			}
 			assert.Equal(t, normalizeClaims(tc.expectedClaims), normalizeClaims(claims.Items))
 
-			pods, err := fakeKubeClient.CoreV1().Pods("").List(ctx, metav1.ListOptions{})
+			pods, err := fakeKubeClient.CoreV1().Pods("").List(tCtx, metav1.ListOptions{})
 			if err != nil {
 				t.Fatalf("unexpected error while listing pods: %v", err)
 			}
@@ -455,6 +453,95 @@
 	}
 }
 
+func TestResourceClaimEventHandler(t *testing.T) {
+	tCtx := ktesting.Init(t)
+	tCtx = ktesting.WithCancel(tCtx)
+
+	fakeKubeClient := createTestClient()
+	setupMetrics()
+	informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
+	podInformer := informerFactory.Core().V1().Pods()
+	claimInformer := informerFactory.Resource().V1alpha3().ResourceClaims()
+	templateInformer := informerFactory.Resource().V1alpha3().ResourceClaimTemplates()
+	claimClient := fakeKubeClient.ResourceV1alpha3().ResourceClaims(testNamespace)
+
+	_, err := NewController(tCtx.Logger(), fakeKubeClient, podInformer, claimInformer, templateInformer)
+	tCtx.ExpectNoError(err, "creating ephemeral controller")
+
+	informerFactory.Start(tCtx.Done())
+	stopInformers := func() {
+		tCtx.Cancel("stopping informers")
+		informerFactory.Shutdown()
+	}
+	defer stopInformers()
+
+	var em numMetrics
+
+	_, err = claimClient.Create(tCtx, testClaim, metav1.CreateOptions{})
+	em.claims++
+	ktesting.Step(tCtx, "create claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Eventually(tCtx)
+	})
+
+	modifiedClaim := testClaim.DeepCopy()
+	modifiedClaim.Labels = map[string]string{"foo": "bar"}
+	_, err = claimClient.Update(tCtx, modifiedClaim, metav1.UpdateOptions{})
+	ktesting.Step(tCtx, "modify claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Consistently(tCtx)
+	})
+
+	_, err = claimClient.Update(tCtx, testClaimAllocated, metav1.UpdateOptions{})
+	em.allocated++
+	ktesting.Step(tCtx, "allocate claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Eventually(tCtx)
+	})
+
+	modifiedClaim = testClaimAllocated.DeepCopy()
+	modifiedClaim.Labels = map[string]string{"foo": "bar2"}
+	_, err = claimClient.Update(tCtx, modifiedClaim, metav1.UpdateOptions{})
+	ktesting.Step(tCtx, "modify claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Consistently(tCtx)
+	})
+
+	otherClaimAllocated := testClaimAllocated.DeepCopy()
+	otherClaimAllocated.Name += "2"
+	_, err = claimClient.Create(tCtx, otherClaimAllocated, metav1.CreateOptions{})
+	em.claims++
+	em.allocated++
+	ktesting.Step(tCtx, "create allocated claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Eventually(tCtx)
+	})
+
+	_, err = claimClient.Update(tCtx, testClaim, metav1.UpdateOptions{})
+	em.allocated--
+	ktesting.Step(tCtx, "deallocate claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Eventually(tCtx)
+	})
+
+	err = claimClient.Delete(tCtx, testClaim.Name, metav1.DeleteOptions{})
+	em.claims--
+	ktesting.Step(tCtx, "delete deallocated claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Eventually(tCtx)
+	})
+
+	err = claimClient.Delete(tCtx, otherClaimAllocated.Name, metav1.DeleteOptions{})
+	em.claims--
+	em.allocated--
+	ktesting.Step(tCtx, "delete allocated claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Eventually(tCtx)
+	})
+
+	em.Consistently(tCtx)
+}
+
 func makeClaim(name, namespace, classname string, owner *metav1.OwnerReference) *resourceapi.ResourceClaim {
 	claim := &resourceapi.ResourceClaim{
 		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
@@ -596,6 +683,34 @@ func createResourceClaimReactor() func(action k8stesting.Action) (handled bool,
 
 // Metrics helpers
 
+type numMetrics struct {
+	claims    float64
+	allocated float64
+}
+
+func getNumMetric() (em numMetrics, err error) {
+	em.claims, err = testutil.GetGaugeMetricValue(metrics.NumResourceClaims)
+	if err != nil {
+		return
+	}
+	em.allocated, err = testutil.GetGaugeMetricValue(metrics.NumAllocatedResourceClaims)
+	return
+}
+
+func (em numMetrics) Eventually(tCtx ktesting.TContext) {
+	g := gomega.NewWithT(tCtx)
+	tCtx.Helper()
+
+	g.Eventually(getNumMetric).WithTimeout(5 * time.Second).Should(gomega.Equal(em))
+}
+
+func (em numMetrics) Consistently(tCtx ktesting.TContext) {
+	g := gomega.NewWithT(tCtx)
+	tCtx.Helper()
+
+	g.Consistently(getNumMetric).WithTimeout(time.Second).Should(gomega.Equal(em))
+}
+
 type expectedMetrics struct {
 	numCreated  int
 	numFailures int
@@ -604,12 +719,12 @@
 func expectMetrics(t *testing.T, em expectedMetrics) {
 	t.Helper()
 
-	actualCreated, err := testutil.GetCounterMetricValue(ephemeralvolumemetrics.ResourceClaimCreateAttempts)
+	actualCreated, err := testutil.GetCounterMetricValue(metrics.ResourceClaimCreateAttempts)
 	handleErr(t, err, "ResourceClaimCreate")
 	if actualCreated != float64(em.numCreated) {
 		t.Errorf("Expected claims to be created %d, got %v", em.numCreated, actualCreated)
 	}
-	actualConflicts, err := testutil.GetCounterMetricValue(ephemeralvolumemetrics.ResourceClaimCreateFailures)
+	actualConflicts, err := testutil.GetCounterMetricValue(metrics.ResourceClaimCreateFailures)
 	handleErr(t, err, "ResourceClaimCreate/Conflict")
 	if actualConflicts != float64(em.numFailures) {
 		t.Errorf("Expected claims to have conflicts %d, got %v", em.numFailures, actualConflicts)
@@ -623,7 +738,9 @@ func handleErr(t *testing.T, err error, metricName string) {
 }
 
 func setupMetrics() {
-	ephemeralvolumemetrics.RegisterMetrics()
-	ephemeralvolumemetrics.ResourceClaimCreateAttempts.Reset()
-	ephemeralvolumemetrics.ResourceClaimCreateFailures.Reset()
+	metrics.RegisterMetrics()
+	metrics.ResourceClaimCreateAttempts.Reset()
+	metrics.ResourceClaimCreateFailures.Reset()
+	metrics.NumResourceClaims.Set(0)
+	metrics.NumAllocatedResourceClaims.Set(0)
 }
diff --git a/pkg/controller/resourceclaim/metrics/metrics.go b/pkg/controller/resourceclaim/metrics/metrics.go
index 17c5496cd15..c47778a54a3 100644
--- a/pkg/controller/resourceclaim/metrics/metrics.go
+++ b/pkg/controller/resourceclaim/metrics/metrics.go
@@ -45,6 +45,22 @@
 			Help:           "Number of ResourceClaims creation request failures",
 			StabilityLevel: metrics.ALPHA,
 		})
+	// NumResourceClaims tracks the current number of ResourceClaims.
+	NumResourceClaims = metrics.NewGauge(
+		&metrics.GaugeOpts{
+			Subsystem:      ResourceClaimSubsystem,
+			Name:           "resource_claims",
+			Help:           "Number of ResourceClaims",
+			StabilityLevel: metrics.ALPHA,
+		})
+	// NumAllocatedResourceClaims tracks the current number of allocated ResourceClaims.
+	NumAllocatedResourceClaims = metrics.NewGauge(
+		&metrics.GaugeOpts{
+			Subsystem:      ResourceClaimSubsystem,
+			Name:           "allocated_resource_claims",
+			Help:           "Number of allocated ResourceClaims",
+			StabilityLevel: metrics.ALPHA,
+		})
 )
 
 var registerMetrics sync.Once
@@ -54,5 +70,7 @@ func RegisterMetrics() {
 	registerMetrics.Do(func() {
 		legacyregistry.MustRegister(ResourceClaimCreateAttempts)
 		legacyregistry.MustRegister(ResourceClaimCreateFailures)
+		legacyregistry.MustRegister(NumResourceClaims)
+		legacyregistry.MustRegister(NumAllocatedResourceClaims)
 	})
 }
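
A minimal usage sketch, not part of the patch: the new gauges can be read with the same component-base testutil helpers that the added test uses. The standalone program below is illustrative only; package main, the error handling, and the output format are assumptions, while RegisterMetrics, NumResourceClaims, NumAllocatedResourceClaims, and testutil.GetGaugeMetricValue come from the patch itself.

package main

import (
	"fmt"

	"k8s.io/component-base/metrics/testutil"
	"k8s.io/kubernetes/pkg/controller/resourceclaim/metrics"
)

func main() {
	// Registration is idempotent because RegisterMetrics uses sync.Once.
	metrics.RegisterMetrics()

	// Read the current gauge values, mirroring the test helper getNumMetric.
	total, err := testutil.GetGaugeMetricValue(metrics.NumResourceClaims)
	if err != nil {
		fmt.Println("reading resource_claims:", err)
		return
	}
	allocated, err := testutil.GetGaugeMetricValue(metrics.NumAllocatedResourceClaims)
	if err != nil {
		fmt.Println("reading allocated_resource_claims:", err)
		return
	}
	fmt.Printf("claims=%v allocated=%v\n", total, allocated)
}

In a running kube-controller-manager the same values are exposed via the /metrics endpoint; with the ResourceClaimSubsystem prefix, the series would presumably appear as <subsystem>_resource_claims and <subsystem>_allocated_resource_claims.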