Merge pull request #127499 from pohly/scheduler-perf-updates

scheduler_perf: updates to enhance performance testing of DRA
Kubernetes Prow Robot 2024-09-25 13:32:00 +01:00 committed by GitHub
commit 5fc4e71a30
9 changed files with 872 additions and 95 deletions


@@ -258,12 +258,8 @@ func GetHistogramVecFromGatherer(gatherer metrics.Gatherer, metricName string, l
if err != nil {
return nil, err
}
for _, mFamily := range m {
if mFamily.GetName() == metricName {
metricFamily = mFamily
break
}
}
metricFamily = findMetricFamily(m, metricName)
if metricFamily == nil {
return nil, fmt.Errorf("metric %q not found", metricName)
@@ -433,3 +429,47 @@ func LabelsMatch(metric *dto.Metric, labelFilter map[string]string) bool {
return true
}
// GetCounterVecFromGatherer collects a counter that matches the given name
// from a gatherer implementing k8s.io/component-base/metrics.Gatherer interface.
// It returns all counter values that have a label with a certain name, in a map
// that uses the label values as keys.
//
// Used only for testing purposes where we need to gather metrics directly from a running binary (without metrics endpoint).
func GetCounterValuesFromGatherer(gatherer metrics.Gatherer, metricName string, lvMap map[string]string, labelName string) (map[string]float64, error) {
m, err := gatherer.Gather()
if err != nil {
return nil, err
}
metricFamily := findMetricFamily(m, metricName)
if metricFamily == nil {
return nil, fmt.Errorf("metric %q not found", metricName)
}
if len(metricFamily.GetMetric()) == 0 {
return nil, fmt.Errorf("metric %q is empty", metricName)
}
values := make(map[string]float64)
for _, metric := range metricFamily.GetMetric() {
if LabelsMatch(metric, lvMap) {
if counter := metric.GetCounter(); counter != nil {
for _, labelPair := range metric.Label {
if labelPair.GetName() == labelName {
values[labelPair.GetValue()] = counter.GetValue()
}
}
}
}
}
return values, nil
}
func findMetricFamily(metricFamilies []*dto.MetricFamily, metricName string) *dto.MetricFamily {
for _, mFamily := range metricFamilies {
if mFamily.GetName() == metricName {
return mFamily
}
}
return nil
}
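
As a usage illustration (editorial sketch, not part of the commit): the scheduler_perf collector further down in this PR calls the new helper roughly like this to group the scheduler's scheduler_schedule_attempts_total counter by its "result" label. It assumes the test binary has the scheduler metrics registered in the legacy registry.

    package main

    import (
        "fmt"

        "k8s.io/component-base/metrics/legacyregistry"
        "k8s.io/component-base/metrics/testutil"
    )

    func main() {
        // One value per "result" label: "scheduled", "unschedulable", "error".
        counters, err := testutil.GetCounterValuesFromGatherer(
            legacyregistry.DefaultGatherer,
            "scheduler_schedule_attempts_total",
            map[string]string{"profile": "default-scheduler"},
            "result",
        )
        if err != nil {
            fmt.Println(err)
            return
        }
        attempts := counters["scheduled"] + counters["unschedulable"] + counters["error"]
        fmt.Printf("attempts=%v scheduled=%v\n", attempts, counters["scheduled"])
    }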


@@ -20,6 +20,7 @@ import (
"fmt"
"math"
"reflect"
"strings"
"testing"
"github.com/google/go-cmp/cmp"
@@ -591,3 +592,104 @@ func TestGetHistogramVecFromGatherer(t *testing.T) {
})
}
}
func TestGetCounterValuesFromGatherer(t *testing.T) {
namespace := "namespace"
subsystem := "subsystem"
name := "metric_test_name"
metricName := fmt.Sprintf("%s_%s_%s", namespace, subsystem, name)
tests := map[string]struct {
metricName string // Empty is replaced with valid name.
lvMap map[string]string
labelName string
wantCounterValues map[string]float64
wantErr string
}{
"wrong-metric": {
metricName: "no-such-metric",
wantErr: `metric "no-such-metric" not found`,
},
"none": {
metricName: metricName,
lvMap: map[string]string{"no-such-label": "a"},
wantCounterValues: map[string]float64{},
},
"value1-0": {
metricName: metricName,
lvMap: map[string]string{"label1": "value1-0"},
labelName: "label2",
wantCounterValues: map[string]float64{"value2-0": 1.5, "value2-1": 2.5},
},
"value1-1": {
metricName: metricName,
lvMap: map[string]string{"label1": "value1-1"},
labelName: "label2",
wantCounterValues: map[string]float64{"value2-0": 3.5, "value2-1": 4.5},
},
"value1-1-value2-0-none": {
metricName: metricName,
lvMap: map[string]string{"label1": "value1-1", "label2": "value2-0"},
labelName: "none",
wantCounterValues: map[string]float64{},
},
"value1-0-value2-0-one": {
metricName: metricName,
lvMap: map[string]string{"label1": "value1-0", "label2": "value2-0"},
labelName: "label2",
wantCounterValues: map[string]float64{"value2-0": 1.5},
},
}
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
// CounterVec has two labels defined.
labels := []string{"label1", "label2"}
counterOpts := &metrics.CounterOpts{
Namespace: "namespace",
Name: "metric_test_name",
Subsystem: "subsystem",
Help: "counter help message",
}
vec := metrics.NewCounterVec(counterOpts, labels)
// Use local registry
var registry = metrics.NewKubeRegistry()
var gather metrics.Gatherer = registry
registry.MustRegister(vec)
// Observe two metrics with same value for label1 but different value of label2.
vec.WithLabelValues("value1-0", "value2-0").Add(1.5)
vec.WithLabelValues("value1-0", "value2-1").Add(2.5)
vec.WithLabelValues("value1-1", "value2-0").Add(3.5)
vec.WithLabelValues("value1-1", "value2-1").Add(4.5)
// The check for empty metric apparently cannot be tested: registering
// a NewCounterVec with no values has the effect that it doesn't get
// returned, leading to "not found".
counterValues, err := GetCounterValuesFromGatherer(gather, tt.metricName, tt.lvMap, tt.labelName)
if err != nil {
if tt.wantErr != "" && !strings.Contains(err.Error(), tt.wantErr) {
t.Errorf("expected error %q, got instead: %v", tt.wantErr, err)
}
return
}
if tt.wantErr != "" {
t.Fatalf("expected error %q, got none", tt.wantErr)
}
if diff := cmp.Diff(tt.wantCounterValues, counterValues); diff != "" {
t.Errorf("Got unexpected HistogramVec (-want +got):\n%s", diff)
}
})
}
}


@@ -175,3 +175,22 @@ the ci-benchmark-scheduler-perf periodic job will fail with an error log such as
This allows analyzing which workload failed. Make sure that the failure is not an outlier
by checking multiple runs of the job. If the failures are not related to any regression,
but to an incorrect threshold setting, it is reasonable to decrease it.
### Visualization
Some support for visualizing progress over time is built into the
benchmarks. The measurement operation which creates pods writes .dat files like
this:
test/integration/scheduler_perf/SchedulingBasic_5000Nodes_2023-03-17T14:52:09Z.dat
This file is in a text format that [gnuplot](http://www.gnuplot.info/) can
read. A wrapper script selects some suitable parameters:
test/integration/scheduler_perf/gnuplot.sh test/integration/scheduler_perf/*.dat
It plots in an interactive window by default. To write into a file, use
test/integration/scheduler_perf/gnuplot.sh \
-e 'set term png; set output "<output>.png"' \
test/integration/scheduler_perf/*.dat
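
For orientation, each line in such a .dat file is one sample written by the throughput collector added in this PR: elapsed time since the first sample, pods scheduled according to the scheduler_schedule_attempts_total metric, total scheduling attempts, pods observed as scheduled through the informer, and the observed scheduling rate. A made-up excerpt, purely to show the layout:

    5.0s 250 260 248 49.6
    10.0s 500 512 499 50.2

gnuplot.sh turns columns 2 and 4 into rates by taking their derivative over column 1 and plots column 3 divided by column 2 as scheduling attempts per pod.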


@@ -1167,7 +1167,9 @@
maxClaimsPerNode: 20
# SchedulingWithResourceClaimTemplateStructured uses a ResourceClaimTemplate
# and dynamically creates ResourceClaim instances for each pod.
# and dynamically creates ResourceClaim instances for each pod. Node, pod and
# device counts are chosen so that the cluster gets filled up completely.
#
# The driver uses structured parameters.
- name: SchedulingWithResourceClaimTemplateStructured
featureGates:
@@ -1234,6 +1236,125 @@
measurePods: 2500
maxClaimsPerNode: 10
# SteadyStateResourceClaimTemplateStructured uses a ResourceClaimTemplate and
# dynamically creates ResourceClaim instances for each pod. It creates ten
# pods, waits for them to be scheduled, deletes them, and starts again,
# so the cluster remains at the same level of utilization.
#
# The number of already allocated claims can be varied, thus simulating
# various degrees of pre-existing resource utilization.
#
# The driver uses structured parameters.
- name: SteadyStateClusterResourceClaimTemplateStructured
featureGates:
DynamicResourceAllocation: true
# SchedulerQueueingHints: true
workloadTemplate:
- opcode: createNodes
countParam: $nodesWithoutDRA
- opcode: createNodes
nodeTemplatePath: config/dra/node-with-dra-test-driver.yaml
countParam: $nodesWithDRA
- opcode: createResourceDriver
driverName: test-driver.cdi.k8s.io
nodes: scheduler-perf-dra-*
maxClaimsPerNodeParam: $maxClaimsPerNode
structuredParameters: true
- opcode: createAny
templatePath: config/dra/deviceclass-structured.yaml
- opcode: createAny
templatePath: config/dra/resourceclaim-structured.yaml
countParam: $initClaims
namespace: init
- opcode: allocResourceClaims
namespace: init
- opcode: createAny
templatePath: config/dra/resourceclaimtemplate-structured.yaml
namespace: test
- opcode: createPods
namespace: test
count: 10
steadyState: true
durationParam: $duration
podTemplatePath: config/dra/pod-with-claim-template.yaml
collectMetrics: true
workloads:
- name: fast
labels: [integration-test, fast, short]
params:
# This testcase runs through all code paths without
# taking too long overall.
nodesWithDRA: 1
nodesWithoutDRA: 1
initClaims: 0
maxClaimsPerNode: 10
duration: 2s
- name: empty_100nodes
params:
nodesWithDRA: 100
nodesWithoutDRA: 0
initClaims: 0
maxClaimsPerNode: 10
duration: 10s
- name: empty_200nodes
params:
nodesWithDRA: 200
nodesWithoutDRA: 0
initClaims: 0
maxClaimsPerNode: 10
duration: 10s
- name: empty_500nodes
params:
nodesWithDRA: 500
nodesWithoutDRA: 0
initClaims: 0
maxClaimsPerNode: 10
duration: 10s
# In the "half" scenarios, half of the devices are in use.
- name: half_100nodes
params:
nodesWithDRA: 100
nodesWithoutDRA: 0
initClaims: 500
maxClaimsPerNode: 10
duration: 10s
- name: half_200nodes
params:
nodesWithDRA: 200
nodesWithoutDRA: 0
initClaims: 1000
maxClaimsPerNode: 10
duration: 10s
- name: half_500nodes
params:
nodesWithDRA: 500
nodesWithoutDRA: 0
initClaims: 2500
maxClaimsPerNode: 10
duration: 10s
# In the "full" scenarios, the cluster can accommodate exactly 10 additional pods.
- name: full_100nodes
params:
nodesWithDRA: 100
nodesWithoutDRA: 0
initClaims: 990
maxClaimsPerNode: 10
duration: 10s
- name: full_200nodes
params:
nodesWithDRA: 200
nodesWithoutDRA: 0
initClaims: 1990
maxClaimsPerNode: 10
duration: 10s
- name: full_500nodes
params:
nodesWithDRA: 500
nodesWithoutDRA: 0
initClaims: 4990
maxClaimsPerNode: 10
duration: 10s
# SchedulingWithResourceClaimTemplate uses ResourceClaims
# with deterministic names that are shared between pods.
# There is a fixed ratio of 1:5 between claims and pods.


@@ -56,9 +56,6 @@ type createAny struct {
var _ runnableOp = &createAny{}
func (c *createAny) isValid(allowParameterization bool) error {
if c.Opcode != createAnyOpcode {
return fmt.Errorf("invalid opcode %q; expected %q", c.Opcode, createAnyOpcode)
}
if c.TemplatePath == "" {
return fmt.Errorf("TemplatePath must be set")
}


@@ -19,15 +19,26 @@ package benchmark
import (
"context"
"fmt"
"math/rand/v2"
"path/filepath"
"reflect"
"sync"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1alpha3"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/util/workqueue"
"k8s.io/dynamic-resource-allocation/structured"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
draapp "k8s.io/kubernetes/test/e2e/dra/test-driver/app"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/ptr"
)
// createResourceClaimsOp defines an op where resource claims are created.
@@ -48,9 +59,6 @@ var _ realOp = &createResourceClaimsOp{}
var _ runnableOp = &createResourceClaimsOp{}
func (op *createResourceClaimsOp) isValid(allowParameterization bool) error {
if op.Opcode != createResourceClaimsOpcode {
return fmt.Errorf("invalid opcode %q; expected %q", op.Opcode, createResourceClaimsOpcode)
}
if !isValidCount(allowParameterization, op.Count, op.CountParam) {
return fmt.Errorf("invalid Count=%d / CountParam=%q", op.Count, op.CountParam)
}
@@ -137,9 +145,6 @@ var _ realOp = &createResourceDriverOp{}
var _ runnableOp = &createResourceDriverOp{}
func (op *createResourceDriverOp) isValid(allowParameterization bool) error {
if op.Opcode != createResourceDriverOpcode {
return fmt.Errorf("invalid opcode %q; expected %q", op.Opcode, createResourceDriverOpcode)
}
if !isValidCount(allowParameterization, op.MaxClaimsPerNode, op.MaxClaimsPerNodeParam) {
return fmt.Errorf("invalid MaxClaimsPerNode=%d / MaxClaimsPerNodeParam=%q", op.MaxClaimsPerNode, op.MaxClaimsPerNodeParam)
}
@@ -248,10 +253,126 @@ func resourceSlice(driverName, nodeName string, capacity int) *resourceapi.Resou
slice.Spec.Devices = append(slice.Spec.Devices,
resourceapi.Device{
Name: fmt.Sprintf("instance-%d", i),
Basic: &resourceapi.BasicDevice{},
Basic: &resourceapi.BasicDevice{
Attributes: map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{
"model": {StringValue: ptr.To("A100")},
"family": {StringValue: ptr.To("GPU")},
"driverVersion": {VersionValue: ptr.To("1.2.3")},
"dra.example.com/numa": {IntValue: ptr.To(int64(i))},
},
Capacity: map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("1Gi"),
},
},
},
)
}
return slice
}
// allocResourceClaimsOp defines an op where resource claims with structured
// parameters get allocated without being associated with a pod.
type allocResourceClaimsOp struct {
// Must be allocResourceClaimsOpcode.
Opcode operationCode
// Namespace where claims are to be allocated, all namespaces if empty.
Namespace string
}
var _ realOp = &allocResourceClaimsOp{}
var _ runnableOp = &allocResourceClaimsOp{}
func (op *allocResourceClaimsOp) isValid(allowParameterization bool) error {
return nil
}
func (op *allocResourceClaimsOp) collectsMetrics() bool {
return false
}
func (op *allocResourceClaimsOp) patchParams(w *workload) (realOp, error) {
return op, op.isValid(false)
}
func (op *allocResourceClaimsOp) requiredNamespaces() []string { return nil }
func (op *allocResourceClaimsOp) run(tCtx ktesting.TContext) {
claims, err := tCtx.Client().ResourceV1alpha3().ResourceClaims(op.Namespace).List(tCtx, metav1.ListOptions{})
tCtx.ExpectNoError(err, "list claims")
tCtx.Logf("allocating %d ResourceClaims", len(claims.Items))
tCtx = ktesting.WithCancel(tCtx)
defer tCtx.Cancel("allocResourceClaimsOp.run is done")
// Track cluster state.
informerFactory := informers.NewSharedInformerFactory(tCtx.Client(), 0)
claimInformer := informerFactory.Resource().V1alpha3().ResourceClaims().Informer()
classLister := informerFactory.Resource().V1alpha3().DeviceClasses().Lister()
sliceLister := informerFactory.Resource().V1alpha3().ResourceSlices().Lister()
nodeLister := informerFactory.Core().V1().Nodes().Lister()
claimCache := assumecache.NewAssumeCache(tCtx.Logger(), claimInformer, "ResourceClaim", "", nil)
claimLister := claimLister{cache: claimCache}
informerFactory.Start(tCtx.Done())
defer func() {
tCtx.Cancel("allocResourceClaimsOp.run is shutting down")
informerFactory.Shutdown()
}()
syncedInformers := informerFactory.WaitForCacheSync(tCtx.Done())
expectSyncedInformers := map[reflect.Type]bool{
reflect.TypeOf(&resourceapi.DeviceClass{}): true,
reflect.TypeOf(&resourceapi.ResourceClaim{}): true,
reflect.TypeOf(&resourceapi.ResourceSlice{}): true,
reflect.TypeOf(&v1.Node{}): true,
}
require.Equal(tCtx, expectSyncedInformers, syncedInformers, "synced informers")
// The set of nodes is assumed to be fixed at this point.
nodes, err := nodeLister.List(labels.Everything())
tCtx.ExpectNoError(err, "list nodes")
// Allocate one claim at a time, picking nodes randomly. Each
// allocation is stored immediately, using the claim cache to avoid
// having to wait for the actual informer update.
claims:
for i := range claims.Items {
claim := &claims.Items[i]
if claim.Status.Allocation != nil {
continue
}
allocator, err := structured.NewAllocator(tCtx, []*resourceapi.ResourceClaim{claim}, claimLister, classLister, sliceLister)
tCtx.ExpectNoError(err, "create allocator")
rand.Shuffle(len(nodes), func(i, j int) {
nodes[i], nodes[j] = nodes[j], nodes[i]
})
for _, node := range nodes {
result, err := allocator.Allocate(tCtx, node)
tCtx.ExpectNoError(err, "allocate claim")
if result != nil {
claim = claim.DeepCopy()
claim.Status.Allocation = result[0]
claim, err := tCtx.Client().ResourceV1alpha3().ResourceClaims(claim.Namespace).UpdateStatus(tCtx, claim, metav1.UpdateOptions{})
tCtx.ExpectNoError(err, "update claim status with allocation")
tCtx.ExpectNoError(claimCache.Assume(claim), "assume claim")
continue claims
}
}
tCtx.Fatalf("Could not allocate claim %d out of %d", i, len(claims.Items))
}
}
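// claimLister provides the allocated-claims view consumed by structured.NewAllocator.
// It is backed by the assume cache above so that allocations stored by the loop are
// visible immediately, without waiting for informer updates.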
type claimLister struct {
cache *assumecache.AssumeCache
}
func (c claimLister) ListAllAllocated() ([]*resourceapi.ResourceClaim, error) {
objs := c.cache.List(nil)
allocatedClaims := make([]*resourceapi.ResourceClaim, 0, len(objs))
for _, obj := range objs {
claim := obj.(*resourceapi.ResourceClaim)
if claim.Status.Allocation != nil {
allocatedClaims = append(allocatedClaims, claim)
}
}
return allocatedClaims, nil
}


@@ -0,0 +1,54 @@
#!/usr/bin/env bash
# Copyright 2024 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Invoke this script with a list of *.dat and it'll plot them with gnuplot.
# Any non-file parameter is passed through to gnuplot. By default,
# an X11 window is used to display the result. To write into a file,
# use
# -e "set term png; set output <output>.png"
files=()
args=( -e "set term x11 persist" )
for i in "$@"; do
if [ -f "$i" ]; then
files+=("$i")
else
args+=("$i")
fi
done
(
cat <<EOF
set ytics autofreq nomirror tc lt 1
set xlabel 'measurement runtime [seconds]'
set ylabel 'scheduling rate [pods/second]' tc lt 1
set y2tics autofreq nomirror tc lt 2
set y2label 'scheduling attempts per pod' tc lt 2
# Derivative from https://stackoverflow.com/questions/15751226/how-can-i-plot-the-derivative-of-a-graph-in-gnuplot.
d2(x,y) = (\$0 == 0) ? (x1 = x, y1 = y, 1/0) : (x2 = x1, x1 = x, y2 = y1, y1 = y, (y1-y2)/(x1-x2))
dx = 0.25
EOF
echo -n "plot "
for file in "${files[@]}"; do
echo -n "'${file}' using (\$1 - dx):(d2(\$1, \$2)) with linespoints title '$(basename "$file" .dat | sed -e 's/_/ /g') metric rate' axis x1y1, "
echo -n "'${file}' using (\$1 - dx):(d2(\$1, \$4)) with linespoints title '$(basename "$file" .dat | sed -e 's/_/ /g') observed rate' axis x1y1, "
echo -n "'${file}' using 1:(\$3/\$2) with linespoints title '$(basename "$file" .dat | sed -e 's/_/ /g') attempts' axis x1y2, "
done
echo
) | tee /dev/stderr | gnuplot "${args[@]}" -


@@ -17,6 +17,7 @@ limitations under the License.
package benchmark
import (
"bytes"
"context"
"encoding/json"
"errors"
@@ -48,6 +49,7 @@ import (
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/cache"
"k8s.io/component-base/featuregate"
featuregatetesting "k8s.io/component-base/featuregate/testing"
logsapi "k8s.io/component-base/logs/api/v1"
@@ -62,16 +64,19 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
"k8s.io/kubernetes/pkg/scheduler/metrics"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/test/integration/framework"
testutils "k8s.io/kubernetes/test/utils"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/kubernetes/test/utils/ktesting/initoption"
"k8s.io/utils/ptr"
"sigs.k8s.io/yaml"
)
type operationCode string
const (
allocResourceClaimsOpcode operationCode = "allocResourceClaims"
createAnyOpcode operationCode = "createAny"
createNodesOpcode operationCode = "createNodes"
createNamespacesOpcode operationCode = "createNamespaces"
@@ -340,7 +345,7 @@ func (ms thresholdMetricSelector) isValid(mcc *metricsCollectorConfig) error {
}
type params struct {
params map[string]int
params map[string]any
// isUsed field records whether params is used or not.
isUsed map[string]bool
}
@@ -357,14 +362,14 @@ type params struct {
// to:
//
// params{
// params: map[string]int{
// params: map[string]any{
// "intNodes": 500,
// "initPods": 50,
// },
// isUsed: map[string]bool{}, // empty map
// }
func (p *params) UnmarshalJSON(b []byte) error {
aux := map[string]int{}
aux := map[string]any{}
if err := json.Unmarshal(b, &aux); err != nil {
return err
@@ -375,14 +380,31 @@ func (p *params) UnmarshalJSON(b []byte) error {
return nil
}
// get returns param.
// get retrieves the parameter as an integer
func (p params) get(key string) (int, error) {
// JSON unmarshals integer constants in an "any" field as float.
f, err := getParam[float64](p, key)
if err != nil {
return 0, err
}
return int(f), nil
}
// getParam retrieves the parameter as specific type. There is no conversion,
// so in practice this means that only types that JSON unmarshaling uses
// (float64, string, bool) work.
func getParam[T float64 | string | bool](p params, key string) (T, error) {
p.isUsed[key] = true
param, ok := p.params[key]
if ok {
return param, nil
var t T
if !ok {
return t, fmt.Errorf("parameter %s is undefined", key)
}
return 0, fmt.Errorf("parameter %s is undefined", key)
t, ok = param.(T)
if !ok {
return t, fmt.Errorf("parameter %s has the wrong type %T", key, param)
}
return t, nil
}
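
As a standalone illustration of the new parameter handling (an editorial sketch, not part of the diff; "initNodes" and "duration" are just example parameter names): values decoded from JSON into an "any" map keep the types encoding/json produces, so integer constants arrive as float64 and get() has to convert.

    package main

    import (
        "encoding/json"
        "fmt"
    )

    // getParam mirrors the generic accessor above, but works on a plain map
    // so that the example is self-contained.
    func getParam[T float64 | string | bool](params map[string]any, key string) (T, error) {
        var t T
        v, ok := params[key]
        if !ok {
            return t, fmt.Errorf("parameter %s is undefined", key)
        }
        t, ok = v.(T)
        if !ok {
            return t, fmt.Errorf("parameter %s has the wrong type %T", key, v)
        }
        return t, nil
    }

    func main() {
        var params map[string]any
        _ = json.Unmarshal([]byte(`{"initNodes": 500, "duration": "10s"}`), &params)

        nodes, _ := getParam[float64](params, "initNodes") // JSON integers arrive as float64
        duration, _ := getParam[string](params, "duration")
        fmt.Println(int(nodes), duration) // 500 10s
    }

Keeping durations as strings in the workload YAML (e.g. duration: 10s) is also why createPodsOp.patchParams below retrieves them with getParam[string] and parses them with time.ParseDuration.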
// unusedParams returns the names of unusedParams
@@ -404,36 +426,44 @@ type op struct {
// UnmarshalJSON is a custom unmarshaler for the op struct since we don't know
// which op we're decoding at runtime.
func (op *op) UnmarshalJSON(b []byte) error {
possibleOps := []realOp{
&createAny{},
&createNodesOp{},
&createNamespacesOp{},
&createPodsOp{},
&createPodSetsOp{},
&deletePodsOp{},
&createResourceClaimsOp{},
&createResourceDriverOp{},
&churnOp{},
&barrierOp{},
&sleepOp{},
&startCollectingMetricsOp{},
&stopCollectingMetricsOp{},
possibleOps := map[operationCode]realOp{
allocResourceClaimsOpcode: &allocResourceClaimsOp{},
createAnyOpcode: &createAny{},
createNodesOpcode: &createNodesOp{},
createNamespacesOpcode: &createNamespacesOp{},
createPodsOpcode: &createPodsOp{},
createPodSetsOpcode: &createPodSetsOp{},
deletePodsOpcode: &deletePodsOp{},
createResourceClaimsOpcode: &createResourceClaimsOp{},
createResourceDriverOpcode: &createResourceDriverOp{},
churnOpcode: &churnOp{},
barrierOpcode: &barrierOp{},
sleepOpcode: &sleepOp{},
startCollectingMetricsOpcode: &startCollectingMetricsOp{},
stopCollectingMetricsOpcode: &stopCollectingMetricsOp{},
// TODO(#94601): add a delete nodes op to simulate scaling behaviour?
}
var firstError error
for _, possibleOp := range possibleOps {
if err := json.Unmarshal(b, possibleOp); err == nil {
if err2 := possibleOp.isValid(true); err2 == nil {
op.realOp = possibleOp
// First determine the opcode using lenient decoding (= ignore extra fields).
var possibleOp struct {
Opcode operationCode
}
if err := json.Unmarshal(b, &possibleOp); err != nil {
return fmt.Errorf("decoding opcode from %s: %w", string(b), err)
}
realOp, ok := possibleOps[possibleOp.Opcode]
if !ok {
return fmt.Errorf("unknown opcode %q in %s", possibleOp.Opcode, string(b))
}
decoder := json.NewDecoder(bytes.NewReader(b))
decoder.DisallowUnknownFields()
if err := decoder.Decode(realOp); err != nil {
return fmt.Errorf("decoding %s into %T: %w", string(b), realOp, err)
}
if err := realOp.isValid(true); err != nil {
return fmt.Errorf("%s not valid for %T: %w", string(b), realOp, err)
}
op.realOp = realOp
return nil
} else if firstError == nil {
// Don't return an error yet. Even though this op is invalid, it may
// still match other possible ops.
firstError = err2
}
}
}
return fmt.Errorf("cannot unmarshal %s into any known op type: %w", string(b), firstError)
}
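
A self-contained sketch of this decoding scheme (editorial illustration, not part of the diff; the trimmed-down createNodesOp below stands in for the real structs): the opcode is probed with a lenient unmarshal, then the selected op is decoded strictly so that unknown fields produce an error instead of being silently dropped.

    package main

    import (
        "bytes"
        "encoding/json"
        "fmt"
    )

    // Reduced for illustration; the real createNodesOp has more fields.
    type createNodesOp struct {
        Opcode string
        Count  int
    }

    func main() {
        b := []byte(`{"opcode": "createNodes", "count": 5, "typo": true}`)

        // Pass 1: lenient decode, only to learn the opcode.
        var probe struct{ Opcode string }
        if err := json.Unmarshal(b, &probe); err != nil {
            panic(err)
        }

        // Pass 2: strict decode into the op selected by the opcode.
        decoder := json.NewDecoder(bytes.NewReader(b))
        decoder.DisallowUnknownFields()
        var op createNodesOp
        if err := decoder.Decode(&op); err != nil {
            // "typo" is not a field of createNodesOp, so this fails loudly.
            fmt.Println("decode error:", err)
        }
    }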
// realOp is an interface that is implemented by different structs. To evaluate
@@ -441,6 +471,8 @@ func (op *op) UnmarshalJSON(b []byte) error {
type realOp interface {
// isValid verifies the validity of the op args such as node/pod count. Note
// that we don't catch undefined parameters at this stage.
//
// This returns errInvalidOp if the configured operation does not match.
isValid(allowParameterization bool) error
// collectsMetrics checks if the op collects metrics.
collectsMetrics() bool
@@ -497,9 +529,6 @@ type createNodesOp struct {
}
func (cno *createNodesOp) isValid(allowParameterization bool) error {
if cno.Opcode != createNodesOpcode {
return fmt.Errorf("invalid opcode %q", cno.Opcode)
}
if !isValidCount(allowParameterization, cno.Count, cno.CountParam) {
return fmt.Errorf("invalid Count=%d / CountParam=%q", cno.Count, cno.CountParam)
}
@@ -538,9 +567,6 @@ type createNamespacesOp struct {
}
func (cmo *createNamespacesOp) isValid(allowParameterization bool) error {
if cmo.Opcode != createNamespacesOpcode {
return fmt.Errorf("invalid opcode %q", cmo.Opcode)
}
if !isValidCount(allowParameterization, cmo.Count, cmo.CountParam) {
return fmt.Errorf("invalid Count=%d / CountParam=%q", cmo.Count, cmo.CountParam)
}
@@ -572,6 +598,27 @@ type createPodsOp struct {
Count int
// Template parameter for Count.
CountParam string
// If false, Count pods get created rapidly. This can be used to
// measure how quickly the scheduler can fill up a cluster.
//
// If true, Count pods get created, the operation waits for
// a pod to get scheduled, deletes it and then creates another.
// This continues until the configured Duration is over.
// Metrics collection, if enabled, runs in parallel.
//
// This mode can be used to measure how the scheduler behaves
// in a steady state where the cluster is always at roughly the
// same level of utilization. Pods can be created in a separate,
// earlier operation to simulate non-empty clusters.
//
// Note that the operation will delete any scheduled pod in
// the namespace, so use different namespaces for pods that
// are supposed to be kept running.
SteadyState bool
// How long to keep the cluster in a steady state.
Duration metav1.Duration
// Template parameter for Duration.
DurationParam string
// Whether or not to enable metrics collection for this createPodsOp.
// Optional. Both CollectMetrics and SkipWaitToCompletion cannot be true at
// the same time for a particular createPodsOp.
@@ -595,9 +642,6 @@ type createPodsOp struct {
}
func (cpo *createPodsOp) isValid(allowParameterization bool) error {
if cpo.Opcode != createPodsOpcode {
return fmt.Errorf("invalid opcode %q; expected %q", cpo.Opcode, createPodsOpcode)
}
if !isValidCount(allowParameterization, cpo.Count, cpo.CountParam) {
return fmt.Errorf("invalid Count=%d / CountParam=%q", cpo.Count, cpo.CountParam)
}
@@ -607,6 +651,9 @@ func (cpo *createPodsOp) isValid(allowParameterization bool) error {
// use-cases right now.
return fmt.Errorf("collectMetrics and skipWaitToCompletion cannot be true at the same time")
}
if cpo.SkipWaitToCompletion && cpo.SteadyState {
return errors.New("skipWaitToCompletion and steadyState cannot be true at the same time")
}
return nil
}
@@ -622,6 +669,15 @@ func (cpo createPodsOp) patchParams(w *workload) (realOp, error) {
return nil, err
}
}
if cpo.DurationParam != "" {
durationStr, err := getParam[string](w.Params, cpo.DurationParam[1:])
if err != nil {
return nil, err
}
if cpo.Duration.Duration, err = time.ParseDuration(durationStr); err != nil {
return nil, fmt.Errorf("parsing duration parameter %s: %w", cpo.DurationParam, err)
}
}
return &cpo, (&cpo).isValid(false)
}
@@ -641,9 +697,6 @@ type createPodSetsOp struct {
}
func (cpso *createPodSetsOp) isValid(allowParameterization bool) error {
if cpso.Opcode != createPodSetsOpcode {
return fmt.Errorf("invalid opcode %q; expected %q", cpso.Opcode, createPodSetsOpcode)
}
if !isValidCount(allowParameterization, cpso.Count, cpso.CountParam) {
return fmt.Errorf("invalid Count=%d / CountParam=%q", cpso.Count, cpso.CountParam)
}
@@ -729,9 +782,6 @@ type churnOp struct {
}
func (co *churnOp) isValid(_ bool) error {
if co.Opcode != churnOpcode {
return fmt.Errorf("invalid opcode %q", co.Opcode)
}
if co.Mode != Recreate && co.Mode != Create {
return fmt.Errorf("invalid mode: %v. must be one of %v", co.Mode, []string{Recreate, Create})
}
@@ -767,9 +817,6 @@ type barrierOp struct {
}
func (bo *barrierOp) isValid(allowParameterization bool) error {
if bo.Opcode != barrierOpcode {
return fmt.Errorf("invalid opcode %q", bo.Opcode)
}
return nil
}
@@ -805,9 +852,6 @@ func (so *sleepOp) UnmarshalJSON(data []byte) (err error) {
}
func (so *sleepOp) isValid(_ bool) error {
if so.Opcode != sleepOpcode {
return fmt.Errorf("invalid opcode %q; expected %q", so.Opcode, sleepOpcode)
}
return nil
}
@@ -831,9 +875,6 @@ type startCollectingMetricsOp struct {
}
func (scm *startCollectingMetricsOp) isValid(_ bool) error {
if scm.Opcode != startCollectingMetricsOpcode {
return fmt.Errorf("invalid opcode %q; expected %q", scm.Opcode, startCollectingMetricsOpcode)
}
if len(scm.Namespaces) == 0 {
return fmt.Errorf("namespaces cannot be empty")
}
@@ -857,9 +898,6 @@ type stopCollectingMetricsOp struct {
}
func (scm *stopCollectingMetricsOp) isValid(_ bool) error {
if scm.Opcode != stopCollectingMetricsOpcode {
return fmt.Errorf("invalid opcode %q; expected %q", scm.Opcode, stopCollectingMetricsOpcode)
}
return nil
}
@@ -1057,6 +1095,30 @@ func RunBenchmarkPerfScheduling(b *testing.B, outOfTreePluginRegistry frameworkr
// Reset metrics to prevent metrics generated in current workload gets
// carried over to the next workload.
legacyregistry.Reset()
// Exactly one result is expected to contain the progress information.
for _, item := range results {
if len(item.progress) == 0 {
continue
}
destFile, err := dataFilename(strings.ReplaceAll(fmt.Sprintf("%s_%s_%s.dat", tc.Name, w.Name, runID), "/", "_"))
if err != nil {
b.Fatalf("prepare data file: %v", err)
}
f, err := os.Create(destFile)
if err != nil {
b.Fatalf("create data file: %v", err)
}
// Print progress over time.
for _, sample := range item.progress {
fmt.Fprintf(f, "%.1fs %d %d %d %f\n", sample.ts.Sub(item.start).Seconds(), sample.completed, sample.attempts, sample.observedTotal, sample.observedRate)
}
if err := f.Close(); err != nil {
b.Fatalf("closing data file: %v", err)
}
}
})
}
})
@@ -1315,14 +1377,19 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
collectorCtx, collectors = startCollectingMetrics(tCtx, &collectorWG, podInformer, tc.MetricsCollectorConfig, throughputErrorMargin, opIndex, namespace, []string{namespace})
defer collectorCtx.Cancel("cleaning up")
}
if err := createPods(tCtx, namespace, concreteOp); err != nil {
if err := createPodsRapidly(tCtx, namespace, concreteOp); err != nil {
tCtx.Fatalf("op %d: %v", opIndex, err)
}
if concreteOp.SkipWaitToCompletion {
switch {
case concreteOp.SkipWaitToCompletion:
// Only record those namespaces that may potentially require barriers
// in the future.
numPodsScheduledPerNamespace[namespace] += concreteOp.Count
} else {
case concreteOp.SteadyState:
if err := createPodsSteadily(tCtx, namespace, podInformer, concreteOp); err != nil {
tCtx.Fatalf("op %d: %v", opIndex, err)
}
default:
if err := waitUntilPodsScheduledInNamespace(tCtx, podInformer, namespace, concreteOp.Count); err != nil {
tCtx.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err)
}
@@ -1605,7 +1672,12 @@ func getNodePreparer(prefix string, cno *createNodesOp, clientset clientset.Inte
), nil
}
func createPods(tCtx ktesting.TContext, namespace string, cpo *createPodsOp) error {
// createPodsRapidly implements the "create pods rapidly" mode of [createPodsOp].
// It's a nop when cpo.SteadyState is true.
func createPodsRapidly(tCtx ktesting.TContext, namespace string, cpo *createPodsOp) error {
if cpo.SteadyState {
return nil
}
strategy, err := getPodStrategy(cpo)
if err != nil {
return err
@@ -1617,6 +1689,147 @@ func createPods(tCtx ktesting.TContext, namespace string, cpo *createPodsOp) err
return podCreator.CreatePods(tCtx)
}
// createPodsSteadily implements the "create pods and delete pods" mode of [createPodsOp].
// It's a nop when cpo.SteadyState is false.
func createPodsSteadily(tCtx ktesting.TContext, namespace string, podInformer coreinformers.PodInformer, cpo *createPodsOp) error {
if !cpo.SteadyState {
return nil
}
strategy, err := getPodStrategy(cpo)
if err != nil {
return err
}
tCtx.Logf("creating pods in namespace %q for %s", namespace, cpo.Duration)
tCtx = ktesting.WithTimeout(tCtx, cpo.Duration.Duration, fmt.Sprintf("the operation ran for the configured %s", cpo.Duration.Duration))
// Start watching pods in the namespace. Any pod which is seen as being scheduled
// gets deleted.
scheduledPods := make(chan *v1.Pod, cpo.Count)
scheduledPodsClosed := false
var mutex sync.Mutex
defer func() {
mutex.Lock()
defer mutex.Unlock()
close(scheduledPods)
scheduledPodsClosed = true
}()
existingPods := 0
runningPods := 0
onPodChange := func(oldObj, newObj any) {
oldPod, newPod, err := schedutil.As[*v1.Pod](oldObj, newObj)
if err != nil {
tCtx.Errorf("unexpected pod events: %v", err)
return
}
mutex.Lock()
defer mutex.Unlock()
if oldPod == nil {
existingPods++
}
if (oldPod == nil || oldPod.Spec.NodeName == "") && newPod.Spec.NodeName != "" {
// Got scheduled.
runningPods++
// Only ask for deletion in our namespace.
if newPod.Namespace != namespace {
return
}
if !scheduledPodsClosed {
select {
case <-tCtx.Done():
case scheduledPods <- newPod:
}
}
}
}
handle, err := podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
onPodChange(nil, obj)
},
UpdateFunc: func(oldObj, newObj any) {
onPodChange(oldObj, newObj)
},
DeleteFunc: func(obj any) {
pod, _, err := schedutil.As[*v1.Pod](obj, nil)
if err != nil {
tCtx.Errorf("unexpected pod events: %v", err)
return
}
existingPods--
if pod.Spec.NodeName != "" {
runningPods--
}
},
})
if err != nil {
return fmt.Errorf("register event handler: %w", err)
}
defer func() {
tCtx.ExpectNoError(podInformer.Informer().RemoveEventHandler(handle), "remove event handler")
}()
// Seed the namespace with the initial number of pods.
if err := strategy(tCtx, tCtx.Client(), namespace, cpo.Count); err != nil {
return fmt.Errorf("create initial %d pods: %w", cpo.Count, err)
}
// Now loop until we are done. Report periodically how many pods were scheduled.
countScheduledPods := 0
lastCountScheduledPods := 0
logPeriod := time.Second
ticker := time.NewTicker(logPeriod)
defer ticker.Stop()
for {
select {
case <-tCtx.Done():
tCtx.Logf("Completed after seeing %d scheduled pod: %v", countScheduledPods, context.Cause(tCtx))
return nil
case <-scheduledPods:
countScheduledPods++
if countScheduledPods%cpo.Count == 0 {
// All scheduled. Start over with a new batch.
err := tCtx.Client().CoreV1().Pods(namespace).DeleteCollection(tCtx, metav1.DeleteOptions{
GracePeriodSeconds: ptr.To(int64(0)),
PropagationPolicy: ptr.To(metav1.DeletePropagationBackground), // Foreground will block.
}, metav1.ListOptions{})
// Ignore errors when the time is up. errors.Is(context.Canceled) would
// be more precise, but doesn't work because client-go doesn't reliably
// propagate it. Instead, this was seen:
// client rate limiter Wait returned an error: rate: Wait(n=1) would exceed context deadline
if tCtx.Err() != nil {
continue
}
if err != nil {
return fmt.Errorf("delete scheduled pods: %w", err)
}
err = strategy(tCtx, tCtx.Client(), namespace, cpo.Count)
if tCtx.Err() != nil {
continue
}
if err != nil {
return fmt.Errorf("create next batch of pods: %w", err)
}
}
case <-ticker.C:
delta := countScheduledPods - lastCountScheduledPods
lastCountScheduledPods = countScheduledPods
func() {
mutex.Lock()
defer mutex.Unlock()
tCtx.Logf("%d pods got scheduled in total in namespace %q, overall %d out of %d pods scheduled: %f pods/s in last interval",
countScheduledPods, namespace,
runningPods, existingPods,
float64(delta)/logPeriod.Seconds(),
)
}()
}
}
}
// waitUntilPodsScheduledInNamespace blocks until all pods in the given
// namespace are scheduled. Times out after 10 minutes because even at the
// lowest observed QPS of ~10 pods/sec, a 5000-node test should complete.


@@ -26,6 +26,7 @@ import (
"path"
"sort"
"strings"
"sync"
"time"
v1 "k8s.io/api/core/v1"
@@ -35,6 +36,7 @@ import (
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/component-base/featuregate"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/metrics/testutil"
@@ -45,6 +47,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/apis/config"
kubeschedulerscheme "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/kubernetes/test/integration/util"
testutils "k8s.io/kubernetes/test/utils"
@@ -60,6 +63,8 @@ const (
var dataItemsDir = flag.String("data-items-dir", "", "destination directory for storing generated data items for perf dashboard")
var runID = time.Now().Format(dateFormat)
func newDefaultComponentConfig() (*config.KubeSchedulerConfiguration, error) {
gvk := kubeschedulerconfigv1.SchemeGroupVersion.WithKind("KubeSchedulerConfiguration")
cfg := config.KubeSchedulerConfiguration{}
@@ -177,6 +182,10 @@ type DataItem struct {
Unit string `json:"unit"`
// Labels is the labels of the data item.
Labels map[string]string `json:"labels,omitempty"`
// progress contains number of scheduled pods over time.
progress []podScheduling
start time.Time
}
// DataItems is the data point set. It is the struct that perf dashboard expects.
@@ -185,6 +194,14 @@ type DataItems struct {
DataItems []DataItem `json:"dataItems"`
}
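// podScheduling is one progress sample: attempts and completed come from the
// scheduler_schedule_attempts_total metric, observedTotal and observedRate from
// the pods the collector itself observed as scheduled.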
type podScheduling struct {
ts time.Time
attempts int
completed int
observedTotal int
observedRate float64
}
// makeBasePod creates a Pod object to be used as a template.
func makeBasePod() *v1.Pod {
basePod := &v1.Pod{
@@ -238,6 +255,17 @@ func dataItems2JSONFile(dataItems DataItems, namePrefix string) error {
return os.WriteFile(destFile, formatted.Bytes(), 0644)
}
func dataFilename(destFile string) (string, error) {
if *dataItemsDir != "" {
// Ensure the "dataItemsDir" path is valid.
if err := os.MkdirAll(*dataItemsDir, 0750); err != nil {
return "", fmt.Errorf("dataItemsDir path %v does not exist and cannot be created: %w", *dataItemsDir, err)
}
destFile = path.Join(*dataItemsDir, destFile)
}
return destFile, nil
}
type labelValues struct {
label string
values []string
@@ -378,15 +406,18 @@ type throughputCollector struct {
podInformer coreinformers.PodInformer
schedulingThroughputs []float64
labels map[string]string
namespaces []string
namespaces sets.Set[string]
errorMargin float64
progress []podScheduling
start time.Time
}
func newThroughputCollector(podInformer coreinformers.PodInformer, labels map[string]string, namespaces []string, errorMargin float64) *throughputCollector {
return &throughputCollector{
podInformer: podInformer,
labels: labels,
namespaces: namespaces,
namespaces: sets.New(namespaces...),
errorMargin: errorMargin,
}
}
@@ -396,11 +427,75 @@ func (tc *throughputCollector) init() error {
}
func (tc *throughputCollector) run(tCtx ktesting.TContext) {
podsScheduled, _, err := getScheduledPods(tc.podInformer, tc.namespaces...)
if err != nil {
klog.Fatalf("%v", err)
// The collector is based on informer cache events instead of periodically listing pods because:
// - polling causes more overhead
// - it does not work when pods get created, scheduled and deleted quickly
//
// Normally, informers cannot be used to observe state changes reliably.
// They only guarantee that *some* updates get reported, but not *all*.
// But in scheduler_perf, the scheduler and the test share the same informer,
// therefore we are guaranteed to see a new pod without NodeName (because
// that is what the scheduler needs to see to schedule it) and then the updated
// pod with NodeName (because nothing makes further changes to it).
var mutex sync.Mutex
scheduledPods := 0
getScheduledPods := func() int {
mutex.Lock()
defer mutex.Unlock()
return scheduledPods
}
lastScheduledCount := len(podsScheduled)
onPodChange := func(oldObj, newObj any) {
oldPod, newPod, err := schedutil.As[*v1.Pod](oldObj, newObj)
if err != nil {
tCtx.Errorf("unexpected pod events: %v", err)
return
}
if !tc.namespaces.Has(newPod.Namespace) {
return
}
mutex.Lock()
defer mutex.Unlock()
if (oldPod == nil || oldPod.Spec.NodeName == "") && newPod.Spec.NodeName != "" {
// Got scheduled.
scheduledPods++
}
}
handle, err := tc.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
onPodChange(nil, obj)
},
UpdateFunc: func(oldObj, newObj any) {
onPodChange(oldObj, newObj)
},
})
if err != nil {
tCtx.Fatalf("register pod event handler: %v", err)
}
defer func() {
tCtx.ExpectNoError(tc.podInformer.Informer().RemoveEventHandler(handle), "remove event handler")
}()
// Waiting for the initial sync didn't work, `handle.HasSynced` always returned
// false - perhaps because the event handlers get added to a running informer.
// That's okay(ish), throughput is typically measured within an empty namespace.
//
// syncTicker := time.NewTicker(time.Millisecond)
// defer syncTicker.Stop()
// for {
// select {
// case <-syncTicker.C:
// if handle.HasSynced() {
// break
// }
// case <-tCtx.Done():
// return
// }
// }
tCtx.Logf("Started pod throughput collector for namespace(s) %s, %d pods scheduled so far", sets.List(tc.namespaces), getScheduledPods())
lastScheduledCount := getScheduledPods()
ticker := time.NewTicker(throughputSampleInterval)
defer ticker.Stop()
lastSampleTime := time.Now()
@@ -413,12 +508,8 @@ func (tc *throughputCollector) run(tCtx ktesting.TContext) {
return
case <-ticker.C:
now := time.Now()
podsScheduled, _, err := getScheduledPods(tc.podInformer, tc.namespaces...)
if err != nil {
klog.Fatalf("%v", err)
}
scheduled := len(podsScheduled)
scheduled := getScheduledPods()
// Only do sampling if number of scheduled pods is greater than zero.
if scheduled == 0 {
continue
@@ -429,6 +520,7 @@ func (tc *throughputCollector) run(tCtx ktesting.TContext) {
// sampling and creating pods get started independently.
lastScheduledCount = scheduled
lastSampleTime = now
tc.start = now
continue
}
@@ -461,6 +553,20 @@ func (tc *throughputCollector) run(tCtx ktesting.TContext) {
for i := 0; i <= skipped; i++ {
tc.schedulingThroughputs = append(tc.schedulingThroughputs, throughput)
}
// Record the metric sample.
counters, err := testutil.GetCounterValuesFromGatherer(legacyregistry.DefaultGatherer, "scheduler_schedule_attempts_total", map[string]string{"profile": "default-scheduler"}, "result")
if err != nil {
klog.Error(err)
}
tc.progress = append(tc.progress, podScheduling{
ts: now,
attempts: int(counters["unschedulable"] + counters["error"] + counters["scheduled"]),
completed: int(counters["scheduled"]),
observedTotal: scheduled,
observedRate: throughput,
})
lastScheduledCount = scheduled
klog.Infof("%d pods have been scheduled successfully", lastScheduledCount)
skipped = 0
@@ -470,7 +576,11 @@ func (tc *throughputCollector) run(tCtx ktesting.TContext) {
}
func (tc *throughputCollector) collect() []DataItem {
throughputSummary := DataItem{Labels: tc.labels}
throughputSummary := DataItem{
Labels: tc.labels,
progress: tc.progress,
start: tc.start,
}
if length := len(tc.schedulingThroughputs); length > 0 {
sort.Float64s(tc.schedulingThroughputs)
sum := 0.0