Merge pull request #127661 from pohly/dra-resourceclaim-metrics

DRA resourceclaims: maintain metric of total and allocated claims
Kubernetes Prow Robot 2024-10-28 21:12:53 +00:00 committed by GitHub
commit 8aae9aabf3
3 changed files with 197 additions and 29 deletions
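
The change gives the resourceclaim controller two gauges: the total number of ResourceClaims observed through its informer, and the number of those whose Status.Allocation is set. A minimal read-back sketch using only what the diffs below introduce (RegisterMetrics and the two gauge variables) plus the component-base testutil helpers the tests already use; the test function itself is hypothetical, not part of this PR:

package resourceclaim

import (
	"testing"

	"k8s.io/component-base/metrics/testutil"
	"k8s.io/kubernetes/pkg/controller/resourceclaim/metrics"
)

// TestGaugeReadback (hypothetical) registers the metrics, sets known
// values, and reads them back the same way the PR's own tests do.
func TestGaugeReadback(t *testing.T) {
	metrics.RegisterMetrics()
	metrics.NumResourceClaims.Set(2)
	metrics.NumAllocatedResourceClaims.Set(1)

	total, err := testutil.GetGaugeMetricValue(metrics.NumResourceClaims)
	if err != nil || total != 2 {
		t.Fatalf("total = %v, err = %v", total, err)
	}
	allocated, err := testutil.GetGaugeMetricValue(metrics.NumAllocatedResourceClaims)
	if err != nil || allocated != 1 {
		t.Fatalf("allocated = %v, err = %v", allocated, err)
	}
}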

pkg/controller/resourceclaim/controller.go

@@ -157,15 +157,15 @@ func NewController(
 	if _, err := claimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
 			logger.V(6).Info("new claim", "claimDump", obj)
-			ec.enqueueResourceClaim(logger, obj, false)
+			ec.enqueueResourceClaim(logger, nil, obj)
 		},
 		UpdateFunc: func(old, updated interface{}) {
 			logger.V(6).Info("updated claim", "claimDump", updated)
-			ec.enqueueResourceClaim(logger, updated, false)
+			ec.enqueueResourceClaim(logger, old, updated)
 		},
 		DeleteFunc: func(obj interface{}) {
 			logger.V(6).Info("deleted claim", "claimDump", obj)
-			ec.enqueueResourceClaim(logger, obj, true)
+			ec.enqueueResourceClaim(logger, obj, nil)
 		},
 	}); err != nil {
 		return nil, err
@@ -326,15 +326,48 @@ func (ec *Controller) podNeedsWork(pod *v1.Pod) (bool, string) {
 	return false, "nothing to do"
 }
 
-func (ec *Controller) enqueueResourceClaim(logger klog.Logger, obj interface{}, deleted bool) {
-	if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
-		obj = d.Obj
+func (ec *Controller) enqueueResourceClaim(logger klog.Logger, oldObj, newObj interface{}) {
+	deleted := newObj == nil
+	if d, ok := oldObj.(cache.DeletedFinalStateUnknown); ok {
+		oldObj = d.Obj
 	}
-	claim, ok := obj.(*resourceapi.ResourceClaim)
-	if !ok {
+	oldClaim, ok := oldObj.(*resourceapi.ResourceClaim)
+	if oldObj != nil && !ok {
 		return
 	}
+	newClaim, ok := newObj.(*resourceapi.ResourceClaim)
+	if newObj != nil && !ok {
+		return
+	}
+
+	// Maintain metrics based on what was observed.
+	switch {
+	case oldClaim == nil:
+		// Added.
+		metrics.NumResourceClaims.Inc()
+		if newClaim.Status.Allocation != nil {
+			metrics.NumAllocatedResourceClaims.Inc()
+		}
+	case newClaim == nil:
+		// Deleted.
+		metrics.NumResourceClaims.Dec()
+		if oldClaim.Status.Allocation != nil {
+			metrics.NumAllocatedResourceClaims.Dec()
+		}
+	default:
+		// Updated.
+		switch {
+		case oldClaim.Status.Allocation == nil && newClaim.Status.Allocation != nil:
+			metrics.NumAllocatedResourceClaims.Inc()
+		case oldClaim.Status.Allocation != nil && newClaim.Status.Allocation == nil:
+			metrics.NumAllocatedResourceClaims.Dec()
+		}
+	}
+
+	claim := newClaim
+	if claim == nil {
+		claim = oldClaim
+	}
+
 	if !deleted {
 		// When starting up, we have to check all claims to find those with
 		// stale pods in ReservedFor. During an update, a pod might get added
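
The metric bookkeeping above is a pure state transition on the pair of old and new allocation states: a create bumps the total (and the allocated count if the claim arrives already allocated), a delete does the reverse, and an update only moves the allocated gauge when Status.Allocation flips. A standalone sketch of the same rules, with a hypothetical claimState type instead of the real ResourceClaim:

package main

import "fmt"

// claimState models the only field the gauge bookkeeping cares about.
type claimState struct{ allocated bool }

// gaugeDelta computes the changes for the total and allocated gauges from
// one informer event: oldC == nil means added, newC == nil means deleted.
func gaugeDelta(oldC, newC *claimState) (claims, allocated int) {
	switch {
	case oldC == nil && newC != nil: // added
		claims++
		if newC.allocated {
			allocated++
		}
	case oldC != nil && newC == nil: // deleted
		claims--
		if oldC.allocated {
			allocated--
		}
	case oldC != nil && newC != nil: // updated
		if !oldC.allocated && newC.allocated {
			allocated++
		}
		if oldC.allocated && !newC.allocated {
			allocated--
		}
	}
	return
}

func main() {
	// Creating an already-allocated claim bumps both gauges at once.
	fmt.Println(gaugeDelta(nil, &claimState{allocated: true})) // 1 1
	// A label-only update changes neither gauge.
	fmt.Println(gaugeDelta(&claimState{allocated: true}, &claimState{allocated: true})) // 0 0
}

A label-only update yielding no change is exactly what the new test below verifies with Consistently.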

pkg/controller/resourceclaim/controller_test.go

@@ -17,13 +17,14 @@ limitations under the License.
 package resourceclaim
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"sort"
 	"sync"
 	"testing"
+	"time"
 
+	"github.com/onsi/gomega"
 	"github.com/stretchr/testify/assert"
 	v1 "k8s.io/api/core/v1"
@@ -38,7 +39,8 @@ import (
 	"k8s.io/component-base/metrics/testutil"
 	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/controller"
-	ephemeralvolumemetrics "k8s.io/kubernetes/pkg/controller/resourceclaim/metrics"
+	"k8s.io/kubernetes/pkg/controller/resourceclaim/metrics"
+	"k8s.io/kubernetes/test/utils/ktesting"
 )
 
 var (
@@ -79,10 +81,6 @@ var (
 	}()
 )
 
-func init() {
-	klog.InitFlags(nil)
-}
-
 func TestSyncHandler(t *testing.T) {
 	tests := []struct {
 		name string
@@ -366,8 +364,8 @@
 	for _, tc := range tests {
 		// Run sequentially because of global logging and global metrics.
 		t.Run(tc.name, func(t *testing.T) {
-			ctx, cancel := context.WithCancel(context.Background())
-			defer cancel()
+			tCtx := ktesting.Init(t)
+			tCtx = ktesting.WithCancel(tCtx)
 
 			var objects []runtime.Object
 			for _, pod := range tc.pods {
@@ -392,19 +390,19 @@
 			claimInformer := informerFactory.Resource().V1alpha3().ResourceClaims()
 			templateInformer := informerFactory.Resource().V1alpha3().ResourceClaimTemplates()
 
-			ec, err := NewController(klog.FromContext(ctx), fakeKubeClient, podInformer, claimInformer, templateInformer)
+			ec, err := NewController(klog.FromContext(tCtx), fakeKubeClient, podInformer, claimInformer, templateInformer)
 			if err != nil {
 				t.Fatalf("error creating ephemeral controller : %v", err)
 			}
 
 			// Ensure informers are up-to-date.
-			informerFactory.Start(ctx.Done())
+			informerFactory.Start(tCtx.Done())
 			stopInformers := func() {
-				cancel()
+				tCtx.Cancel("stopping informers")
 				informerFactory.Shutdown()
 			}
 			defer stopInformers()
-			informerFactory.WaitForCacheSync(ctx.Done())
+			informerFactory.WaitForCacheSync(tCtx.Done())
 
 			// Add claims that only exist in the mutation cache.
 			for _, claim := range tc.claimsInCache {
@@ -414,13 +412,13 @@
 			// Simulate race: stop informers, add more pods that the controller doesn't know about.
 			stopInformers()
 			for _, pod := range tc.podsLater {
-				_, err := fakeKubeClient.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
+				_, err := fakeKubeClient.CoreV1().Pods(pod.Namespace).Create(tCtx, pod, metav1.CreateOptions{})
 				if err != nil {
 					t.Fatalf("unexpected error while creating pod: %v", err)
 				}
 			}
 
-			err = ec.syncHandler(ctx, tc.key)
+			err = ec.syncHandler(tCtx, tc.key)
 			if err != nil && !tc.expectedError {
 				t.Fatalf("unexpected error while running handler: %v", err)
 			}
@@ -428,13 +426,13 @@
 				t.Fatalf("unexpected success")
 			}
 
-			claims, err := fakeKubeClient.ResourceV1alpha3().ResourceClaims("").List(ctx, metav1.ListOptions{})
+			claims, err := fakeKubeClient.ResourceV1alpha3().ResourceClaims("").List(tCtx, metav1.ListOptions{})
 			if err != nil {
 				t.Fatalf("unexpected error while listing claims: %v", err)
 			}
 			assert.Equal(t, normalizeClaims(tc.expectedClaims), normalizeClaims(claims.Items))
 
-			pods, err := fakeKubeClient.CoreV1().Pods("").List(ctx, metav1.ListOptions{})
+			pods, err := fakeKubeClient.CoreV1().Pods("").List(tCtx, metav1.ListOptions{})
 			if err != nil {
 				t.Fatalf("unexpected error while listing pods: %v", err)
 			}
@@ -455,6 +453,95 @@
 	}
 }
 
+func TestResourceClaimEventHandler(t *testing.T) {
+	tCtx := ktesting.Init(t)
+	tCtx = ktesting.WithCancel(tCtx)
+
+	fakeKubeClient := createTestClient()
+	setupMetrics()
+	informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
+	podInformer := informerFactory.Core().V1().Pods()
+	claimInformer := informerFactory.Resource().V1alpha3().ResourceClaims()
+	templateInformer := informerFactory.Resource().V1alpha3().ResourceClaimTemplates()
+	claimClient := fakeKubeClient.ResourceV1alpha3().ResourceClaims(testNamespace)
+
+	_, err := NewController(tCtx.Logger(), fakeKubeClient, podInformer, claimInformer, templateInformer)
+	tCtx.ExpectNoError(err, "creating ephemeral controller")
+
+	informerFactory.Start(tCtx.Done())
+	stopInformers := func() {
+		tCtx.Cancel("stopping informers")
+		informerFactory.Shutdown()
+	}
+	defer stopInformers()
+
+	var em numMetrics
+
+	_, err = claimClient.Create(tCtx, testClaim, metav1.CreateOptions{})
+	em.claims++
+	ktesting.Step(tCtx, "create claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Eventually(tCtx)
+	})
+
+	modifiedClaim := testClaim.DeepCopy()
+	modifiedClaim.Labels = map[string]string{"foo": "bar"}
+	_, err = claimClient.Update(tCtx, modifiedClaim, metav1.UpdateOptions{})
+	ktesting.Step(tCtx, "modify claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Consistently(tCtx)
+	})
+
+	_, err = claimClient.Update(tCtx, testClaimAllocated, metav1.UpdateOptions{})
+	em.allocated++
+	ktesting.Step(tCtx, "allocate claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Eventually(tCtx)
+	})
+
+	modifiedClaim = testClaimAllocated.DeepCopy()
+	modifiedClaim.Labels = map[string]string{"foo": "bar2"}
+	_, err = claimClient.Update(tCtx, modifiedClaim, metav1.UpdateOptions{})
+	ktesting.Step(tCtx, "modify claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Consistently(tCtx)
+	})
+
+	otherClaimAllocated := testClaimAllocated.DeepCopy()
+	otherClaimAllocated.Name += "2"
+	_, err = claimClient.Create(tCtx, otherClaimAllocated, metav1.CreateOptions{})
+	em.claims++
+	em.allocated++
+	ktesting.Step(tCtx, "create allocated claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Eventually(tCtx)
+	})
+
+	_, err = claimClient.Update(tCtx, testClaim, metav1.UpdateOptions{})
+	em.allocated--
+	ktesting.Step(tCtx, "deallocate claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Eventually(tCtx)
+	})
+
+	err = claimClient.Delete(tCtx, testClaim.Name, metav1.DeleteOptions{})
+	em.claims--
+	ktesting.Step(tCtx, "delete deallocated claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Eventually(tCtx)
+	})
+
+	err = claimClient.Delete(tCtx, otherClaimAllocated.Name, metav1.DeleteOptions{})
+	em.claims--
+	em.allocated--
+	ktesting.Step(tCtx, "delete allocated claim", func(tCtx ktesting.TContext) {
+		tCtx.ExpectNoError(err)
+		em.Eventually(tCtx)
+	})
+
+	em.Consistently(tCtx)
+}
+
 func makeClaim(name, namespace, classname string, owner *metav1.OwnerReference) *resourceapi.ResourceClaim {
 	claim := &resourceapi.ResourceClaim{
 		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
@@ -596,6 +683,34 @@ func createResourceClaimReactor() func(action k8stesting.Action) (handled bool,
 
 // Metrics helpers
 
+type numMetrics struct {
+	claims    float64
+	allocated float64
+}
+
+func getNumMetric() (em numMetrics, err error) {
+	em.claims, err = testutil.GetGaugeMetricValue(metrics.NumResourceClaims)
+	if err != nil {
+		return
+	}
+	em.allocated, err = testutil.GetGaugeMetricValue(metrics.NumAllocatedResourceClaims)
+	return
+}
+
+func (em numMetrics) Eventually(tCtx ktesting.TContext) {
+	g := gomega.NewWithT(tCtx)
+	tCtx.Helper()
+	g.Eventually(getNumMetric).WithTimeout(5 * time.Second).Should(gomega.Equal(em))
+}
+
+func (em numMetrics) Consistently(tCtx ktesting.TContext) {
+	g := gomega.NewWithT(tCtx)
+	tCtx.Helper()
+	g.Consistently(getNumMetric).WithTimeout(time.Second).Should(gomega.Equal(em))
+}
+
 type expectedMetrics struct {
 	numCreated  int
 	numFailures int
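
numMetrics.Eventually and numMetrics.Consistently lean on gomega's ability to poll a plain function: Eventually keeps calling getNumMetric until the returned struct equals the expectation, while Consistently demands that it stays equal for the whole window. A standalone illustration of that polling pattern, with a hypothetical counter in place of the real gauges:

package main

import (
	"fmt"
	"sync/atomic"
	"time"

	"github.com/onsi/gomega"
)

func main() {
	// Outside of a test, gomega needs an explicit fail handler.
	g := gomega.NewGomega(func(message string, _ ...int) { panic(message) })

	var n atomic.Int64
	go func() {
		time.Sleep(50 * time.Millisecond)
		n.Store(3)
	}()

	// Like numMetrics.Eventually: poll until the sampled value matches.
	g.Eventually(func() int64 { return n.Load() }).
		WithTimeout(time.Second).Should(gomega.Equal(int64(3)))

	// Like numMetrics.Consistently: it must keep matching for the window.
	g.Consistently(func() int64 { return n.Load() }).
		WithTimeout(500 * time.Millisecond).Should(gomega.Equal(int64(3)))

	fmt.Println("value settled at", n.Load())
}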
@@ -604,12 +719,12 @@ type expectedMetrics struct {
 
 func expectMetrics(t *testing.T, em expectedMetrics) {
 	t.Helper()
 
-	actualCreated, err := testutil.GetCounterMetricValue(ephemeralvolumemetrics.ResourceClaimCreateAttempts)
+	actualCreated, err := testutil.GetCounterMetricValue(metrics.ResourceClaimCreateAttempts)
 	handleErr(t, err, "ResourceClaimCreate")
 	if actualCreated != float64(em.numCreated) {
 		t.Errorf("Expected claims to be created %d, got %v", em.numCreated, actualCreated)
 	}
 
-	actualConflicts, err := testutil.GetCounterMetricValue(ephemeralvolumemetrics.ResourceClaimCreateFailures)
+	actualConflicts, err := testutil.GetCounterMetricValue(metrics.ResourceClaimCreateFailures)
 	handleErr(t, err, "ResourceClaimCreate/Conflict")
 	if actualConflicts != float64(em.numFailures) {
 		t.Errorf("Expected claims to have conflicts %d, got %v", em.numFailures, actualConflicts)
@@ -623,7 +738,9 @@ func handleErr(t *testing.T, err error, metricName string) {
 }
 
 func setupMetrics() {
-	ephemeralvolumemetrics.RegisterMetrics()
-	ephemeralvolumemetrics.ResourceClaimCreateAttempts.Reset()
-	ephemeralvolumemetrics.ResourceClaimCreateFailures.Reset()
+	metrics.RegisterMetrics()
+	metrics.ResourceClaimCreateAttempts.Reset()
+	metrics.ResourceClaimCreateFailures.Reset()
+	metrics.NumResourceClaims.Set(0)
+	metrics.NumAllocatedResourceClaims.Set(0)
 }

pkg/controller/resourceclaim/metrics/metrics.go

@@ -45,6 +45,22 @@
 			Help:           "Number of ResourceClaims creation request failures",
 			StabilityLevel: metrics.ALPHA,
 		})
+
+	// NumResourceClaims tracks the current number of ResourceClaims.
+	NumResourceClaims = metrics.NewGauge(
+		&metrics.GaugeOpts{
+			Subsystem:      ResourceClaimSubsystem,
+			Name:           "resource_claims",
+			Help:           "Number of ResourceClaims",
+			StabilityLevel: metrics.ALPHA,
+		})
+
+	// NumAllocatedResourceClaims tracks the current number of allocated ResourceClaims.
+	NumAllocatedResourceClaims = metrics.NewGauge(
+		&metrics.GaugeOpts{
+			Subsystem:      ResourceClaimSubsystem,
+			Name:           "allocated_resource_claims",
+			Help:           "Number of allocated ResourceClaims",
+			StabilityLevel: metrics.ALPHA,
+		})
 )
 
 var registerMetrics sync.Once
@@ -54,5 +70,7 @@ func RegisterMetrics() {
	registerMetrics.Do(func() {
 		legacyregistry.MustRegister(ResourceClaimCreateAttempts)
 		legacyregistry.MustRegister(ResourceClaimCreateFailures)
+		legacyregistry.MustRegister(NumResourceClaims)
+		legacyregistry.MustRegister(NumAllocatedResourceClaims)
 	})
 }
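
Once RegisterMetrics has run, the new gauges live in the same legacy registry as the existing create counters, so they are served by whatever /metrics endpoint the process exposes. A hypothetical sketch of that exposure (the final metric names carry the ResourceClaimSubsystem prefix defined earlier in this file, which the hunk above does not show):

package main

import (
	"log"
	"net/http"

	"k8s.io/component-base/metrics/legacyregistry"
	"k8s.io/kubernetes/pkg/controller/resourceclaim/metrics"
)

func main() {
	metrics.RegisterMetrics()
	metrics.NumResourceClaims.Set(3)
	metrics.NumAllocatedResourceClaims.Set(2)

	// Serves the gauges alongside the existing create-attempt counters.
	http.Handle("/metrics", legacyregistry.Handler())
	log.Fatal(http.ListenAndServe(":8080", nil))
}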