From 7bbb3465e5b21ef35a07c4243f3ba20fc009df28 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Tue, 10 Sep 2024 11:02:46 +0200 Subject: [PATCH 1/5] scheduler_perf: more realistic structured parameters tests Real devices are likely to have a handful of attributes and (for GPUs) the memory as capacity. Most keys will be driver specific, a few may eventually have a domain (none standardized right now). --- test/integration/scheduler_perf/dra.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/test/integration/scheduler_perf/dra.go b/test/integration/scheduler_perf/dra.go index d76cfd6b2ac..66039a67423 100644 --- a/test/integration/scheduler_perf/dra.go +++ b/test/integration/scheduler_perf/dra.go @@ -23,11 +23,13 @@ import ( "sync" resourceapi "k8s.io/api/resource/v1alpha3" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" draapp "k8s.io/kubernetes/test/e2e/dra/test-driver/app" "k8s.io/kubernetes/test/utils/ktesting" + "k8s.io/utils/ptr" ) // createResourceClaimsOp defines an op where resource claims are created. @@ -247,8 +249,18 @@ func resourceSlice(driverName, nodeName string, capacity int) *resourceapi.Resou for i := 0; i < capacity; i++ { slice.Spec.Devices = append(slice.Spec.Devices, resourceapi.Device{ - Name: fmt.Sprintf("instance-%d", i), - Basic: &resourceapi.BasicDevice{}, + Name: fmt.Sprintf("instance-%d", i), + Basic: &resourceapi.BasicDevice{ + Attributes: map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{ + "model": {StringValue: ptr.To("A100")}, + "family": {StringValue: ptr.To("GPU")}, + "driverVersion": {VersionValue: ptr.To("1.2.3")}, + "dra.example.com/numa": {IntValue: ptr.To(int64(i))}, + }, + Capacity: map[resourceapi.QualifiedName]resource.Quantity{ + "memory": resource.MustParse("1Gi"), + }, + }, }, ) } From 51cafb005341ef3fdded1575b6f1f1b7cc93be65 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Thu, 12 Sep 2024 12:59:10 +0200 Subject: [PATCH 2/5] scheduler_perf: more useful errors for configuration mistakes Before, the first error was reported, which typically was the "invalid op code" error from the createAny operation: scheduler_perf.go:900: parsing test cases error: error unmarshaling JSON: while decoding JSON: cannot unmarshal {"collectMetrics":true,"count":10,"duration":"30s","namespace":"test","opcode":"createPods","podTemplatePath":"config/dra/pod-with-claim-template.yaml","steadyState":true} into any known op type: invalid opcode "createPods"; expected "createAny" Now the opcode is determined first, then decoding into exactly the matching operation is tried and validated. Unknown fields are an error. In the case above, decoding a string into time.Duration failed: scheduler_test.go:29: parsing test cases error: error unmarshaling JSON: while decoding JSON: decoding {"collectMetrics":true,"count":10,"duration":"30s","namespace":"test","opcode":"createPods","podTemplatePath":"config/dra/pod-with-claim-template.yaml","steadyState":true} into *benchmark.createPodsOp: json: cannot unmarshal string into Go struct field createPodsOp.Duration of type time.Duration Some typos: scheduler_test.go:29: parsing test cases error: error unmarshaling JSON: while decoding JSON: unknown opcode "sleeep" in {"duration":"5s","opcode":"sleeep"} scheduler_test.go:29: parsing test cases error: error unmarshaling JSON: while decoding JSON: decoding {"countParram":"$deletingPods","deletePodsPerSecond":50,"opcode":"createPods"} into *benchmark.createPodsOp: json: unknown field "countParram" --- test/integration/scheduler_perf/create.go | 3 - test/integration/scheduler_perf/dra.go | 6 -- .../scheduler_perf/scheduler_perf.go | 91 ++++++++----------- 3 files changed, 37 insertions(+), 63 deletions(-) diff --git a/test/integration/scheduler_perf/create.go b/test/integration/scheduler_perf/create.go index e716d78dc00..15d0973bbf3 100644 --- a/test/integration/scheduler_perf/create.go +++ b/test/integration/scheduler_perf/create.go @@ -56,9 +56,6 @@ type createAny struct { var _ runnableOp = &createAny{} func (c *createAny) isValid(allowParameterization bool) error { - if c.Opcode != createAnyOpcode { - return fmt.Errorf("invalid opcode %q; expected %q", c.Opcode, createAnyOpcode) - } if c.TemplatePath == "" { return fmt.Errorf("TemplatePath must be set") } diff --git a/test/integration/scheduler_perf/dra.go b/test/integration/scheduler_perf/dra.go index 66039a67423..d4fa422c7f0 100644 --- a/test/integration/scheduler_perf/dra.go +++ b/test/integration/scheduler_perf/dra.go @@ -50,9 +50,6 @@ var _ realOp = &createResourceClaimsOp{} var _ runnableOp = &createResourceClaimsOp{} func (op *createResourceClaimsOp) isValid(allowParameterization bool) error { - if op.Opcode != createResourceClaimsOpcode { - return fmt.Errorf("invalid opcode %q; expected %q", op.Opcode, createResourceClaimsOpcode) - } if !isValidCount(allowParameterization, op.Count, op.CountParam) { return fmt.Errorf("invalid Count=%d / CountParam=%q", op.Count, op.CountParam) } @@ -139,9 +136,6 @@ var _ realOp = &createResourceDriverOp{} var _ runnableOp = &createResourceDriverOp{} func (op *createResourceDriverOp) isValid(allowParameterization bool) error { - if op.Opcode != createResourceDriverOpcode { - return fmt.Errorf("invalid opcode %q; expected %q", op.Opcode, createResourceDriverOpcode) - } if !isValidCount(allowParameterization, op.MaxClaimsPerNode, op.MaxClaimsPerNodeParam) { return fmt.Errorf("invalid MaxClaimsPerNode=%d / MaxClaimsPerNodeParam=%q", op.MaxClaimsPerNode, op.MaxClaimsPerNodeParam) } diff --git a/test/integration/scheduler_perf/scheduler_perf.go b/test/integration/scheduler_perf/scheduler_perf.go index 4a22f0bf033..2b88e12ba93 100644 --- a/test/integration/scheduler_perf/scheduler_perf.go +++ b/test/integration/scheduler_perf/scheduler_perf.go @@ -17,6 +17,7 @@ limitations under the License. package benchmark import ( + "bytes" "context" "encoding/json" "errors" @@ -404,36 +405,43 @@ type op struct { // UnmarshalJSON is a custom unmarshaler for the op struct since we don't know // which op we're decoding at runtime. func (op *op) UnmarshalJSON(b []byte) error { - possibleOps := []realOp{ - &createAny{}, - &createNodesOp{}, - &createNamespacesOp{}, - &createPodsOp{}, - &createPodSetsOp{}, - &deletePodsOp{}, - &createResourceClaimsOp{}, - &createResourceDriverOp{}, - &churnOp{}, - &barrierOp{}, - &sleepOp{}, - &startCollectingMetricsOp{}, - &stopCollectingMetricsOp{}, + possibleOps := map[operationCode]realOp{ + createAnyOpcode: &createAny{}, + createNodesOpcode: &createNodesOp{}, + createNamespacesOpcode: &createNamespacesOp{}, + createPodsOpcode: &createPodsOp{}, + createPodSetsOpcode: &createPodSetsOp{}, + deletePodsOpcode: &deletePodsOp{}, + createResourceClaimsOpcode: &createResourceClaimsOp{}, + createResourceDriverOpcode: &createResourceDriverOp{}, + churnOpcode: &churnOp{}, + barrierOpcode: &barrierOp{}, + sleepOpcode: &sleepOp{}, + startCollectingMetricsOpcode: &startCollectingMetricsOp{}, + stopCollectingMetricsOpcode: &stopCollectingMetricsOp{}, // TODO(#94601): add a delete nodes op to simulate scaling behaviour? } - var firstError error - for _, possibleOp := range possibleOps { - if err := json.Unmarshal(b, possibleOp); err == nil { - if err2 := possibleOp.isValid(true); err2 == nil { - op.realOp = possibleOp - return nil - } else if firstError == nil { - // Don't return an error yet. Even though this op is invalid, it may - // still match other possible ops. - firstError = err2 - } - } + // First determine the opcode using lenient decoding (= ignore extra fields). + var possibleOp struct { + Opcode operationCode } - return fmt.Errorf("cannot unmarshal %s into any known op type: %w", string(b), firstError) + if err := json.Unmarshal(b, &possibleOp); err != nil { + return fmt.Errorf("decoding opcode from %s: %w", string(b), err) + } + realOp, ok := possibleOps[possibleOp.Opcode] + if !ok { + return fmt.Errorf("unknown opcode %q in %s", possibleOp.Opcode, string(b)) + } + decoder := json.NewDecoder(bytes.NewReader(b)) + decoder.DisallowUnknownFields() + if err := decoder.Decode(realOp); err != nil { + return fmt.Errorf("decoding %s into %T: %w", string(b), realOp, err) + } + if err := realOp.isValid(true); err != nil { + return fmt.Errorf("%s not valid for %T: %w", string(b), realOp, err) + } + op.realOp = realOp + return nil } // realOp is an interface that is implemented by different structs. To evaluate @@ -441,6 +449,8 @@ func (op *op) UnmarshalJSON(b []byte) error { type realOp interface { // isValid verifies the validity of the op args such as node/pod count. Note // that we don't catch undefined parameters at this stage. + // + // This returns errInvalidOp if the configured operation does not match. isValid(allowParameterization bool) error // collectsMetrics checks if the op collects metrics. collectsMetrics() bool @@ -497,9 +507,6 @@ type createNodesOp struct { } func (cno *createNodesOp) isValid(allowParameterization bool) error { - if cno.Opcode != createNodesOpcode { - return fmt.Errorf("invalid opcode %q", cno.Opcode) - } if !isValidCount(allowParameterization, cno.Count, cno.CountParam) { return fmt.Errorf("invalid Count=%d / CountParam=%q", cno.Count, cno.CountParam) } @@ -538,9 +545,6 @@ type createNamespacesOp struct { } func (cmo *createNamespacesOp) isValid(allowParameterization bool) error { - if cmo.Opcode != createNamespacesOpcode { - return fmt.Errorf("invalid opcode %q", cmo.Opcode) - } if !isValidCount(allowParameterization, cmo.Count, cmo.CountParam) { return fmt.Errorf("invalid Count=%d / CountParam=%q", cmo.Count, cmo.CountParam) } @@ -595,9 +599,6 @@ type createPodsOp struct { } func (cpo *createPodsOp) isValid(allowParameterization bool) error { - if cpo.Opcode != createPodsOpcode { - return fmt.Errorf("invalid opcode %q; expected %q", cpo.Opcode, createPodsOpcode) - } if !isValidCount(allowParameterization, cpo.Count, cpo.CountParam) { return fmt.Errorf("invalid Count=%d / CountParam=%q", cpo.Count, cpo.CountParam) } @@ -641,9 +642,6 @@ type createPodSetsOp struct { } func (cpso *createPodSetsOp) isValid(allowParameterization bool) error { - if cpso.Opcode != createPodSetsOpcode { - return fmt.Errorf("invalid opcode %q; expected %q", cpso.Opcode, createPodSetsOpcode) - } if !isValidCount(allowParameterization, cpso.Count, cpso.CountParam) { return fmt.Errorf("invalid Count=%d / CountParam=%q", cpso.Count, cpso.CountParam) } @@ -729,9 +727,6 @@ type churnOp struct { } func (co *churnOp) isValid(_ bool) error { - if co.Opcode != churnOpcode { - return fmt.Errorf("invalid opcode %q", co.Opcode) - } if co.Mode != Recreate && co.Mode != Create { return fmt.Errorf("invalid mode: %v. must be one of %v", co.Mode, []string{Recreate, Create}) } @@ -767,9 +762,6 @@ type barrierOp struct { } func (bo *barrierOp) isValid(allowParameterization bool) error { - if bo.Opcode != barrierOpcode { - return fmt.Errorf("invalid opcode %q", bo.Opcode) - } return nil } @@ -805,9 +797,6 @@ func (so *sleepOp) UnmarshalJSON(data []byte) (err error) { } func (so *sleepOp) isValid(_ bool) error { - if so.Opcode != sleepOpcode { - return fmt.Errorf("invalid opcode %q; expected %q", so.Opcode, sleepOpcode) - } return nil } @@ -831,9 +820,6 @@ type startCollectingMetricsOp struct { } func (scm *startCollectingMetricsOp) isValid(_ bool) error { - if scm.Opcode != startCollectingMetricsOpcode { - return fmt.Errorf("invalid opcode %q; expected %q", scm.Opcode, startCollectingMetricsOpcode) - } if len(scm.Namespaces) == 0 { return fmt.Errorf("namespaces cannot be empty") } @@ -857,9 +843,6 @@ type stopCollectingMetricsOp struct { } func (scm *stopCollectingMetricsOp) isValid(_ bool) error { - if scm.Opcode != stopCollectingMetricsOpcode { - return fmt.Errorf("invalid opcode %q; expected %q", scm.Opcode, stopCollectingMetricsOpcode) - } return nil } From 385599f0a8b48009a48ea06f26e72e0f52258893 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Thu, 12 Sep 2024 17:09:50 +0200 Subject: [PATCH 3/5] scheduler_perf + DRA: measure pod scheduling at a steady state MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous tests were based on scheduling pods until the cluster was full. This is a valid scenario, but not necessarily realistic. More realistic is how quickly the scheduler can schedule new pods when some old pods finished running, in particular in a cluster that is properly utilized (= almost full). To test this, pods must get created, scheduled, and then immediately deleted. This can run for a certain period of time. Scenarios with empty and full cluster have different scheduling rates. This was previously visible for DRA because the 50% percentile of the scheduling throughput was lower than the average, but one had to guess in which scenario the throughput was lower. Now this can be measured for DRA with the new SteadyStateClusterResourceClaimTemplateStructured test. The metrics collector must watch pod events to figure out how many pods got scheduled. Polling misses pods that already got deleted again. There seems to be no relevant difference in the collected metrics (SchedulingWithResourceClaimTemplateStructured/2000pods_200nodes, 6 repetitions): │ before │ after │ │ SchedulingThroughput/Average │ SchedulingThroughput/Average vs base │ 157.1 ± 0% 157.1 ± 0% ~ (p=0.329 n=6) │ before │ after │ │ SchedulingThroughput/Perc50 │ SchedulingThroughput/Perc50 vs base │ 48.99 ± 8% 47.52 ± 9% ~ (p=0.937 n=6) │ before │ after │ │ SchedulingThroughput/Perc90 │ SchedulingThroughput/Perc90 vs base │ 463.9 ± 16% 460.1 ± 13% ~ (p=0.818 n=6) │ before │ after │ │ SchedulingThroughput/Perc95 │ SchedulingThroughput/Perc95 vs base │ 463.9 ± 16% 460.1 ± 13% ~ (p=0.818 n=6) │ before │ after │ │ SchedulingThroughput/Perc99 │ SchedulingThroughput/Perc99 vs base │ 463.9 ± 16% 460.1 ± 13% ~ (p=0.818 n=6) --- .../config/performance-config.yaml | 102 +++++++- .../scheduler_perf/scheduler_perf.go | 226 +++++++++++++++++- test/integration/scheduler_perf/util.go | 85 ++++++- 3 files changed, 390 insertions(+), 23 deletions(-) diff --git a/test/integration/scheduler_perf/config/performance-config.yaml b/test/integration/scheduler_perf/config/performance-config.yaml index 82a0abc9e87..8f29ab49599 100644 --- a/test/integration/scheduler_perf/config/performance-config.yaml +++ b/test/integration/scheduler_perf/config/performance-config.yaml @@ -1167,7 +1167,9 @@ maxClaimsPerNode: 20 # SchedulingWithResourceClaimTemplateStructured uses a ResourceClaimTemplate -# and dynamically creates ResourceClaim instances for each pod. +# and dynamically creates ResourceClaim instances for each pod. Node, pod and +# device counts are chosen so that the cluster gets filled up completely. +# # The driver uses structured parameters. - name: SchedulingWithResourceClaimTemplateStructured featureGates: @@ -1234,6 +1236,104 @@ measurePods: 2500 maxClaimsPerNode: 10 +# SteadyStateResourceClaimTemplateStructured uses a ResourceClaimTemplate +# and dynamically creates ResourceClaim instances for each pod, but never +# more than 10 at a time. Then it waits for a pod to get scheduled +# before deleting it and creating another one. +# +# The workload determines whether there are other pods in the cluster. +# +# The driver uses structured parameters. +- name: SteadyStateClusterResourceClaimTemplateStructured + featureGates: + DynamicResourceAllocation: true + # SchedulerQueueingHints: true + workloadTemplate: + - opcode: createNodes + countParam: $nodesWithoutDRA + - opcode: createNodes + nodeTemplatePath: config/dra/node-with-dra-test-driver.yaml + countParam: $nodesWithDRA + - opcode: createResourceDriver + driverName: test-driver.cdi.k8s.io + nodes: scheduler-perf-dra-* + maxClaimsPerNodeParam: $maxClaimsPerNode + structuredParameters: true + - opcode: createAny + templatePath: config/dra/deviceclass-structured.yaml + - opcode: createAny + templatePath: config/dra/resourceclaimtemplate-structured.yaml + namespace: init + - opcode: createPods + namespace: init + countParam: $initPods + podTemplatePath: config/dra/pod-with-claim-template.yaml + - opcode: createAny + templatePath: config/dra/resourceclaimtemplate-structured.yaml + namespace: test + - opcode: createPods + namespace: test + count: 10 + steadyState: true + durationParam: $duration + podTemplatePath: config/dra/pod-with-claim-template.yaml + collectMetrics: true + workloads: + - name: fast + labels: [integration-test, fast, short] + params: + # This testcase runs through all code paths without + # taking too long overall. + nodesWithDRA: 1 + nodesWithoutDRA: 1 + initPods: 0 + maxClaimsPerNode: 10 + duration: 2s + - name: empty_100nodes + params: + nodesWithDRA: 100 + nodesWithoutDRA: 0 + initPods: 0 + maxClaimsPerNode: 2 + duration: 10s + - name: empty_200nodes + params: + nodesWithDRA: 200 + nodesWithoutDRA: 0 + initPods: 0 + maxClaimsPerNode: 2 + duration: 10s + - name: empty_500nodes + params: + nodesWithDRA: 500 + nodesWithoutDRA: 0 + initPods: 0 + maxClaimsPerNode: 2 + duration: 10s + # In the "full" scenarios, the cluster can accommodate exactly one additional pod. + # These are slower because scheduling the initial pods takes time. + - name: full_100nodes + params: + nodesWithDRA: 100 + nodesWithoutDRA: 0 + initPods: 199 + maxClaimsPerNode: 2 + duration: 10s + - name: full_200nodes + params: + nodesWithDRA: 200 + nodesWithoutDRA: 0 + initPods: 399 + maxClaimsPerNode: 2 + duration: 10s + - name: full_500nodes + params: + nodesWithDRA: 500 + nodesWithoutDRA: 0 + initPods: 999 + maxClaimsPerNode: 2 + duration: 10s + # SchedulingWithResourceClaimTemplate uses ResourceClaims # with deterministic names that are shared between pods. # There is a fixed ratio of 1:5 between claims and pods. diff --git a/test/integration/scheduler_perf/scheduler_perf.go b/test/integration/scheduler_perf/scheduler_perf.go index 2b88e12ba93..94da182036f 100644 --- a/test/integration/scheduler_perf/scheduler_perf.go +++ b/test/integration/scheduler_perf/scheduler_perf.go @@ -49,6 +49,7 @@ import ( coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/restmapper" + "k8s.io/client-go/tools/cache" "k8s.io/component-base/featuregate" featuregatetesting "k8s.io/component-base/featuregate/testing" logsapi "k8s.io/component-base/logs/api/v1" @@ -63,10 +64,12 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" "k8s.io/kubernetes/pkg/scheduler/metrics" + schedutil "k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/kubernetes/test/integration/framework" testutils "k8s.io/kubernetes/test/utils" "k8s.io/kubernetes/test/utils/ktesting" "k8s.io/kubernetes/test/utils/ktesting/initoption" + "k8s.io/utils/ptr" "sigs.k8s.io/yaml" ) @@ -341,7 +344,7 @@ func (ms thresholdMetricSelector) isValid(mcc *metricsCollectorConfig) error { } type params struct { - params map[string]int + params map[string]any // isUsed field records whether params is used or not. isUsed map[string]bool } @@ -358,14 +361,14 @@ type params struct { // to: // // params{ -// params: map[string]int{ +// params: map[string]any{ // "intNodes": 500, // "initPods": 50, // }, // isUsed: map[string]bool{}, // empty map // } func (p *params) UnmarshalJSON(b []byte) error { - aux := map[string]int{} + aux := map[string]any{} if err := json.Unmarshal(b, &aux); err != nil { return err @@ -376,14 +379,31 @@ func (p *params) UnmarshalJSON(b []byte) error { return nil } -// get returns param. +// get retrieves the parameter as an integer func (p params) get(key string) (int, error) { + // JSON unmarshals integer constants in an "any" field as float. + f, err := getParam[float64](p, key) + if err != nil { + return 0, err + } + return int(f), nil +} + +// getParam retrieves the parameter as specific type. There is no conversion, +// so in practice this means that only types that JSON unmarshaling uses +// (float64, string, bool) work. +func getParam[T float64 | string | bool](p params, key string) (T, error) { p.isUsed[key] = true param, ok := p.params[key] - if ok { - return param, nil + var t T + if !ok { + return t, fmt.Errorf("parameter %s is undefined", key) } - return 0, fmt.Errorf("parameter %s is undefined", key) + t, ok = param.(T) + if !ok { + return t, fmt.Errorf("parameter %s has the wrong type %T", key, param) + } + return t, nil } // unusedParams returns the names of unusedParams @@ -576,6 +596,27 @@ type createPodsOp struct { Count int // Template parameter for Count. CountParam string + // If false, Count pods get created rapidly. This can be used to + // measure how quickly the scheduler can fill up a cluster. + // + // If true, Count pods get created, the operation waits for + // a pod to get scheduled, deletes it and then creates another. + // This continues until the configured Duration is over. + // Metrics collection, if enabled, runs in parallel. + // + // This mode can be used to measure how the scheduler behaves + // in a steady state where the cluster is always at roughly the + // same level of utilization. Pods can be created in a separate, + // earlier operation to simulate non-empty clusters. + // + // Note that the operation will delete any scheduled pod in + // the namespace, so use different namespaces for pods that + // are supposed to be kept running. + SteadyState bool + // How long to keep the cluster in a steady state. + Duration metav1.Duration + // Template parameter for Duration. + DurationParam string // Whether or not to enable metrics collection for this createPodsOp. // Optional. Both CollectMetrics and SkipWaitToCompletion cannot be true at // the same time for a particular createPodsOp. @@ -608,6 +649,9 @@ func (cpo *createPodsOp) isValid(allowParameterization bool) error { // use-cases right now. return fmt.Errorf("collectMetrics and skipWaitToCompletion cannot be true at the same time") } + if cpo.SkipWaitToCompletion && cpo.SteadyState { + return errors.New("skipWaitToCompletion and steadyState cannot be true at the same time") + } return nil } @@ -623,6 +667,15 @@ func (cpo createPodsOp) patchParams(w *workload) (realOp, error) { return nil, err } } + if cpo.DurationParam != "" { + durationStr, err := getParam[string](w.Params, cpo.DurationParam[1:]) + if err != nil { + return nil, err + } + if cpo.Duration.Duration, err = time.ParseDuration(durationStr); err != nil { + return nil, fmt.Errorf("parsing duration parameter %s: %w", cpo.DurationParam, err) + } + } return &cpo, (&cpo).isValid(false) } @@ -1298,14 +1351,19 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact collectorCtx, collectors = startCollectingMetrics(tCtx, &collectorWG, podInformer, tc.MetricsCollectorConfig, throughputErrorMargin, opIndex, namespace, []string{namespace}) defer collectorCtx.Cancel("cleaning up") } - if err := createPods(tCtx, namespace, concreteOp); err != nil { + if err := createPodsRapidly(tCtx, namespace, concreteOp); err != nil { tCtx.Fatalf("op %d: %v", opIndex, err) } - if concreteOp.SkipWaitToCompletion { + switch { + case concreteOp.SkipWaitToCompletion: // Only record those namespaces that may potentially require barriers // in the future. numPodsScheduledPerNamespace[namespace] += concreteOp.Count - } else { + case concreteOp.SteadyState: + if err := createPodsSteadily(tCtx, namespace, podInformer, concreteOp); err != nil { + tCtx.Fatalf("op %d: %v", opIndex, err) + } + default: if err := waitUntilPodsScheduledInNamespace(tCtx, podInformer, namespace, concreteOp.Count); err != nil { tCtx.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err) } @@ -1588,7 +1646,12 @@ func getNodePreparer(prefix string, cno *createNodesOp, clientset clientset.Inte ), nil } -func createPods(tCtx ktesting.TContext, namespace string, cpo *createPodsOp) error { +// createPodsRapidly implements the "create pods rapidly" mode of [createPodsOp]. +// It's a nop when cpo.SteadyState is true. +func createPodsRapidly(tCtx ktesting.TContext, namespace string, cpo *createPodsOp) error { + if cpo.SteadyState { + return nil + } strategy, err := getPodStrategy(cpo) if err != nil { return err @@ -1600,6 +1663,147 @@ func createPods(tCtx ktesting.TContext, namespace string, cpo *createPodsOp) err return podCreator.CreatePods(tCtx) } +// createPodsSteadily implements the "create pods and delete pods" mode of [createPodsOp]. +// It's a nop when cpo.SteadyState is false. +func createPodsSteadily(tCtx ktesting.TContext, namespace string, podInformer coreinformers.PodInformer, cpo *createPodsOp) error { + if !cpo.SteadyState { + return nil + } + strategy, err := getPodStrategy(cpo) + if err != nil { + return err + } + tCtx.Logf("creating pods in namespace %q for %s", namespace, cpo.Duration) + tCtx = ktesting.WithTimeout(tCtx, cpo.Duration.Duration, fmt.Sprintf("the operation ran for the configured %s", cpo.Duration.Duration)) + + // Start watching pods in the namespace. Any pod which is seen as being scheduled + // gets deleted. + scheduledPods := make(chan *v1.Pod, cpo.Count) + scheduledPodsClosed := false + var mutex sync.Mutex + defer func() { + mutex.Lock() + defer mutex.Unlock() + close(scheduledPods) + scheduledPodsClosed = true + }() + + existingPods := 0 + runningPods := 0 + onPodChange := func(oldObj, newObj any) { + oldPod, newPod, err := schedutil.As[*v1.Pod](oldObj, newObj) + if err != nil { + tCtx.Errorf("unexpected pod events: %v", err) + return + } + + mutex.Lock() + defer mutex.Unlock() + if oldPod == nil { + existingPods++ + } + if (oldPod == nil || oldPod.Spec.NodeName == "") && newPod.Spec.NodeName != "" { + // Got scheduled. + runningPods++ + + // Only ask for deletion in our namespace. + if newPod.Namespace != namespace { + return + } + if !scheduledPodsClosed { + select { + case <-tCtx.Done(): + case scheduledPods <- newPod: + } + } + } + } + handle, err := podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + onPodChange(nil, obj) + }, + UpdateFunc: func(oldObj, newObj any) { + onPodChange(oldObj, newObj) + }, + DeleteFunc: func(obj any) { + pod, _, err := schedutil.As[*v1.Pod](obj, nil) + if err != nil { + tCtx.Errorf("unexpected pod events: %v", err) + return + } + + existingPods-- + if pod.Spec.NodeName != "" { + runningPods-- + } + }, + }) + if err != nil { + return fmt.Errorf("register event handler: %w", err) + } + defer func() { + tCtx.ExpectNoError(podInformer.Informer().RemoveEventHandler(handle), "remove event handler") + }() + + // Seed the namespace with the initial number of pods. + if err := strategy(tCtx, tCtx.Client(), namespace, cpo.Count); err != nil { + return fmt.Errorf("create initial %d pods: %w", cpo.Count, err) + } + + // Now loop until we are done. Report periodically how many pods were scheduled. + countScheduledPods := 0 + lastCountScheduledPods := 0 + logPeriod := time.Second + ticker := time.NewTicker(logPeriod) + defer ticker.Stop() + for { + select { + case <-tCtx.Done(): + tCtx.Logf("Completed after seeing %d scheduled pod: %v", countScheduledPods, context.Cause(tCtx)) + return nil + case <-scheduledPods: + countScheduledPods++ + if countScheduledPods%cpo.Count == 0 { + // All scheduled. Start over with a new batch. + err := tCtx.Client().CoreV1().Pods(namespace).DeleteCollection(tCtx, metav1.DeleteOptions{ + GracePeriodSeconds: ptr.To(int64(0)), + PropagationPolicy: ptr.To(metav1.DeletePropagationBackground), // Foreground will block. + }, metav1.ListOptions{}) + // Ignore errors when the time is up. errors.Is(context.Canceled) would + // be more precise, but doesn't work because client-go doesn't reliably + // propagate it. Instead, this was seen: + // client rate limiter Wait returned an error: rate: Wait(n=1) would exceed context deadline + if tCtx.Err() != nil { + continue + } + if err != nil { + return fmt.Errorf("delete scheduled pods: %w", err) + } + err = strategy(tCtx, tCtx.Client(), namespace, cpo.Count) + if tCtx.Err() != nil { + continue + } + if err != nil { + return fmt.Errorf("create next batch of pods: %w", err) + } + } + case <-ticker.C: + delta := countScheduledPods - lastCountScheduledPods + lastCountScheduledPods = countScheduledPods + func() { + mutex.Lock() + defer mutex.Unlock() + + tCtx.Logf("%d pods got scheduled in total in namespace %q, overall %d out of %d pods scheduled: %f pods/s in last interval", + countScheduledPods, namespace, + runningPods, existingPods, + float64(delta)/logPeriod.Seconds(), + ) + }() + } + } +} + // waitUntilPodsScheduledInNamespace blocks until all pods in the given // namespace are scheduled. Times out after 10 minutes because even at the // lowest observed QPS of ~10 pods/sec, a 5000-node test should complete. diff --git a/test/integration/scheduler_perf/util.go b/test/integration/scheduler_perf/util.go index 2e93d42a62c..f2f95ff10bf 100644 --- a/test/integration/scheduler_perf/util.go +++ b/test/integration/scheduler_perf/util.go @@ -26,6 +26,7 @@ import ( "path" "sort" "strings" + "sync" "time" v1 "k8s.io/api/core/v1" @@ -35,6 +36,7 @@ import ( "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" restclient "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" "k8s.io/component-base/featuregate" "k8s.io/component-base/metrics/legacyregistry" "k8s.io/component-base/metrics/testutil" @@ -45,6 +47,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/apis/config" kubeschedulerscheme "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" + schedutil "k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/integration/util" testutils "k8s.io/kubernetes/test/utils" @@ -378,7 +381,7 @@ type throughputCollector struct { podInformer coreinformers.PodInformer schedulingThroughputs []float64 labels map[string]string - namespaces []string + namespaces sets.Set[string] errorMargin float64 } @@ -386,7 +389,7 @@ func newThroughputCollector(podInformer coreinformers.PodInformer, labels map[st return &throughputCollector{ podInformer: podInformer, labels: labels, - namespaces: namespaces, + namespaces: sets.New(namespaces...), errorMargin: errorMargin, } } @@ -396,11 +399,75 @@ func (tc *throughputCollector) init() error { } func (tc *throughputCollector) run(tCtx ktesting.TContext) { - podsScheduled, _, err := getScheduledPods(tc.podInformer, tc.namespaces...) - if err != nil { - klog.Fatalf("%v", err) + // The collector is based on informer cache events instead of periodically listing pods because: + // - polling causes more overhead + // - it does not work when pods get created, scheduled and deleted quickly + // + // Normally, informers cannot be used to observe state changes reliably. + // They only guarantee that the *some* updates get reported, but not *all*. + // But in scheduler_perf, the scheduler and the test share the same informer, + // therefore we are guaranteed to see a new pod without NodeName (because + // that is what the scheduler needs to see to schedule it) and then the updated + // pod with NodeName (because nothing makes further changes to it). + var mutex sync.Mutex + scheduledPods := 0 + getScheduledPods := func() int { + mutex.Lock() + defer mutex.Unlock() + return scheduledPods } - lastScheduledCount := len(podsScheduled) + onPodChange := func(oldObj, newObj any) { + oldPod, newPod, err := schedutil.As[*v1.Pod](oldObj, newObj) + if err != nil { + tCtx.Errorf("unexpected pod events: %v", err) + return + } + + if !tc.namespaces.Has(newPod.Namespace) { + return + } + + mutex.Lock() + defer mutex.Unlock() + if (oldPod == nil || oldPod.Spec.NodeName == "") && newPod.Spec.NodeName != "" { + // Got scheduled. + scheduledPods++ + } + } + handle, err := tc.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + onPodChange(nil, obj) + }, + UpdateFunc: func(oldObj, newObj any) { + onPodChange(oldObj, newObj) + }, + }) + if err != nil { + tCtx.Fatalf("register pod event handler: %v", err) + } + defer func() { + tCtx.ExpectNoError(tc.podInformer.Informer().RemoveEventHandler(handle), "remove event handler") + }() + + // Waiting for the initial sync didn't work, `handle.HasSynced` always returned + // false - perhaps because the event handlers get added to a running informer. + // That's okay(ish), throughput is typically measured within an empty namespace. + // + // syncTicker := time.NewTicker(time.Millisecond) + // defer syncTicker.Stop() + // for { + // select { + // case <-syncTicker.C: + // if handle.HasSynced() { + // break + // } + // case <-tCtx.Done(): + // return + // } + // } + tCtx.Logf("Started pod throughput collector for namespace(s) %s, %d pods scheduled so far", sets.List(tc.namespaces), getScheduledPods()) + + lastScheduledCount := getScheduledPods() ticker := time.NewTicker(throughputSampleInterval) defer ticker.Stop() lastSampleTime := time.Now() @@ -413,12 +480,8 @@ func (tc *throughputCollector) run(tCtx ktesting.TContext) { return case <-ticker.C: now := time.Now() - podsScheduled, _, err := getScheduledPods(tc.podInformer, tc.namespaces...) - if err != nil { - klog.Fatalf("%v", err) - } - scheduled := len(podsScheduled) + scheduled := getScheduledPods() // Only do sampling if number of scheduled pods is greater than zero. if scheduled == 0 { continue From ded96042f7caa61bbe854681fc0f99846d1937a5 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Fri, 13 Sep 2024 09:51:27 +0200 Subject: [PATCH 4/5] scheduler_perf + DRA: load up cluster by allocating claims Having to schedule 4999 pods to simulate a "full" cluster is slow. Creating claims and then allocating them more or less like the scheduler would when scheduling pods is much faster and in practice has the same effect on the dynamicresources plugin because it looks at claims, not pods. This allows defining the "steady state" workloads with higher number of devices ("claimsPerNode") again. This was prohibitively slow before. --- .../config/performance-config.yaml | 69 +++++++---- test/integration/scheduler_perf/dra.go | 115 ++++++++++++++++++ .../scheduler_perf/scheduler_perf.go | 2 + 3 files changed, 162 insertions(+), 24 deletions(-) diff --git a/test/integration/scheduler_perf/config/performance-config.yaml b/test/integration/scheduler_perf/config/performance-config.yaml index 8f29ab49599..c01f32bf717 100644 --- a/test/integration/scheduler_perf/config/performance-config.yaml +++ b/test/integration/scheduler_perf/config/performance-config.yaml @@ -1236,12 +1236,13 @@ measurePods: 2500 maxClaimsPerNode: 10 -# SteadyStateResourceClaimTemplateStructured uses a ResourceClaimTemplate -# and dynamically creates ResourceClaim instances for each pod, but never -# more than 10 at a time. Then it waits for a pod to get scheduled -# before deleting it and creating another one. +# SteadyStateResourceClaimTemplateStructured uses a ResourceClaimTemplate and +# dynamically creates ResourceClaim instances for each pod. It creates ten +# pods, waits for them to be scheduled, deletes them, and starts again, +# so the cluster remains at the same level of utilization. # -# The workload determines whether there are other pods in the cluster. +# The number of already allocated claims can be varied, thus simulating +# various degrees of pre-existing resource utilization. # # The driver uses structured parameters. - name: SteadyStateClusterResourceClaimTemplateStructured @@ -1262,12 +1263,11 @@ - opcode: createAny templatePath: config/dra/deviceclass-structured.yaml - opcode: createAny - templatePath: config/dra/resourceclaimtemplate-structured.yaml + templatePath: config/dra/resourceclaim-structured.yaml + countParam: $initClaims namespace: init - - opcode: createPods + - opcode: allocResourceClaims namespace: init - countParam: $initPods - podTemplatePath: config/dra/pod-with-claim-template.yaml - opcode: createAny templatePath: config/dra/resourceclaimtemplate-structured.yaml namespace: test @@ -1286,52 +1286,73 @@ # taking too long overall. nodesWithDRA: 1 nodesWithoutDRA: 1 - initPods: 0 + initClaims: 0 maxClaimsPerNode: 10 duration: 2s - name: empty_100nodes params: nodesWithDRA: 100 nodesWithoutDRA: 0 - initPods: 0 - maxClaimsPerNode: 2 + initClaims: 0 + maxClaimsPerNode: 10 duration: 10s - name: empty_200nodes params: nodesWithDRA: 200 nodesWithoutDRA: 0 - initPods: 0 - maxClaimsPerNode: 2 + initClaims: 0 + maxClaimsPerNode: 10 duration: 10s - name: empty_500nodes params: nodesWithDRA: 500 nodesWithoutDRA: 0 - initPods: 0 - maxClaimsPerNode: 2 + initClaims: 0 + maxClaimsPerNode: 10 duration: 10s - # In the "full" scenarios, the cluster can accommodate exactly one additional pod. - # These are slower because scheduling the initial pods takes time. + # In the "half" scenarios, half of the devices are in use. + - name: half_100nodes + params: + nodesWithDRA: 100 + nodesWithoutDRA: 0 + initClaims: 500 + maxClaimsPerNode: 10 + duration: 10s + - name: half_200nodes + params: + nodesWithDRA: 200 + nodesWithoutDRA: 0 + initClaims: 1000 + maxClaimsPerNode: 10 + duration: 10s + - name: half_500nodes + params: + nodesWithDRA: 500 + nodesWithoutDRA: 0 + initClaims: 2500 + maxClaimsPerNode: 10 + duration: 10s + # In the "full" scenarios, the cluster can accommodate exactly 10 additional pods. - name: full_100nodes params: nodesWithDRA: 100 nodesWithoutDRA: 0 - initPods: 199 - maxClaimsPerNode: 2 + initClaims: 990 + maxClaimsPerNode: 10 duration: 10s - name: full_200nodes params: nodesWithDRA: 200 nodesWithoutDRA: 0 - initPods: 399 - maxClaimsPerNode: 2 + initClaims: 1990 + maxClaimsPerNode: 10 duration: 10s - name: full_500nodes params: nodesWithDRA: 500 nodesWithoutDRA: 0 - initPods: 999 - maxClaimsPerNode: 2 + initClaims: 4990 + maxClaimsPerNode: 10 duration: 10s # SchedulingWithResourceClaimTemplate uses ResourceClaims diff --git a/test/integration/scheduler_perf/dra.go b/test/integration/scheduler_perf/dra.go index d4fa422c7f0..8bf0d93e9c6 100644 --- a/test/integration/scheduler_perf/dra.go +++ b/test/integration/scheduler_perf/dra.go @@ -19,14 +19,23 @@ package benchmark import ( "context" "fmt" + "math/rand/v2" "path/filepath" + "reflect" "sync" + "github.com/stretchr/testify/require" + + v1 "k8s.io/api/core/v1" resourceapi "k8s.io/api/resource/v1alpha3" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/informers" "k8s.io/client-go/util/workqueue" + "k8s.io/dynamic-resource-allocation/structured" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" draapp "k8s.io/kubernetes/test/e2e/dra/test-driver/app" "k8s.io/kubernetes/test/utils/ktesting" "k8s.io/utils/ptr" @@ -261,3 +270,109 @@ func resourceSlice(driverName, nodeName string, capacity int) *resourceapi.Resou return slice } + +// allocResourceClaimsOp defines an op where resource claims with structured +// parameters get allocated without being associated with a pod. +type allocResourceClaimsOp struct { + // Must be allocResourceClaimsOpcode. + Opcode operationCode + // Namespace where claims are to be allocated, all namespaces if empty. + Namespace string +} + +var _ realOp = &allocResourceClaimsOp{} +var _ runnableOp = &allocResourceClaimsOp{} + +func (op *allocResourceClaimsOp) isValid(allowParameterization bool) error { + return nil +} + +func (op *allocResourceClaimsOp) collectsMetrics() bool { + return false +} +func (op *allocResourceClaimsOp) patchParams(w *workload) (realOp, error) { + return op, op.isValid(false) +} + +func (op *allocResourceClaimsOp) requiredNamespaces() []string { return nil } + +func (op *allocResourceClaimsOp) run(tCtx ktesting.TContext) { + claims, err := tCtx.Client().ResourceV1alpha3().ResourceClaims(op.Namespace).List(tCtx, metav1.ListOptions{}) + tCtx.ExpectNoError(err, "list claims") + tCtx.Logf("allocating %d ResourceClaims", len(claims.Items)) + tCtx = ktesting.WithCancel(tCtx) + defer tCtx.Cancel("allocResourceClaimsOp.run is done") + + // Track cluster state. + informerFactory := informers.NewSharedInformerFactory(tCtx.Client(), 0) + claimInformer := informerFactory.Resource().V1alpha3().ResourceClaims().Informer() + classLister := informerFactory.Resource().V1alpha3().DeviceClasses().Lister() + sliceLister := informerFactory.Resource().V1alpha3().ResourceSlices().Lister() + nodeLister := informerFactory.Core().V1().Nodes().Lister() + claimCache := assumecache.NewAssumeCache(tCtx.Logger(), claimInformer, "ResourceClaim", "", nil) + claimLister := claimLister{cache: claimCache} + informerFactory.Start(tCtx.Done()) + defer func() { + tCtx.Cancel("allocResourceClaimsOp.run is shutting down") + informerFactory.Shutdown() + }() + syncedInformers := informerFactory.WaitForCacheSync(tCtx.Done()) + expectSyncedInformers := map[reflect.Type]bool{ + reflect.TypeOf(&resourceapi.DeviceClass{}): true, + reflect.TypeOf(&resourceapi.ResourceClaim{}): true, + reflect.TypeOf(&resourceapi.ResourceSlice{}): true, + reflect.TypeOf(&v1.Node{}): true, + } + require.Equal(tCtx, expectSyncedInformers, syncedInformers, "synced informers") + + // The set of nodes is assumed to be fixed at this point. + nodes, err := nodeLister.List(labels.Everything()) + tCtx.ExpectNoError(err, "list nodes") + + // Allocate one claim at a time, picking nodes randomly. Each + // allocation is stored immediately, using the claim cache to avoid + // having to wait for the actual informer update. +claims: + for i := range claims.Items { + claim := &claims.Items[i] + if claim.Status.Allocation != nil { + continue + } + + allocator, err := structured.NewAllocator(tCtx, []*resourceapi.ResourceClaim{claim}, claimLister, classLister, sliceLister) + tCtx.ExpectNoError(err, "create allocator") + + rand.Shuffle(len(nodes), func(i, j int) { + nodes[i], nodes[j] = nodes[j], nodes[i] + }) + for _, node := range nodes { + result, err := allocator.Allocate(tCtx, node) + tCtx.ExpectNoError(err, "allocate claim") + if result != nil { + claim = claim.DeepCopy() + claim.Status.Allocation = result[0] + claim, err := tCtx.Client().ResourceV1alpha3().ResourceClaims(claim.Namespace).UpdateStatus(tCtx, claim, metav1.UpdateOptions{}) + tCtx.ExpectNoError(err, "update claim status with allocation") + tCtx.ExpectNoError(claimCache.Assume(claim), "assume claim") + continue claims + } + } + tCtx.Fatalf("Could not allocate claim %d out of %d", i, len(claims.Items)) + } +} + +type claimLister struct { + cache *assumecache.AssumeCache +} + +func (c claimLister) ListAllAllocated() ([]*resourceapi.ResourceClaim, error) { + objs := c.cache.List(nil) + allocatedClaims := make([]*resourceapi.ResourceClaim, 0, len(objs)) + for _, obj := range objs { + claim := obj.(*resourceapi.ResourceClaim) + if claim.Status.Allocation != nil { + allocatedClaims = append(allocatedClaims, claim) + } + } + return allocatedClaims, nil +} diff --git a/test/integration/scheduler_perf/scheduler_perf.go b/test/integration/scheduler_perf/scheduler_perf.go index 94da182036f..2f97f953385 100644 --- a/test/integration/scheduler_perf/scheduler_perf.go +++ b/test/integration/scheduler_perf/scheduler_perf.go @@ -76,6 +76,7 @@ import ( type operationCode string const ( + allocResourceClaimsOpcode operationCode = "allocResourceClaims" createAnyOpcode operationCode = "createAny" createNodesOpcode operationCode = "createNodes" createNamespacesOpcode operationCode = "createNamespaces" @@ -426,6 +427,7 @@ type op struct { // which op we're decoding at runtime. func (op *op) UnmarshalJSON(b []byte) error { possibleOps := map[operationCode]realOp{ + allocResourceClaimsOpcode: &allocResourceClaimsOp{}, createAnyOpcode: &createAny{}, createNodesOpcode: &createNodesOp{}, createNamespacesOpcode: &createNamespacesOp{}, From d100768d940ced902307b29907b11d5c0f63c1bc Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Fri, 27 Jan 2023 19:31:37 +0100 Subject: [PATCH 5/5] scheduler_perf: track and visualize progress over time This is useful to see whether pod scheduling happens in bursts and how it behaves over time, which is relevant in particular for dynamic resource allocation where it may become harder at the end to find the node which still has resources available. Besides "pods scheduled" it's also useful to know how many attempts were needed, so schedule_attempts_total also gets sampled and stored. To visualize the result of one or more test runs, use: gnuplot.sh *.dat --- .../metrics/testutil/metrics.go | 52 +++++++-- .../metrics/testutil/metrics_test.go | 102 ++++++++++++++++++ test/integration/scheduler_perf/README.md | 19 ++++ test/integration/scheduler_perf/gnuplot.sh | 54 ++++++++++ .../scheduler_perf/scheduler_perf.go | 24 +++++ test/integration/scheduler_perf/util.go | 49 ++++++++- 6 files changed, 293 insertions(+), 7 deletions(-) create mode 100755 test/integration/scheduler_perf/gnuplot.sh diff --git a/staging/src/k8s.io/component-base/metrics/testutil/metrics.go b/staging/src/k8s.io/component-base/metrics/testutil/metrics.go index c595f55d64a..05d15b08d75 100644 --- a/staging/src/k8s.io/component-base/metrics/testutil/metrics.go +++ b/staging/src/k8s.io/component-base/metrics/testutil/metrics.go @@ -258,12 +258,8 @@ func GetHistogramVecFromGatherer(gatherer metrics.Gatherer, metricName string, l if err != nil { return nil, err } - for _, mFamily := range m { - if mFamily.GetName() == metricName { - metricFamily = mFamily - break - } - } + + metricFamily = findMetricFamily(m, metricName) if metricFamily == nil { return nil, fmt.Errorf("metric %q not found", metricName) @@ -433,3 +429,47 @@ func LabelsMatch(metric *dto.Metric, labelFilter map[string]string) bool { return true } + +// GetCounterVecFromGatherer collects a counter that matches the given name +// from a gatherer implementing k8s.io/component-base/metrics.Gatherer interface. +// It returns all counter values that had a label with a certain name in a map +// that uses the label value as keys. +// +// Used only for testing purposes where we need to gather metrics directly from a running binary (without metrics endpoint). +func GetCounterValuesFromGatherer(gatherer metrics.Gatherer, metricName string, lvMap map[string]string, labelName string) (map[string]float64, error) { + m, err := gatherer.Gather() + if err != nil { + return nil, err + } + + metricFamily := findMetricFamily(m, metricName) + if metricFamily == nil { + return nil, fmt.Errorf("metric %q not found", metricName) + } + if len(metricFamily.GetMetric()) == 0 { + return nil, fmt.Errorf("metric %q is empty", metricName) + } + + values := make(map[string]float64) + for _, metric := range metricFamily.GetMetric() { + if LabelsMatch(metric, lvMap) { + if counter := metric.GetCounter(); counter != nil { + for _, labelPair := range metric.Label { + if labelPair.GetName() == labelName { + values[labelPair.GetValue()] = counter.GetValue() + } + } + } + } + } + return values, nil +} + +func findMetricFamily(metricFamilies []*dto.MetricFamily, metricName string) *dto.MetricFamily { + for _, mFamily := range metricFamilies { + if mFamily.GetName() == metricName { + return mFamily + } + } + return nil +} diff --git a/staging/src/k8s.io/component-base/metrics/testutil/metrics_test.go b/staging/src/k8s.io/component-base/metrics/testutil/metrics_test.go index 702fb7fcfde..adfc1999989 100644 --- a/staging/src/k8s.io/component-base/metrics/testutil/metrics_test.go +++ b/staging/src/k8s.io/component-base/metrics/testutil/metrics_test.go @@ -20,6 +20,7 @@ import ( "fmt" "math" "reflect" + "strings" "testing" "github.com/google/go-cmp/cmp" @@ -591,3 +592,104 @@ func TestGetHistogramVecFromGatherer(t *testing.T) { }) } } + +func TestGetCounterValuesFromGatherer(t *testing.T) { + namespace := "namespace" + subsystem := "subsystem" + name := "metric_test_name" + metricName := fmt.Sprintf("%s_%s_%s", namespace, subsystem, name) + + tests := map[string]struct { + metricName string // Empty is replaced with valid name. + lvMap map[string]string + labelName string + + wantCounterValues map[string]float64 + wantErr string + }{ + "wrong-metric": { + metricName: "no-such-metric", + wantErr: `metric "no-such-metric" not found`, + }, + + "none": { + metricName: metricName, + lvMap: map[string]string{"no-such-label": "a"}, + + wantCounterValues: map[string]float64{}, + }, + + "value1-0": { + metricName: metricName, + lvMap: map[string]string{"label1": "value1-0"}, + labelName: "label2", + + wantCounterValues: map[string]float64{"value2-0": 1.5, "value2-1": 2.5}, + }, + + "value1-1": { + metricName: metricName, + lvMap: map[string]string{"label1": "value1-1"}, + labelName: "label2", + + wantCounterValues: map[string]float64{"value2-0": 3.5, "value2-1": 4.5}, + }, + + "value1-1-value2-0-none": { + metricName: metricName, + lvMap: map[string]string{"label1": "value1-1", "label2": "value2-0"}, + labelName: "none", + + wantCounterValues: map[string]float64{}, + }, + + "value1-0-value2-0-one": { + metricName: metricName, + lvMap: map[string]string{"label1": "value1-0", "label2": "value2-0"}, + labelName: "label2", + + wantCounterValues: map[string]float64{"value2-0": 1.5}, + }, + } + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + // CounterVec has two labels defined. + labels := []string{"label1", "label2"} + counterOpts := &metrics.CounterOpts{ + Namespace: "namespace", + Name: "metric_test_name", + Subsystem: "subsystem", + Help: "counter help message", + } + vec := metrics.NewCounterVec(counterOpts, labels) + // Use local registry + var registry = metrics.NewKubeRegistry() + var gather metrics.Gatherer = registry + registry.MustRegister(vec) + // Observe two metrics with same value for label1 but different value of label2. + vec.WithLabelValues("value1-0", "value2-0").Add(1.5) + vec.WithLabelValues("value1-0", "value2-1").Add(2.5) + vec.WithLabelValues("value1-1", "value2-0").Add(3.5) + vec.WithLabelValues("value1-1", "value2-1").Add(4.5) + + // The check for empty metric apparently cannot be tested: registering + // a NewCounterVec with no values has the affect that it doesn't get + // returned, leading to "not found". + + counterValues, err := GetCounterValuesFromGatherer(gather, tt.metricName, tt.lvMap, tt.labelName) + if err != nil { + if tt.wantErr != "" && !strings.Contains(err.Error(), tt.wantErr) { + t.Errorf("expected error %q, got instead: %v", tt.wantErr, err) + } + return + } + if tt.wantErr != "" { + t.Fatalf("expected error %q, got none", tt.wantErr) + } + + if diff := cmp.Diff(tt.wantCounterValues, counterValues); diff != "" { + t.Errorf("Got unexpected HistogramVec (-want +got):\n%s", diff) + } + }) + } +} diff --git a/test/integration/scheduler_perf/README.md b/test/integration/scheduler_perf/README.md index 261dd5e776f..8a4bccba6b9 100644 --- a/test/integration/scheduler_perf/README.md +++ b/test/integration/scheduler_perf/README.md @@ -175,3 +175,22 @@ the ci-benchmark-scheduler-perf periodic job will fail with an error log such as This allows to analyze which workload failed. Make sure that the failure is not an outlier by checking multiple runs of the job. If the failures are not related to any regression, but to an incorrect threshold setting, it is reasonable to decrease it. + +### Visualization + +Some support for visualizing progress over time is built into the +benchmarks. The measurement operation which creates pods writes .dat files like +this: + + test/integration/scheduler_perf/SchedulingBasic_5000Nodes_2023-03-17T14:52:09Z.dat + +This file is in a text format that [gnuplot](http://www.gnuplot.info/) can +read. A wrapper script selects some suitable parameters: + + test/integration/scheduler_perf/gnuplot.sh test/integration/scheduler_perf/*.dat + +It plots in an interactive window by default. To write into a file, use + + test/integration/scheduler_perf/gnuplot.sh \ + -e 'set term png; set output ".png"' \ + test/integration/scheduler_perf/*.dat diff --git a/test/integration/scheduler_perf/gnuplot.sh b/test/integration/scheduler_perf/gnuplot.sh new file mode 100755 index 00000000000..885276559ae --- /dev/null +++ b/test/integration/scheduler_perf/gnuplot.sh @@ -0,0 +1,54 @@ +#!/usr/bin/env bash + +# Copyright 2024 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Invoke this script with a list of *.dat and it'll plot them with gnuplot. +# Any non-file parameter is passed through to gnuplot. By default, +# an X11 window is used to display the result. To write into a file, +# use +# -e "set term png; set output .png" + +files=() +args=( -e "set term x11 persist" ) + +for i in "$@"; do + if [ -f "$i" ]; then + files+=("$i") + else + args+=("$i") + fi +done + +( + cat < 0 { sort.Float64s(tc.schedulingThroughputs) sum := 0.0