DRA resourceclaims: maintain metric of total and allocated claims

These metrics provide insight into ResourceClaim usage. The total count is
somewhat redundant because the apiserver already exposes object counts, but
having it in the same sub-system, next to the count of allocated claims,
makes it easier to discover and helps with monitoring the controller itself.
Patrick Ohly 2024-09-26 14:43:12 +02:00
parent 98e5a701cb
commit c2524cbf9b
3 changed files with 197 additions and 29 deletions
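
To illustrate how the two new gauges can be consumed, here is a minimal sketch (not part of this commit) that registers the controller's metrics and reads both gauges with the same testutil helpers the test below uses; the allocated-fraction calculation at the end is a hypothetical derived monitoring value, not something the controller itself computes.

package main

import (
	"fmt"

	"k8s.io/component-base/metrics/testutil"
	"k8s.io/kubernetes/pkg/controller/resourceclaim/metrics"
)

func main() {
	// Register the resourceclaim controller metrics with the legacy registry,
	// as the controller does on startup.
	metrics.RegisterMetrics()

	// Read the current gauge values. In a running controller these are kept
	// up to date by enqueueResourceClaim (see the first diff below).
	total, err := testutil.GetGaugeMetricValue(metrics.NumResourceClaims)
	if err != nil {
		panic(err)
	}
	allocated, err := testutil.GetGaugeMetricValue(metrics.NumAllocatedResourceClaims)
	if err != nil {
		panic(err)
	}

	// Hypothetical derived value: the share of claims that are allocated.
	if total > 0 {
		fmt.Printf("%.0f of %.0f ResourceClaims allocated (%.0f%%)\n",
			allocated, total, 100*allocated/total)
	} else {
		fmt.Println("no ResourceClaims observed yet")
	}
}

In a real deployment the gauges would instead be scraped from the kube-controller-manager's /metrics endpoint rather than read in-process.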


@@ -157,15 +157,15 @@ func NewController(
 	if _, err := claimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
 			logger.V(6).Info("new claim", "claimDump", obj)
-			ec.enqueueResourceClaim(logger, obj, false)
+			ec.enqueueResourceClaim(logger, nil, obj)
 		},
 		UpdateFunc: func(old, updated interface{}) {
 			logger.V(6).Info("updated claim", "claimDump", updated)
-			ec.enqueueResourceClaim(logger, updated, false)
+			ec.enqueueResourceClaim(logger, old, updated)
 		},
 		DeleteFunc: func(obj interface{}) {
 			logger.V(6).Info("deleted claim", "claimDump", obj)
-			ec.enqueueResourceClaim(logger, obj, true)
+			ec.enqueueResourceClaim(logger, obj, nil)
 		},
 	}); err != nil {
 		return nil, err
@@ -326,15 +326,48 @@ func (ec *Controller) podNeedsWork(pod *v1.Pod) (bool, string) {
 	return false, "nothing to do"
 }
 
-func (ec *Controller) enqueueResourceClaim(logger klog.Logger, obj interface{}, deleted bool) {
-	if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
-		obj = d.Obj
+func (ec *Controller) enqueueResourceClaim(logger klog.Logger, oldObj, newObj interface{}) {
+	deleted := newObj == nil
+	if d, ok := oldObj.(cache.DeletedFinalStateUnknown); ok {
+		oldObj = d.Obj
 	}
-	claim, ok := obj.(*resourceapi.ResourceClaim)
-	if !ok {
+	oldClaim, ok := oldObj.(*resourceapi.ResourceClaim)
+	if oldObj != nil && !ok {
+		return
+	}
+	newClaim, ok := newObj.(*resourceapi.ResourceClaim)
+	if newObj != nil && !ok {
 		return
 	}
+
+	// Maintain metrics based on what was observed.
+	switch {
+	case oldClaim == nil:
+		// Added.
+		metrics.NumResourceClaims.Inc()
+		if newClaim.Status.Allocation != nil {
+			metrics.NumAllocatedResourceClaims.Inc()
+		}
+	case newClaim == nil:
+		// Deleted.
+		metrics.NumResourceClaims.Dec()
+		if oldClaim.Status.Allocation != nil {
+			metrics.NumAllocatedResourceClaims.Dec()
+		}
+	default:
+		// Updated.
+		switch {
+		case oldClaim.Status.Allocation == nil && newClaim.Status.Allocation != nil:
+			metrics.NumAllocatedResourceClaims.Inc()
+		case oldClaim.Status.Allocation != nil && newClaim.Status.Allocation == nil:
+			metrics.NumAllocatedResourceClaims.Dec()
+		}
+	}
+
+	claim := newClaim
+	if claim == nil {
+		claim = oldClaim
+	}
+
 	if !deleted {
 		// When starting up, we have to check all claims to find those with
 		// stale pods in ReservedFor. During an update, a pod might get added


@@ -17,13 +17,14 @@ limitations under the License.
 package resourceclaim
 
 import (
-	"context"
 	"errors"
 	"fmt"
 	"sort"
 	"sync"
 	"testing"
+	"time"
 
+	"github.com/onsi/gomega"
 	"github.com/stretchr/testify/assert"
 
 	v1 "k8s.io/api/core/v1"
@@ -38,7 +39,8 @@ import (
 	"k8s.io/component-base/metrics/testutil"
 	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/controller"
-	ephemeralvolumemetrics "k8s.io/kubernetes/pkg/controller/resourceclaim/metrics"
+	"k8s.io/kubernetes/pkg/controller/resourceclaim/metrics"
+	"k8s.io/kubernetes/test/utils/ktesting"
 )
 
 var (
@@ -79,10 +81,6 @@ var (
 	}()
 )
 
-func init() {
-	klog.InitFlags(nil)
-}
-
 func TestSyncHandler(t *testing.T) {
 	tests := []struct {
 		name string
@@ -366,8 +364,8 @@ func TestSyncHandler(t *testing.T) {
 	for _, tc := range tests {
 		// Run sequentially because of global logging and global metrics.
 		t.Run(tc.name, func(t *testing.T) {
-			ctx, cancel := context.WithCancel(context.Background())
-			defer cancel()
+			tCtx := ktesting.Init(t)
+			tCtx = ktesting.WithCancel(tCtx)
 
 			var objects []runtime.Object
 			for _, pod := range tc.pods {
@@ -392,19 +390,19 @@ func TestSyncHandler(t *testing.T) {
 			claimInformer := informerFactory.Resource().V1alpha3().ResourceClaims()
 			templateInformer := informerFactory.Resource().V1alpha3().ResourceClaimTemplates()
 
-			ec, err := NewController(klog.FromContext(ctx), fakeKubeClient, podInformer, claimInformer, templateInformer)
+			ec, err := NewController(klog.FromContext(tCtx), fakeKubeClient, podInformer, claimInformer, templateInformer)
 			if err != nil {
 				t.Fatalf("error creating ephemeral controller : %v", err)
 			}
 
 			// Ensure informers are up-to-date.
-			informerFactory.Start(ctx.Done())
+			informerFactory.Start(tCtx.Done())
 			stopInformers := func() {
-				cancel()
+				tCtx.Cancel("stopping informers")
 				informerFactory.Shutdown()
 			}
 			defer stopInformers()
-			informerFactory.WaitForCacheSync(ctx.Done())
+			informerFactory.WaitForCacheSync(tCtx.Done())
 
 			// Add claims that only exist in the mutation cache.
 			for _, claim := range tc.claimsInCache {
@@ -414,13 +412,13 @@ func TestSyncHandler(t *testing.T) {
 			// Simulate race: stop informers, add more pods that the controller doesn't know about.
 			stopInformers()
 			for _, pod := range tc.podsLater {
-				_, err := fakeKubeClient.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
+				_, err := fakeKubeClient.CoreV1().Pods(pod.Namespace).Create(tCtx, pod, metav1.CreateOptions{})
 				if err != nil {
 					t.Fatalf("unexpected error while creating pod: %v", err)
 				}
 			}
 
-			err = ec.syncHandler(ctx, tc.key)
+			err = ec.syncHandler(tCtx, tc.key)
 			if err != nil && !tc.expectedError {
 				t.Fatalf("unexpected error while running handler: %v", err)
 			}
@@ -428,13 +426,13 @@ func TestSyncHandler(t *testing.T) {
 				t.Fatalf("unexpected success")
 			}
 
-			claims, err := fakeKubeClient.ResourceV1alpha3().ResourceClaims("").List(ctx, metav1.ListOptions{})
+			claims, err := fakeKubeClient.ResourceV1alpha3().ResourceClaims("").List(tCtx, metav1.ListOptions{})
 			if err != nil {
 				t.Fatalf("unexpected error while listing claims: %v", err)
 			}
 			assert.Equal(t, normalizeClaims(tc.expectedClaims), normalizeClaims(claims.Items))
 
-			pods, err := fakeKubeClient.CoreV1().Pods("").List(ctx, metav1.ListOptions{})
+			pods, err := fakeKubeClient.CoreV1().Pods("").List(tCtx, metav1.ListOptions{})
 			if err != nil {
 				t.Fatalf("unexpected error while listing pods: %v", err)
 			}
@@ -455,6 +453,95 @@ func TestSyncHandler(t *testing.T) {
 	}
 }
 
+func TestResourceClaimEventHandler(t *testing.T) {
+	tCtx := ktesting.Init(t)
+	tCtx = ktesting.WithCancel(tCtx)
+	fakeKubeClient := createTestClient()
+	setupMetrics()
+	informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
+	podInformer := informerFactory.Core().V1().Pods()
+	claimInformer := informerFactory.Resource().V1alpha3().ResourceClaims()
+	templateInformer := informerFactory.Resource().V1alpha3().ResourceClaimTemplates()
+	claimClient := fakeKubeClient.ResourceV1alpha3().ResourceClaims(testNamespace)
+
+	_, err := NewController(tCtx.Logger(), fakeKubeClient, podInformer, claimInformer, templateInformer)
+	tCtx.ExpectNoError(err, "creating ephemeral controller")
+
+	informerFactory.Start(tCtx.Done())
+	stopInformers := func() {
+		tCtx.Cancel("stopping informers")
+		informerFactory.Shutdown()
+	}
+	defer stopInformers()
+
+	var em numMetrics
+
+	_, err = claimClient.Create(tCtx, testClaim, metav1.CreateOptions{})
+	em.claims++
+	ktesting.Step(tCtx, "create claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Eventually(tCtx)
+	})
+
+	modifiedClaim := testClaim.DeepCopy()
+	modifiedClaim.Labels = map[string]string{"foo": "bar"}
+	_, err = claimClient.Update(tCtx, modifiedClaim, metav1.UpdateOptions{})
+	ktesting.Step(tCtx, "modify claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Consistently(tCtx)
+	})
+
+	_, err = claimClient.Update(tCtx, testClaimAllocated, metav1.UpdateOptions{})
+	em.allocated++
+	ktesting.Step(tCtx, "allocate claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Eventually(tCtx)
+	})
+
+	modifiedClaim = testClaimAllocated.DeepCopy()
+	modifiedClaim.Labels = map[string]string{"foo": "bar2"}
+	_, err = claimClient.Update(tCtx, modifiedClaim, metav1.UpdateOptions{})
+	ktesting.Step(tCtx, "modify claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Consistently(tCtx)
+	})
+
+	otherClaimAllocated := testClaimAllocated.DeepCopy()
+	otherClaimAllocated.Name += "2"
+	_, err = claimClient.Create(tCtx, otherClaimAllocated, metav1.CreateOptions{})
+	em.claims++
+	em.allocated++
+	ktesting.Step(tCtx, "create allocated claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Eventually(tCtx)
+	})
+
+	_, err = claimClient.Update(tCtx, testClaim, metav1.UpdateOptions{})
+	em.allocated--
+	ktesting.Step(tCtx, "deallocate claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Eventually(tCtx)
+	})
+
+	err = claimClient.Delete(tCtx, testClaim.Name, metav1.DeleteOptions{})
+	em.claims--
+	ktesting.Step(tCtx, "delete deallocated claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Eventually(tCtx)
+	})
+
+	err = claimClient.Delete(tCtx, otherClaimAllocated.Name, metav1.DeleteOptions{})
+	em.claims--
+	em.allocated--
+	ktesting.Step(tCtx, "delete allocated claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Eventually(tCtx)
+	})
+
+	em.Consistently(tCtx)
+}
+
 func makeClaim(name, namespace, classname string, owner *metav1.OwnerReference) *resourceapi.ResourceClaim {
 	claim := &resourceapi.ResourceClaim{
 		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
@@ -596,6 +683,34 @@ func createResourceClaimReactor() func(action k8stesting.Action) (handled bool,
 
 // Metrics helpers
 
+type numMetrics struct {
+	claims    float64
+	allocated float64
+}
+
+func getNumMetric() (em numMetrics, err error) {
+	em.claims, err = testutil.GetGaugeMetricValue(metrics.NumResourceClaims)
+	if err != nil {
+		return
+	}
+	em.allocated, err = testutil.GetGaugeMetricValue(metrics.NumAllocatedResourceClaims)
+	return
+}
+
+func (em numMetrics) Eventually(tCtx ktesting.TContext) {
+	g := gomega.NewWithT(tCtx)
+	tCtx.Helper()
+	g.Eventually(getNumMetric).WithTimeout(5 * time.Second).Should(gomega.Equal(em))
+}
+
+func (em numMetrics) Consistently(tCtx ktesting.TContext) {
+	g := gomega.NewWithT(tCtx)
+	tCtx.Helper()
+	g.Consistently(getNumMetric).WithTimeout(time.Second).Should(gomega.Equal(em))
+}
+
 type expectedMetrics struct {
 	numCreated  int
 	numFailures int
@@ -604,12 +719,12 @@ type expectedMetrics struct {
 func expectMetrics(t *testing.T, em expectedMetrics) {
 	t.Helper()
 
-	actualCreated, err := testutil.GetCounterMetricValue(ephemeralvolumemetrics.ResourceClaimCreateAttempts)
+	actualCreated, err := testutil.GetCounterMetricValue(metrics.ResourceClaimCreateAttempts)
 	handleErr(t, err, "ResourceClaimCreate")
 	if actualCreated != float64(em.numCreated) {
 		t.Errorf("Expected claims to be created %d, got %v", em.numCreated, actualCreated)
 	}
 
-	actualConflicts, err := testutil.GetCounterMetricValue(ephemeralvolumemetrics.ResourceClaimCreateFailures)
+	actualConflicts, err := testutil.GetCounterMetricValue(metrics.ResourceClaimCreateFailures)
 	handleErr(t, err, "ResourceClaimCreate/Conflict")
 	if actualConflicts != float64(em.numFailures) {
 		t.Errorf("Expected claims to have conflicts %d, got %v", em.numFailures, actualConflicts)
@@ -623,7 +738,9 @@ func handleErr(t *testing.T, err error, metricName string) {
 }
 
 func setupMetrics() {
-	ephemeralvolumemetrics.RegisterMetrics()
-	ephemeralvolumemetrics.ResourceClaimCreateAttempts.Reset()
-	ephemeralvolumemetrics.ResourceClaimCreateFailures.Reset()
+	metrics.RegisterMetrics()
+	metrics.ResourceClaimCreateAttempts.Reset()
+	metrics.ResourceClaimCreateFailures.Reset()
+	metrics.NumResourceClaims.Set(0)
+	metrics.NumAllocatedResourceClaims.Set(0)
 }


@@ -45,6 +45,22 @@ var (
 			Help:           "Number of ResourceClaims creation request failures",
 			StabilityLevel: metrics.ALPHA,
 		})
+	// NumResourceClaims tracks the current number of ResourceClaims.
+	NumResourceClaims = metrics.NewGauge(
+		&metrics.GaugeOpts{
+			Subsystem:      ResourceClaimSubsystem,
+			Name:           "resource_claims",
+			Help:           "Number of ResourceClaims",
+			StabilityLevel: metrics.ALPHA,
+		})
+	// NumAllocatedResourceClaims tracks the current number of allocated ResourceClaims.
+	NumAllocatedResourceClaims = metrics.NewGauge(
+		&metrics.GaugeOpts{
+			Subsystem:      ResourceClaimSubsystem,
+			Name:           "allocated_resource_claims",
+			Help:           "Number of allocated ResourceClaims",
+			StabilityLevel: metrics.ALPHA,
+		})
 )
 
 var registerMetrics sync.Once
@@ -54,5 +70,7 @@ func RegisterMetrics() {
 	registerMetrics.Do(func() {
 		legacyregistry.MustRegister(ResourceClaimCreateAttempts)
 		legacyregistry.MustRegister(ResourceClaimCreateFailures)
+		legacyregistry.MustRegister(NumResourceClaims)
+		legacyregistry.MustRegister(NumAllocatedResourceClaims)
 	})
 }