diff --git a/pkg/controller/resourceclaim/controller.go b/pkg/controller/resourceclaim/controller.go
index 857b751bed0..e2f3aa3dbbc 100644
--- a/pkg/controller/resourceclaim/controller.go
+++ b/pkg/controller/resourceclaim/controller.go
@@ -157,15 +157,15 @@ func NewController(
 	if _, err := claimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
 			logger.V(6).Info("new claim", "claimDump", obj)
-			ec.enqueueResourceClaim(logger, obj, false)
+			ec.enqueueResourceClaim(logger, nil, obj)
 		},
 		UpdateFunc: func(old, updated interface{}) {
 			logger.V(6).Info("updated claim", "claimDump", updated)
-			ec.enqueueResourceClaim(logger, updated, false)
+			ec.enqueueResourceClaim(logger, old, updated)
 		},
 		DeleteFunc: func(obj interface{}) {
 			logger.V(6).Info("deleted claim", "claimDump", obj)
-			ec.enqueueResourceClaim(logger, obj, true)
+			ec.enqueueResourceClaim(logger, obj, nil)
 		},
 	}); err != nil {
 		return nil, err
@@ -326,15 +326,48 @@ func (ec *Controller) podNeedsWork(pod *v1.Pod) (bool, string) {
 	return false, "nothing to do"
 }
 
-func (ec *Controller) enqueueResourceClaim(logger klog.Logger, obj interface{}, deleted bool) {
-	if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
-		obj = d.Obj
+func (ec *Controller) enqueueResourceClaim(logger klog.Logger, oldObj, newObj interface{}) {
+	deleted := newObj == nil
+	if d, ok := oldObj.(cache.DeletedFinalStateUnknown); ok {
+		oldObj = d.Obj
 	}
-	claim, ok := obj.(*resourceapi.ResourceClaim)
-	if !ok {
+	oldClaim, ok := oldObj.(*resourceapi.ResourceClaim)
+	if oldObj != nil && !ok {
+		return
+	}
+	newClaim, ok := newObj.(*resourceapi.ResourceClaim)
+	if newObj != nil && !ok {
 		return
 	}
 
+	// Maintain metrics based on what was observed.
+	switch {
+	case oldClaim == nil:
+		// Added.
+		metrics.NumResourceClaims.Inc()
+		if newClaim.Status.Allocation != nil {
+			metrics.NumAllocatedResourceClaims.Inc()
+		}
+	case newClaim == nil:
+		// Deleted.
+		metrics.NumResourceClaims.Dec()
+		if oldClaim.Status.Allocation != nil {
+			metrics.NumAllocatedResourceClaims.Dec()
+		}
+	default:
+		// Updated.
+		switch {
+		case oldClaim.Status.Allocation == nil && newClaim.Status.Allocation != nil:
+			metrics.NumAllocatedResourceClaims.Inc()
+		case oldClaim.Status.Allocation != nil && newClaim.Status.Allocation == nil:
+			metrics.NumAllocatedResourceClaims.Dec()
+		}
+	}
+
+	claim := newClaim
+	if claim == nil {
+		claim = oldClaim
+	}
 	if !deleted {
 		// When starting up, we have to check all claims to find those with
 		// stale pods in ReservedFor. During an update, a pod might get added
diff --git a/pkg/controller/resourceclaim/controller_test.go b/pkg/controller/resourceclaim/controller_test.go
index 28eb2dc3915..e76497dcfee 100644
--- a/pkg/controller/resourceclaim/controller_test.go
+++ b/pkg/controller/resourceclaim/controller_test.go
@@ -17,13 +17,14 @@ limitations under the License.
package resourceclaim import ( - "context" "errors" "fmt" "sort" "sync" "testing" + "time" + "github.com/onsi/gomega" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" @@ -38,7 +39,8 @@ import ( "k8s.io/component-base/metrics/testutil" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller" - ephemeralvolumemetrics "k8s.io/kubernetes/pkg/controller/resourceclaim/metrics" + "k8s.io/kubernetes/pkg/controller/resourceclaim/metrics" + "k8s.io/kubernetes/test/utils/ktesting" ) var ( @@ -79,10 +81,6 @@ var ( }() ) -func init() { - klog.InitFlags(nil) -} - func TestSyncHandler(t *testing.T) { tests := []struct { name string @@ -366,8 +364,8 @@ func TestSyncHandler(t *testing.T) { for _, tc := range tests { // Run sequentially because of global logging and global metrics. t.Run(tc.name, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + tCtx := ktesting.Init(t) + tCtx = ktesting.WithCancel(tCtx) var objects []runtime.Object for _, pod := range tc.pods { @@ -392,19 +390,19 @@ func TestSyncHandler(t *testing.T) { claimInformer := informerFactory.Resource().V1alpha3().ResourceClaims() templateInformer := informerFactory.Resource().V1alpha3().ResourceClaimTemplates() - ec, err := NewController(klog.FromContext(ctx), fakeKubeClient, podInformer, claimInformer, templateInformer) + ec, err := NewController(klog.FromContext(tCtx), fakeKubeClient, podInformer, claimInformer, templateInformer) if err != nil { t.Fatalf("error creating ephemeral controller : %v", err) } // Ensure informers are up-to-date. - informerFactory.Start(ctx.Done()) + informerFactory.Start(tCtx.Done()) stopInformers := func() { - cancel() + tCtx.Cancel("stopping informers") informerFactory.Shutdown() } defer stopInformers() - informerFactory.WaitForCacheSync(ctx.Done()) + informerFactory.WaitForCacheSync(tCtx.Done()) // Add claims that only exist in the mutation cache. for _, claim := range tc.claimsInCache { @@ -414,13 +412,13 @@ func TestSyncHandler(t *testing.T) { // Simulate race: stop informers, add more pods that the controller doesn't know about. 
 			stopInformers()
 			for _, pod := range tc.podsLater {
-				_, err := fakeKubeClient.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
+				_, err := fakeKubeClient.CoreV1().Pods(pod.Namespace).Create(tCtx, pod, metav1.CreateOptions{})
 				if err != nil {
 					t.Fatalf("unexpected error while creating pod: %v", err)
 				}
 			}
 
-			err = ec.syncHandler(ctx, tc.key)
+			err = ec.syncHandler(tCtx, tc.key)
 			if err != nil && !tc.expectedError {
 				t.Fatalf("unexpected error while running handler: %v", err)
 			}
@@ -428,13 +426,13 @@
 				t.Fatalf("unexpected success")
 			}
 
-			claims, err := fakeKubeClient.ResourceV1alpha3().ResourceClaims("").List(ctx, metav1.ListOptions{})
+			claims, err := fakeKubeClient.ResourceV1alpha3().ResourceClaims("").List(tCtx, metav1.ListOptions{})
 			if err != nil {
 				t.Fatalf("unexpected error while listing claims: %v", err)
 			}
 			assert.Equal(t, normalizeClaims(tc.expectedClaims), normalizeClaims(claims.Items))
 
-			pods, err := fakeKubeClient.CoreV1().Pods("").List(ctx, metav1.ListOptions{})
+			pods, err := fakeKubeClient.CoreV1().Pods("").List(tCtx, metav1.ListOptions{})
 			if err != nil {
 				t.Fatalf("unexpected error while listing pods: %v", err)
 			}
@@ -455,6 +453,95 @@
 		}
 	}
 }
 
+func TestResourceClaimEventHandler(t *testing.T) {
+	tCtx := ktesting.Init(t)
+	tCtx = ktesting.WithCancel(tCtx)
+
+	fakeKubeClient := createTestClient()
+	setupMetrics()
+	informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
+	podInformer := informerFactory.Core().V1().Pods()
+	claimInformer := informerFactory.Resource().V1alpha3().ResourceClaims()
+	templateInformer := informerFactory.Resource().V1alpha3().ResourceClaimTemplates()
+	claimClient := fakeKubeClient.ResourceV1alpha3().ResourceClaims(testNamespace)
+
+	_, err := NewController(tCtx.Logger(), fakeKubeClient, podInformer, claimInformer, templateInformer)
+	tCtx.ExpectNoError(err, "creating ephemeral controller")
+
+	informerFactory.Start(tCtx.Done())
+	stopInformers := func() {
+		tCtx.Cancel("stopping informers")
+		informerFactory.Shutdown()
+	}
+	defer stopInformers()
+
+	var em numMetrics
+
+	_, err = claimClient.Create(tCtx, testClaim, metav1.CreateOptions{})
+	em.claims++
+	ktesting.Step(tCtx, "create claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Eventually(tCtx)
+	})
+
+	modifiedClaim := testClaim.DeepCopy()
+	modifiedClaim.Labels = map[string]string{"foo": "bar"}
+	_, err = claimClient.Update(tCtx, modifiedClaim, metav1.UpdateOptions{})
+	ktesting.Step(tCtx, "modify claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Consistently(tCtx)
+	})
+
+	_, err = claimClient.Update(tCtx, testClaimAllocated, metav1.UpdateOptions{})
+	em.allocated++
+	ktesting.Step(tCtx, "allocate claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Eventually(tCtx)
+	})
+
+	modifiedClaim = testClaimAllocated.DeepCopy()
+	modifiedClaim.Labels = map[string]string{"foo": "bar2"}
+	_, err = claimClient.Update(tCtx, modifiedClaim, metav1.UpdateOptions{})
+	ktesting.Step(tCtx, "modify claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Consistently(tCtx)
+	})
+
+	otherClaimAllocated := testClaimAllocated.DeepCopy()
+	otherClaimAllocated.Name += "2"
+	_, err = claimClient.Create(tCtx, otherClaimAllocated, metav1.CreateOptions{})
+	em.claims++
+	em.allocated++
+	ktesting.Step(tCtx, "create allocated claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Eventually(tCtx)
+	})
+
+	_, err = claimClient.Update(tCtx, testClaim, metav1.UpdateOptions{})
+	em.allocated--
+	ktesting.Step(tCtx, "deallocate claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Eventually(tCtx)
+	})
+
+	err = claimClient.Delete(tCtx, testClaim.Name, metav1.DeleteOptions{})
+	em.claims--
+	ktesting.Step(tCtx, "delete deallocated claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Eventually(tCtx)
+	})
+
+	err = claimClient.Delete(tCtx, otherClaimAllocated.Name, metav1.DeleteOptions{})
+	em.claims--
+	em.allocated--
+	ktesting.Step(tCtx, "delete allocated claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Eventually(tCtx)
+	})
+
+	em.Consistently(tCtx)
+}
+
 func makeClaim(name, namespace, classname string, owner *metav1.OwnerReference) *resourceapi.ResourceClaim {
 	claim := &resourceapi.ResourceClaim{
 		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
@@ -596,6 +683,34 @@ func createResourceClaimReactor() func(action k8stesting.Action) (handled bool,
 
 // Metrics helpers
 
+type numMetrics struct {
+	claims    float64
+	allocated float64
+}
+
+func getNumMetric() (em numMetrics, err error) {
+	em.claims, err = testutil.GetGaugeMetricValue(metrics.NumResourceClaims)
+	if err != nil {
+		return
+	}
+	em.allocated, err = testutil.GetGaugeMetricValue(metrics.NumAllocatedResourceClaims)
+	return
+}
+
+func (em numMetrics) Eventually(tCtx ktesting.TContext) {
+	g := gomega.NewWithT(tCtx)
+	tCtx.Helper()
+
+	g.Eventually(getNumMetric).WithTimeout(5 * time.Second).Should(gomega.Equal(em))
+}
+
+func (em numMetrics) Consistently(tCtx ktesting.TContext) {
+	g := gomega.NewWithT(tCtx)
+	tCtx.Helper()
+
+	g.Consistently(getNumMetric).WithTimeout(time.Second).Should(gomega.Equal(em))
+}
+
 type expectedMetrics struct {
 	numCreated  int
 	numFailures int
@@ -604,12 +719,12 @@ type expectedMetrics struct {
 
 func expectMetrics(t *testing.T, em expectedMetrics) {
 	t.Helper()
 
-	actualCreated, err := testutil.GetCounterMetricValue(ephemeralvolumemetrics.ResourceClaimCreateAttempts)
+	actualCreated, err := testutil.GetCounterMetricValue(metrics.ResourceClaimCreateAttempts)
 	handleErr(t, err, "ResourceClaimCreate")
 	if actualCreated != float64(em.numCreated) {
 		t.Errorf("Expected claims to be created %d, got %v", em.numCreated, actualCreated)
 	}
-	actualConflicts, err := testutil.GetCounterMetricValue(ephemeralvolumemetrics.ResourceClaimCreateFailures)
+	actualConflicts, err := testutil.GetCounterMetricValue(metrics.ResourceClaimCreateFailures)
 	handleErr(t, err, "ResourceClaimCreate/Conflict")
 	if actualConflicts != float64(em.numFailures) {
 		t.Errorf("Expected claims to have conflicts %d, got %v", em.numFailures, actualConflicts)
@@ -623,7 +738,9 @@ func handleErr(t *testing.T, err error, metricName string) {
 }
 
 func setupMetrics() {
-	ephemeralvolumemetrics.RegisterMetrics()
-	ephemeralvolumemetrics.ResourceClaimCreateAttempts.Reset()
-	ephemeralvolumemetrics.ResourceClaimCreateFailures.Reset()
+	metrics.RegisterMetrics()
+	metrics.ResourceClaimCreateAttempts.Reset()
+	metrics.ResourceClaimCreateFailures.Reset()
+	metrics.NumResourceClaims.Set(0)
+	metrics.NumAllocatedResourceClaims.Set(0)
 }
diff --git a/pkg/controller/resourceclaim/metrics/metrics.go b/pkg/controller/resourceclaim/metrics/metrics.go
index 17c5496cd15..c47778a54a3 100644
--- a/pkg/controller/resourceclaim/metrics/metrics.go
+++ b/pkg/controller/resourceclaim/metrics/metrics.go
@@ -45,6 +45,22 @@ var (
 			Help:           "Number of ResourceClaims creation request failures",
 			StabilityLevel: metrics.ALPHA,
 		})
+	// NumResourceClaims tracks the current number of ResourceClaims.
+	NumResourceClaims = metrics.NewGauge(
+		&metrics.GaugeOpts{
+			Subsystem:      ResourceClaimSubsystem,
+			Name:           "resource_claims",
+			Help:           "Number of ResourceClaims",
+			StabilityLevel: metrics.ALPHA,
+		})
+	// NumAllocatedResourceClaims tracks the current number of allocated ResourceClaims.
+	NumAllocatedResourceClaims = metrics.NewGauge(
+		&metrics.GaugeOpts{
+			Subsystem:      ResourceClaimSubsystem,
+			Name:           "allocated_resource_claims",
+			Help:           "Number of allocated ResourceClaims",
+			StabilityLevel: metrics.ALPHA,
+		})
 )
 
 var registerMetrics sync.Once
@@ -54,5 +70,7 @@ func RegisterMetrics() {
 	registerMetrics.Do(func() {
 		legacyregistry.MustRegister(ResourceClaimCreateAttempts)
 		legacyregistry.MustRegister(ResourceClaimCreateFailures)
+		legacyregistry.MustRegister(NumResourceClaims)
+		legacyregistry.MustRegister(NumAllocatedResourceClaims)
 	})
 }
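
As a cross-check of the bookkeeping introduced above, here is a standalone sketch (not part of the diff) that replays the transitions enqueueResourceClaim handles, with plain integers standing in for the two new gauges. The claim, allocation, and track names are illustrative assumptions for this example only; only the switch logic mirrors the patch.

package main

import "fmt"

// allocation stands in for resourceapi.AllocationResult; only its
// presence or absence matters here.
type allocation struct{}

// claim stands in for the single field of resourceapi.ResourceClaim
// that the handler inspects: Status.Allocation.
type claim struct {
	allocation *allocation
}

// track mirrors the switch in enqueueResourceClaim: a nil oldClaim is
// an add, a nil newClaim is a delete, and anything else is an update
// where only an allocation transition moves the allocated gauge.
// Informer events guarantee at least one argument is non-nil.
func track(oldClaim, newClaim *claim, numClaims, numAllocated *int) {
	switch {
	case oldClaim == nil:
		// Added.
		*numClaims++
		if newClaim.allocation != nil {
			*numAllocated++
		}
	case newClaim == nil:
		// Deleted.
		*numClaims--
		if oldClaim.allocation != nil {
			*numAllocated--
		}
	default:
		// Updated.
		switch {
		case oldClaim.allocation == nil && newClaim.allocation != nil:
			*numAllocated++
		case oldClaim.allocation != nil && newClaim.allocation == nil:
			*numAllocated--
		}
	}
}

func main() {
	var claims, allocated int
	unallocated := &claim{}
	allocatedClaim := &claim{allocation: &allocation{}}

	track(nil, unallocated, &claims, &allocated)            // create
	track(unallocated, allocatedClaim, &claims, &allocated) // allocate
	track(allocatedClaim, unallocated, &claims, &allocated) // deallocate
	track(unallocated, nil, &claims, &allocated)            // delete

	fmt.Println(claims, allocated) // both gauges return to 0
}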