diff --git a/staging/src/k8s.io/component-base/metrics/testutil/metrics.go b/staging/src/k8s.io/component-base/metrics/testutil/metrics.go
index c595f55d64a..05d15b08d75 100644
--- a/staging/src/k8s.io/component-base/metrics/testutil/metrics.go
+++ b/staging/src/k8s.io/component-base/metrics/testutil/metrics.go
@@ -258,12 +258,8 @@ func GetHistogramVecFromGatherer(gatherer metrics.Gatherer, metricName string, l
     if err != nil {
         return nil, err
     }
-    for _, mFamily := range m {
-        if mFamily.GetName() == metricName {
-            metricFamily = mFamily
-            break
-        }
-    }
+
+    metricFamily = findMetricFamily(m, metricName)
 
     if metricFamily == nil {
         return nil, fmt.Errorf("metric %q not found", metricName)
@@ -433,3 +429,47 @@ func LabelsMatch(metric *dto.Metric, labelFilter map[string]string) bool {
     }
     return true
 }
+
+// GetCounterValuesFromGatherer collects counter values for the metric with the
+// given name from a gatherer that implements the
+// k8s.io/component-base/metrics.Gatherer interface. For each counter whose
+// labels match lvMap, it returns the counter value in a map that is keyed by
+// the value of the label named labelName.
+//
+// Used only for testing purposes where we need to gather metrics directly from a running binary (without a metrics endpoint).
+func GetCounterValuesFromGatherer(gatherer metrics.Gatherer, metricName string, lvMap map[string]string, labelName string) (map[string]float64, error) {
+    m, err := gatherer.Gather()
+    if err != nil {
+        return nil, err
+    }
+
+    metricFamily := findMetricFamily(m, metricName)
+    if metricFamily == nil {
+        return nil, fmt.Errorf("metric %q not found", metricName)
+    }
+    if len(metricFamily.GetMetric()) == 0 {
+        return nil, fmt.Errorf("metric %q is empty", metricName)
+    }
+
+    values := make(map[string]float64)
+    for _, metric := range metricFamily.GetMetric() {
+        if LabelsMatch(metric, lvMap) {
+            if counter := metric.GetCounter(); counter != nil {
+                for _, labelPair := range metric.Label {
+                    if labelPair.GetName() == labelName {
+                        values[labelPair.GetValue()] = counter.GetValue()
+                    }
+                }
+            }
+        }
+    }
+    return values, nil
+}
+
+func findMetricFamily(metricFamilies []*dto.MetricFamily, metricName string) *dto.MetricFamily {
+    for _, mFamily := range metricFamilies {
+        if mFamily.GetName() == metricName {
+            return mFamily
+        }
+    }
+    return nil
+}
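For reference, a minimal sketch of how a test could use the new helper. The metric name, labels, and values here are hypothetical, chosen only for illustration; the API calls are the same ones exercised by the test below:

    package example

    import (
        "testing"

        "k8s.io/component-base/metrics"
        "k8s.io/component-base/metrics/testutil"
    )

    func TestPluginCounts(t *testing.T) {
        // A counter with two labels, registered in a local registry which
        // then serves as the metrics.Gatherer.
        vec := metrics.NewCounterVec(&metrics.CounterOpts{
            Name: "example_attempts_total", // hypothetical metric name
            Help: "hypothetical counter, for illustration only",
        }, []string{"result", "plugin"})
        registry := metrics.NewKubeRegistry()
        registry.MustRegister(vec)
        vec.WithLabelValues("success", "pluginA").Add(2)
        vec.WithLabelValues("success", "pluginB").Inc()
        vec.WithLabelValues("error", "pluginA").Inc()

        // Filter on result="success" and key the result by the "plugin"
        // label: the expected map is {"pluginA": 2, "pluginB": 1}.
        values, err := testutil.GetCounterValuesFromGatherer(registry, "example_attempts_total",
            map[string]string{"result": "success"}, "plugin")
        if err != nil {
            t.Fatal(err)
        }
        if values["pluginA"] != 2 || values["pluginB"] != 1 {
            t.Errorf("unexpected counter values: %v", values)
        }
    }

Keying the result by one label while filtering on the others makes it possible to compare per-label counts without hard-coding the full label set in the test.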
diff --git a/staging/src/k8s.io/component-base/metrics/testutil/metrics_test.go b/staging/src/k8s.io/component-base/metrics/testutil/metrics_test.go
index 702fb7fcfde..adfc1999989 100644
--- a/staging/src/k8s.io/component-base/metrics/testutil/metrics_test.go
+++ b/staging/src/k8s.io/component-base/metrics/testutil/metrics_test.go
@@ -20,6 +20,7 @@ import (
     "fmt"
     "math"
     "reflect"
+    "strings"
     "testing"
 
     "github.com/google/go-cmp/cmp"
@@ -591,3 +592,104 @@ func TestGetHistogramVecFromGatherer(t *testing.T) {
         })
     }
 }
+
+func TestGetCounterValuesFromGatherer(t *testing.T) {
+    namespace := "namespace"
+    subsystem := "subsystem"
+    name := "metric_test_name"
+    metricName := fmt.Sprintf("%s_%s_%s", namespace, subsystem, name)
+
+    tests := map[string]struct {
+        metricName string // Name of the metric to look up.
+        lvMap      map[string]string
+        labelName  string
+
+        wantCounterValues map[string]float64
+        wantErr           string
+    }{
+        "wrong-metric": {
+            metricName: "no-such-metric",
+            wantErr:    `metric "no-such-metric" not found`,
+        },
+
+        "none": {
+            metricName: metricName,
+            lvMap:      map[string]string{"no-such-label": "a"},
+
+            wantCounterValues: map[string]float64{},
+        },
+
+        "value1-0": {
+            metricName: metricName,
+            lvMap:      map[string]string{"label1": "value1-0"},
+            labelName:  "label2",
+
+            wantCounterValues: map[string]float64{"value2-0": 1.5, "value2-1": 2.5},
+        },
+
+        "value1-1": {
+            metricName: metricName,
+            lvMap:      map[string]string{"label1": "value1-1"},
+            labelName:  "label2",
+
+            wantCounterValues: map[string]float64{"value2-0": 3.5, "value2-1": 4.5},
+        },
+
+        "value1-1-value2-0-none": {
+            metricName: metricName,
+            lvMap:      map[string]string{"label1": "value1-1", "label2": "value2-0"},
+            labelName:  "none",
+
+            wantCounterValues: map[string]float64{},
+        },
+
+        "value1-0-value2-0-one": {
+            metricName: metricName,
+            lvMap:      map[string]string{"label1": "value1-0", "label2": "value2-0"},
+            labelName:  "label2",
+
+            wantCounterValues: map[string]float64{"value2-0": 1.5},
+        },
+    }
+    for name, tt := range tests {
+        t.Run(name, func(t *testing.T) {
+            // The CounterVec has two labels defined.
+            labels := []string{"label1", "label2"}
+            counterOpts := &metrics.CounterOpts{
+                Namespace: "namespace",
+                Name:      "metric_test_name",
+                Subsystem: "subsystem",
+                Help:      "counter help message",
+            }
+            vec := metrics.NewCounterVec(counterOpts, labels)
+            // Use a local registry.
+            var registry = metrics.NewKubeRegistry()
+            var gather metrics.Gatherer = registry
+            registry.MustRegister(vec)
+            // Add four samples: each value of label1 is combined with each
+            // value of label2.
+            vec.WithLabelValues("value1-0", "value2-0").Add(1.5)
+            vec.WithLabelValues("value1-0", "value2-1").Add(2.5)
+            vec.WithLabelValues("value1-1", "value2-0").Add(3.5)
+            vec.WithLabelValues("value1-1", "value2-1").Add(4.5)
+
+            // The check for an empty metric apparently cannot be tested: registering
+            // a NewCounterVec with no values has the effect that it doesn't get
+            // returned, leading to "not found".
+
+            counterValues, err := GetCounterValuesFromGatherer(gather, tt.metricName, tt.lvMap, tt.labelName)
+            if err != nil {
+                if tt.wantErr != "" && !strings.Contains(err.Error(), tt.wantErr) {
+                    t.Errorf("expected error %q, got instead: %v", tt.wantErr, err)
+                }
+                return
+            }
+            if tt.wantErr != "" {
+                t.Fatalf("expected error %q, got none", tt.wantErr)
+            }
+
+            if diff := cmp.Diff(tt.wantCounterValues, counterValues); diff != "" {
+                t.Errorf("Got unexpected counter values (-want +got):\n%s", diff)
+            }
+        })
+    }
+}
diff --git a/test/integration/scheduler_perf/README.md b/test/integration/scheduler_perf/README.md
index 261dd5e776f..8a4bccba6b9 100644
--- a/test/integration/scheduler_perf/README.md
+++ b/test/integration/scheduler_perf/README.md
@@ -175,3 +175,22 @@ the ci-benchmark-scheduler-perf periodic job will fail with an error log such as
 This makes it possible to analyze which workload failed. Make sure that the failure is not an outlier
 by checking multiple runs of the job. If the failures are not related to any regression,
 but to an incorrect threshold setting, it is reasonable to decrease it.
+
+### Visualization
+
+Some support for visualizing progress over time is built into the
+benchmarks. The measurement operation that creates pods writes .dat files like
+this:
+
+    test/integration/scheduler_perf/SchedulingBasic_5000Nodes_2023-03-17T14:52:09Z.dat
+
+This file is in a text format that [gnuplot](http://www.gnuplot.info/) can
+read. A wrapper script selects some suitable parameters:
+
+    test/integration/scheduler_perf/gnuplot.sh test/integration/scheduler_perf/*.dat
+
+It plots in an interactive window by default. To write into a file, use
+
+    test/integration/scheduler_perf/gnuplot.sh \
+       -e 'set term png; set output ".png"' \
+       test/integration/scheduler_perf/*.dat
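Putting the pieces together, an end-to-end session might look like this. The make invocation follows the usual scheduler_perf conventions, and the benchmark name and output file name are illustrative, not prescribed by this patch:

    # Run one benchmark; its createPods measurement writes a .dat file
    # into the package directory (benchmark name is just an example).
    make test-integration WHAT=./test/integration/scheduler_perf \
        KUBE_TEST_ARGS="-run ^$ -bench BenchmarkPerfScheduling/SchedulingBasic/5000Nodes"

    # Plot the collected data into a PNG file (output name is hypothetical).
    test/integration/scheduler_perf/gnuplot.sh \
        -e 'set term png; set output "schedulingbasic.png"' \
        test/integration/scheduler_perf/*.dat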
diff --git a/test/integration/scheduler_perf/config/performance-config.yaml b/test/integration/scheduler_perf/config/performance-config.yaml
index 82a0abc9e87..c01f32bf717 100644
--- a/test/integration/scheduler_perf/config/performance-config.yaml
+++ b/test/integration/scheduler_perf/config/performance-config.yaml
@@ -1167,7 +1167,9 @@
       maxClaimsPerNode: 20
 
 # SchedulingWithResourceClaimTemplateStructured uses a ResourceClaimTemplate
-# and dynamically creates ResourceClaim instances for each pod.
+# and dynamically creates ResourceClaim instances for each pod. Node, pod and
+# device counts are chosen so that the cluster gets filled up completely.
+#
 # The driver uses structured parameters.
 - name: SchedulingWithResourceClaimTemplateStructured
   featureGates:
@@ -1234,6 +1236,125 @@
       measurePods: 2500
       maxClaimsPerNode: 10
 
+# SteadyStateClusterResourceClaimTemplateStructured uses a ResourceClaimTemplate
+# and dynamically creates ResourceClaim instances for each pod. It creates ten
+# pods, waits for them to be scheduled, deletes them, and starts again,
+# so the cluster remains at the same level of utilization.
+#
+# The number of already allocated claims can be varied, thus simulating
+# various degrees of pre-existing resource utilization.
+#
+# The driver uses structured parameters.
+- name: SteadyStateClusterResourceClaimTemplateStructured
+  featureGates:
+    DynamicResourceAllocation: true
+    # SchedulerQueueingHints: true
+  workloadTemplate:
+  - opcode: createNodes
+    countParam: $nodesWithoutDRA
+  - opcode: createNodes
+    nodeTemplatePath: config/dra/node-with-dra-test-driver.yaml
+    countParam: $nodesWithDRA
+  - opcode: createResourceDriver
+    driverName: test-driver.cdi.k8s.io
+    nodes: scheduler-perf-dra-*
+    maxClaimsPerNodeParam: $maxClaimsPerNode
+    structuredParameters: true
+  - opcode: createAny
+    templatePath: config/dra/deviceclass-structured.yaml
+  - opcode: createAny
+    templatePath: config/dra/resourceclaim-structured.yaml
+    countParam: $initClaims
+    namespace: init
+  - opcode: allocResourceClaims
+    namespace: init
+  - opcode: createAny
+    templatePath: config/dra/resourceclaimtemplate-structured.yaml
+    namespace: test
+  - opcode: createPods
+    namespace: test
+    count: 10
+    steadyState: true
+    durationParam: $duration
+    podTemplatePath: config/dra/pod-with-claim-template.yaml
+    collectMetrics: true
+  workloads:
+  - name: fast
+    labels: [integration-test, fast, short]
+    params:
+      # This test case runs through all code paths without
+      # taking too long overall.
+      nodesWithDRA: 1
+      nodesWithoutDRA: 1
+      initClaims: 0
+      maxClaimsPerNode: 10
+      duration: 2s
+  - name: empty_100nodes
+    params:
+      nodesWithDRA: 100
+      nodesWithoutDRA: 0
+      initClaims: 0
+      maxClaimsPerNode: 10
+      duration: 10s
+  - name: empty_200nodes
+    params:
+      nodesWithDRA: 200
+      nodesWithoutDRA: 0
+      initClaims: 0
+      maxClaimsPerNode: 10
+      duration: 10s
+  - name: empty_500nodes
+    params:
+      nodesWithDRA: 500
+      nodesWithoutDRA: 0
+      initClaims: 0
+      maxClaimsPerNode: 10
+      duration: 10s
+  # In the "half" scenarios, half of the devices are in use.
+  - name: half_100nodes
+    params:
+      nodesWithDRA: 100
+      nodesWithoutDRA: 0
+      initClaims: 500
+      maxClaimsPerNode: 10
+      duration: 10s
+  - name: half_200nodes
+    params:
+      nodesWithDRA: 200
+      nodesWithoutDRA: 0
+      initClaims: 1000
+      maxClaimsPerNode: 10
+      duration: 10s
+  - name: half_500nodes
+    params:
+      nodesWithDRA: 500
+      nodesWithoutDRA: 0
+      initClaims: 2500
+      maxClaimsPerNode: 10
+      duration: 10s
+  # In the "full" scenarios, the cluster can accommodate exactly 10 additional pods.
+  - name: full_100nodes
+    params:
+      nodesWithDRA: 100
+      nodesWithoutDRA: 0
+      initClaims: 990
+      maxClaimsPerNode: 10
+      duration: 10s
+  - name: full_200nodes
+    params:
+      nodesWithDRA: 200
+      nodesWithoutDRA: 0
+      initClaims: 1990
+      maxClaimsPerNode: 10
+      duration: 10s
+  - name: full_500nodes
+    params:
+      nodesWithDRA: 500
+      nodesWithoutDRA: 0
+      initClaims: 4990
+      maxClaimsPerNode: 10
+      duration: 10s
+
 # SchedulingWithResourceClaimTemplate uses ResourceClaims
 # with deterministic names that are shared between pods.
 # There is a fixed ratio of 1:5 between claims and pods.
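A quick check of the "full" numbers above: each node running the DRA test driver publishes maxClaimsPerNode = 10 devices, so the three cluster sizes provide 100 × 10 = 1000, 200 × 10 = 2000, and 500 × 10 = 5000 devices. With initClaims set to 990, 1990, and 4990 respectively, exactly 10 devices remain free in each case, just enough for the 10 pods that the steady-state createPods operation keeps in flight.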
diff --git a/test/integration/scheduler_perf/create.go b/test/integration/scheduler_perf/create.go
index e716d78dc00..15d0973bbf3 100644
--- a/test/integration/scheduler_perf/create.go
+++ b/test/integration/scheduler_perf/create.go
@@ -56,9 +56,6 @@ type createAny struct {
 var _ runnableOp = &createAny{}
 
 func (c *createAny) isValid(allowParameterization bool) error {
-    if c.Opcode != createAnyOpcode {
-        return fmt.Errorf("invalid opcode %q; expected %q", c.Opcode, createAnyOpcode)
-    }
     if c.TemplatePath == "" {
         return fmt.Errorf("TemplatePath must be set")
     }
diff --git a/test/integration/scheduler_perf/dra.go b/test/integration/scheduler_perf/dra.go
index d76cfd6b2ac..8bf0d93e9c6 100644
--- a/test/integration/scheduler_perf/dra.go
+++ b/test/integration/scheduler_perf/dra.go
@@ -19,15 +19,26 @@ package benchmark
 import (
     "context"
     "fmt"
+    "math/rand/v2"
     "path/filepath"
+    "reflect"
     "sync"
 
+    "github.com/stretchr/testify/require"
+
+    v1 "k8s.io/api/core/v1"
     resourceapi "k8s.io/api/resource/v1alpha3"
+    "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/labels"
+    "k8s.io/client-go/informers"
     "k8s.io/client-go/util/workqueue"
+    "k8s.io/dynamic-resource-allocation/structured"
     "k8s.io/klog/v2"
+    "k8s.io/kubernetes/pkg/scheduler/util/assumecache"
     draapp "k8s.io/kubernetes/test/e2e/dra/test-driver/app"
     "k8s.io/kubernetes/test/utils/ktesting"
+    "k8s.io/utils/ptr"
 )
 
 // createResourceClaimsOp defines an op where resource claims are created.
@@ -48,9 +59,6 @@ var _ realOp = &createResourceClaimsOp{}
 var _ runnableOp = &createResourceClaimsOp{}
 
 func (op *createResourceClaimsOp) isValid(allowParameterization bool) error {
-    if op.Opcode != createResourceClaimsOpcode {
-        return fmt.Errorf("invalid opcode %q; expected %q", op.Opcode, createResourceClaimsOpcode)
-    }
     if !isValidCount(allowParameterization, op.Count, op.CountParam) {
         return fmt.Errorf("invalid Count=%d / CountParam=%q", op.Count, op.CountParam)
     }
@@ -137,9 +145,6 @@ var _ realOp = &createResourceDriverOp{}
 var _ runnableOp = &createResourceDriverOp{}
 
 func (op *createResourceDriverOp) isValid(allowParameterization bool) error {
-    if op.Opcode != createResourceDriverOpcode {
-        return fmt.Errorf("invalid opcode %q; expected %q", op.Opcode, createResourceDriverOpcode)
-    }
     if !isValidCount(allowParameterization, op.MaxClaimsPerNode, op.MaxClaimsPerNodeParam) {
         return fmt.Errorf("invalid MaxClaimsPerNode=%d / MaxClaimsPerNodeParam=%q", op.MaxClaimsPerNode, op.MaxClaimsPerNodeParam)
     }
@@ -247,11 +252,127 @@ func resourceSlice(driverName, nodeName string, capacity int) *resourceapi.Resou
     for i := 0; i < capacity; i++ {
         slice.Spec.Devices = append(slice.Spec.Devices,
             resourceapi.Device{
-                Name:  fmt.Sprintf("instance-%d", i),
-                Basic: &resourceapi.BasicDevice{},
+                Name: fmt.Sprintf("instance-%d", i),
+                Basic: &resourceapi.BasicDevice{
+                    Attributes: map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{
+                        "model":                {StringValue: ptr.To("A100")},
+                        "family":               {StringValue: ptr.To("GPU")},
+                        "driverVersion":        {VersionValue: ptr.To("1.2.3")},
+                        "dra.example.com/numa": {IntValue: ptr.To(int64(i))},
+                    },
+                    Capacity: map[resourceapi.QualifiedName]resource.Quantity{
+                        "memory": resource.MustParse("1Gi"),
+                    },
+                },
             },
         )
     }
 
     return slice
 }
+
+// allocResourceClaimsOp defines an op where resource claims with structured
+// parameters get allocated without being associated with a pod.
+type allocResourceClaimsOp struct {
+    // Must be allocResourceClaimsOpcode.
+    Opcode operationCode
+    // Namespace where claims are to be allocated, all namespaces if empty.
+    Namespace string
+}
+
+var _ realOp = &allocResourceClaimsOp{}
+var _ runnableOp = &allocResourceClaimsOp{}
+
+func (op *allocResourceClaimsOp) isValid(allowParameterization bool) error {
+    return nil
+}
+
+func (op *allocResourceClaimsOp) collectsMetrics() bool {
+    return false
+}
+
+func (op *allocResourceClaimsOp) patchParams(w *workload) (realOp, error) {
+    return op, op.isValid(false)
+}
+
+func (op *allocResourceClaimsOp) requiredNamespaces() []string { return nil }
+
+func (op *allocResourceClaimsOp) run(tCtx ktesting.TContext) {
+    claims, err := tCtx.Client().ResourceV1alpha3().ResourceClaims(op.Namespace).List(tCtx, metav1.ListOptions{})
+    tCtx.ExpectNoError(err, "list claims")
+    tCtx.Logf("allocating %d ResourceClaims", len(claims.Items))
+    tCtx = ktesting.WithCancel(tCtx)
+    defer tCtx.Cancel("allocResourceClaimsOp.run is done")
+
+    // Track cluster state.
+    informerFactory := informers.NewSharedInformerFactory(tCtx.Client(), 0)
+    claimInformer := informerFactory.Resource().V1alpha3().ResourceClaims().Informer()
+    classLister := informerFactory.Resource().V1alpha3().DeviceClasses().Lister()
+    sliceLister := informerFactory.Resource().V1alpha3().ResourceSlices().Lister()
+    nodeLister := informerFactory.Core().V1().Nodes().Lister()
+    claimCache := assumecache.NewAssumeCache(tCtx.Logger(), claimInformer, "ResourceClaim", "", nil)
+    claimLister := claimLister{cache: claimCache}
+    informerFactory.Start(tCtx.Done())
+    defer func() {
+        tCtx.Cancel("allocResourceClaimsOp.run is shutting down")
+        informerFactory.Shutdown()
+    }()
+    syncedInformers := informerFactory.WaitForCacheSync(tCtx.Done())
+    expectSyncedInformers := map[reflect.Type]bool{
+        reflect.TypeOf(&resourceapi.DeviceClass{}):   true,
+        reflect.TypeOf(&resourceapi.ResourceClaim{}): true,
+        reflect.TypeOf(&resourceapi.ResourceSlice{}): true,
+        reflect.TypeOf(&v1.Node{}):                   true,
+    }
+    require.Equal(tCtx, expectSyncedInformers, syncedInformers, "synced informers")
+
+    // The set of nodes is assumed to be fixed at this point.
+    nodes, err := nodeLister.List(labels.Everything())
+    tCtx.ExpectNoError(err, "list nodes")
+
+    // Allocate one claim at a time, picking nodes randomly. Each
+    // allocation is stored immediately, using the claim cache to avoid
+    // having to wait for the actual informer update.
+claims:
+    for i := range claims.Items {
+        claim := &claims.Items[i]
+        if claim.Status.Allocation != nil {
+            continue
+        }
+
+        allocator, err := structured.NewAllocator(tCtx, []*resourceapi.ResourceClaim{claim}, claimLister, classLister, sliceLister)
+        tCtx.ExpectNoError(err, "create allocator")
+
+        rand.Shuffle(len(nodes), func(i, j int) {
+            nodes[i], nodes[j] = nodes[j], nodes[i]
+        })
+        for _, node := range nodes {
+            result, err := allocator.Allocate(tCtx, node)
+            tCtx.ExpectNoError(err, "allocate claim")
+            if result != nil {
+                claim = claim.DeepCopy()
+                claim.Status.Allocation = result[0]
+                claim, err := tCtx.Client().ResourceV1alpha3().ResourceClaims(claim.Namespace).UpdateStatus(tCtx, claim, metav1.UpdateOptions{})
+                tCtx.ExpectNoError(err, "update claim status with allocation")
+                tCtx.ExpectNoError(claimCache.Assume(claim), "assume claim")
+                continue claims
+            }
+        }
+        tCtx.Fatalf("Could not allocate claim %d out of %d", i, len(claims.Items))
+    }
+}
+
+type claimLister struct {
+    cache *assumecache.AssumeCache
+}
+
+func (c claimLister) ListAllAllocated() ([]*resourceapi.ResourceClaim, error) {
+    objs := c.cache.List(nil)
+    allocatedClaims := make([]*resourceapi.ResourceClaim, 0, len(objs))
+    for _, obj := range objs {
+        claim := obj.(*resourceapi.ResourceClaim)
+        if claim.Status.Allocation != nil {
+            allocatedClaims = append(allocatedClaims, claim)
+        }
+    }
+    return allocatedClaims, nil
+}
diff --git a/test/integration/scheduler_perf/gnuplot.sh b/test/integration/scheduler_perf/gnuplot.sh
new file mode 100755
index 00000000000..885276559ae
--- /dev/null
+++ b/test/integration/scheduler_perf/gnuplot.sh
@@ -0,0 +1,54 @@
+#!/usr/bin/env bash
+
+# Copyright 2024 The Kubernetes Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Invoke this script with a list of *.dat files and it'll plot them with gnuplot.
+# Any non-file parameter is passed through to gnuplot. By default,
+# an X11 window is used to display the result. To write into a file,
+# use
+#    -e "set term png; set output .png"
+
+files=()
+args=( -e "set term x11 persist" )
+
+for i in "$@"; do
+    if [ -f "$i" ]; then
+        files+=("$i")
+    else
+        args+=("$i")
+    fi
+done
+
+(
+    # Emit a gnuplot script on stdout: some axis setup, then a single
+    # "plot" command covering all data files. NOTE: the body of this
+    # heredoc was lost in this copy of the patch; the settings and the
+    # plot loop below are a minimal reconstruction, not the original code.
+    cat <<EOF
+set xlabel "time"
+set ylabel "pods"
+EOF
+    plot="plot"
+    sep=" "
+    for file in "${files[@]}"; do
+        plot+="$sep'$file' using 1:2 title '$(basename "$file" .dat)' with lines"
+        sep=", "
+    done
+    echo "$plot"
+) | gnuplot "${args[@]}" -

[patch truncated here; a further hunk continues mid-statement with "sort.Float64s(tc.schedulingThroughputs)" and "sum := 0.0"]