scheduler_perf + DRA: measure pod scheduling at a steady state
The previous tests were based on scheduling pods until the cluster was full.
This is a valid scenario, but not necessarily realistic. More realistic is how
quickly the scheduler can schedule new pods when some old pods finish running,
in particular in a cluster that is properly utilized (= almost full).

To test this, pods must get created, scheduled, and then immediately deleted.
This can run for a certain period of time. Scenarios with an empty and with a
full cluster have different scheduling rates. This was previously visible for
DRA because the 50th percentile of the scheduling throughput was lower than
the average, but one had to guess in which scenario the throughput was lower.
Now this can be measured for DRA with the new
SteadyStateClusterResourceClaimTemplateStructured test.

The metrics collector must watch pod events to figure out how many pods got
scheduled. Polling misses pods that already got deleted again.

There seems to be no relevant difference in the collected metrics
(SchedulingWithResourceClaimTemplateStructured/2000pods_200nodes, 6 repetitions):

    │ before                       │ after                                │
    │ SchedulingThroughput/Average │ SchedulingThroughput/Average vs base │
      157.1 ± 0%                     157.1 ± 0%   ~ (p=0.329 n=6)

    │ before                       │ after                                │
    │ SchedulingThroughput/Perc50  │ SchedulingThroughput/Perc50 vs base  │
      48.99 ± 8%                     47.52 ± 9%   ~ (p=0.937 n=6)

    │ before                       │ after                                │
    │ SchedulingThroughput/Perc90  │ SchedulingThroughput/Perc90 vs base  │
      463.9 ± 16%                    460.1 ± 13%  ~ (p=0.818 n=6)

    │ before                       │ after                                │
    │ SchedulingThroughput/Perc95  │ SchedulingThroughput/Perc95 vs base  │
      463.9 ± 16%                    460.1 ± 13%  ~ (p=0.818 n=6)

    │ before                       │ after                                │
    │ SchedulingThroughput/Perc99  │ SchedulingThroughput/Perc99 vs base  │
      463.9 ± 16%                    460.1 ± 13%  ~ (p=0.818 n=6)
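The core idea is a create → wait-for-scheduling → delete loop that runs until a
configured duration expires. Below is a minimal, self-contained Go sketch of
that loop; the channels stand in for the API server, scheduler, and informer,
and the helper names are made up for illustration. The real implementation is
createPodsSteadily in the diff further down.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	const (
		batchSize = 10
		duration  = 2 * time.Second
	)
	ctx, cancel := context.WithTimeout(context.Background(), duration)
	defer cancel()

	// scheduled delivers one value per pod that the (simulated) scheduler placed.
	scheduled := make(chan struct{}, batchSize)

	// Stand-in for the scheduler plus the informer-based watcher: every pod
	// that gets "created" is reported as scheduled a little later.
	pending := make(chan struct{}, batchSize)
	go func() {
		for range pending {
			time.Sleep(10 * time.Millisecond) // pretend scheduling latency
			scheduled <- struct{}{}
		}
	}()

	createBatch := func(n int) {
		for i := 0; i < n; i++ {
			pending <- struct{}{}
		}
	}
	deleteBatch := func() {} // deleting the scheduled pods is a no-op in this simulation

	// Steady-state loop: create a batch, wait until all of it is scheduled,
	// delete it, create the next batch, until the duration is over.
	createBatch(batchSize)
	total := 0
	for {
		select {
		case <-ctx.Done():
			fmt.Printf("scheduled %d pods in %s (%.1f pods/s)\n",
				total, duration, float64(total)/duration.Seconds())
			return
		case <-scheduled:
			total++
			if total%batchSize == 0 {
				deleteBatch()
				createBatch(batchSize)
			}
		}
	}
}
```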
@@ -1167,7 +1167,9 @@
      maxClaimsPerNode: 20

# SchedulingWithResourceClaimTemplateStructured uses a ResourceClaimTemplate
# and dynamically creates ResourceClaim instances for each pod.
# and dynamically creates ResourceClaim instances for each pod. Node, pod and
# device counts are chosen so that the cluster gets filled up completely.
#
# The driver uses structured parameters.
- name: SchedulingWithResourceClaimTemplateStructured
  featureGates:
@@ -1234,6 +1236,104 @@
      measurePods: 2500
      maxClaimsPerNode: 10

# SteadyStateResourceClaimTemplateStructured uses a ResourceClaimTemplate
# and dynamically creates ResourceClaim instances for each pod, but never
# more than 10 at a time. Then it waits for a pod to get scheduled
# before deleting it and creating another one.
#
# The workload determines whether there are other pods in the cluster.
#
# The driver uses structured parameters.
- name: SteadyStateClusterResourceClaimTemplateStructured
  featureGates:
    DynamicResourceAllocation: true
    # SchedulerQueueingHints: true
  workloadTemplate:
  - opcode: createNodes
    countParam: $nodesWithoutDRA
  - opcode: createNodes
    nodeTemplatePath: config/dra/node-with-dra-test-driver.yaml
    countParam: $nodesWithDRA
  - opcode: createResourceDriver
    driverName: test-driver.cdi.k8s.io
    nodes: scheduler-perf-dra-*
    maxClaimsPerNodeParam: $maxClaimsPerNode
    structuredParameters: true
  - opcode: createAny
    templatePath: config/dra/deviceclass-structured.yaml
  - opcode: createAny
    templatePath: config/dra/resourceclaimtemplate-structured.yaml
    namespace: init
  - opcode: createPods
    namespace: init
    countParam: $initPods
    podTemplatePath: config/dra/pod-with-claim-template.yaml
  - opcode: createAny
    templatePath: config/dra/resourceclaimtemplate-structured.yaml
    namespace: test
  - opcode: createPods
    namespace: test
    count: 10
    steadyState: true
    durationParam: $duration
    podTemplatePath: config/dra/pod-with-claim-template.yaml
    collectMetrics: true
  workloads:
  - name: fast
    labels: [integration-test, fast, short]
    params:
      # This testcase runs through all code paths without
      # taking too long overall.
      nodesWithDRA: 1
      nodesWithoutDRA: 1
      initPods: 0
      maxClaimsPerNode: 10
      duration: 2s
  - name: empty_100nodes
    params:
      nodesWithDRA: 100
      nodesWithoutDRA: 0
      initPods: 0
      maxClaimsPerNode: 2
      duration: 10s
  - name: empty_200nodes
    params:
      nodesWithDRA: 200
      nodesWithoutDRA: 0
      initPods: 0
      maxClaimsPerNode: 2
      duration: 10s
  - name: empty_500nodes
    params:
      nodesWithDRA: 500
      nodesWithoutDRA: 0
      initPods: 0
      maxClaimsPerNode: 2
      duration: 10s
  # In the "full" scenarios, the cluster can accommodate exactly one additional pod.
  # These are slower because scheduling the initial pods takes time.
  - name: full_100nodes
    params:
      nodesWithDRA: 100
      nodesWithoutDRA: 0
      initPods: 199
      maxClaimsPerNode: 2
      duration: 10s
  - name: full_200nodes
    params:
      nodesWithDRA: 200
      nodesWithoutDRA: 0
      initPods: 399
      maxClaimsPerNode: 2
      duration: 10s
  - name: full_500nodes
    params:
      nodesWithDRA: 500
      nodesWithoutDRA: 0
      initPods: 999
      maxClaimsPerNode: 2
      duration: 10s

# SchedulingWithResourceClaimTemplate uses ResourceClaims
# with deterministic names that are shared between pods.
# There is a fixed ratio of 1:5 between claims and pods.
@@ -49,6 +49,7 @@ import (
    coreinformers "k8s.io/client-go/informers/core/v1"
    clientset "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/restmapper"
    "k8s.io/client-go/tools/cache"
    "k8s.io/component-base/featuregate"
    featuregatetesting "k8s.io/component-base/featuregate/testing"
    logsapi "k8s.io/component-base/logs/api/v1"
@@ -63,10 +64,12 @@ import (
    "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
    frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
    "k8s.io/kubernetes/pkg/scheduler/metrics"
    schedutil "k8s.io/kubernetes/pkg/scheduler/util"
    "k8s.io/kubernetes/test/integration/framework"
    testutils "k8s.io/kubernetes/test/utils"
    "k8s.io/kubernetes/test/utils/ktesting"
    "k8s.io/kubernetes/test/utils/ktesting/initoption"
    "k8s.io/utils/ptr"
    "sigs.k8s.io/yaml"
)

@@ -341,7 +344,7 @@ func (ms thresholdMetricSelector) isValid(mcc *metricsCollectorConfig) error {
}

type params struct {
    params map[string]int
    params map[string]any
    // isUsed field records whether params is used or not.
    isUsed map[string]bool
}
@@ -358,14 +361,14 @@ type params struct {
// to:
//
// params{
//     params: map[string]int{
//     params: map[string]any{
//         "intNodes": 500,
//         "initPods": 50,
//     },
//     isUsed: map[string]bool{}, // empty map
// }
func (p *params) UnmarshalJSON(b []byte) error {
    aux := map[string]int{}
    aux := map[string]any{}

    if err := json.Unmarshal(b, &aux); err != nil {
        return err
@@ -376,14 +379,31 @@ func (p *params) UnmarshalJSON(b []byte) error {
    return nil
}

// get returns param.
// get retrieves the parameter as an integer
func (p params) get(key string) (int, error) {
    // JSON unmarshals integer constants in an "any" field as float.
    f, err := getParam[float64](p, key)
    if err != nil {
        return 0, err
    }
    return int(f), nil
}

// getParam retrieves the parameter as specific type. There is no conversion,
// so in practice this means that only types that JSON unmarshaling uses
// (float64, string, bool) work.
func getParam[T float64 | string | bool](p params, key string) (T, error) {
    p.isUsed[key] = true
    param, ok := p.params[key]
    if ok {
        return param, nil
    var t T
    if !ok {
        return t, fmt.Errorf("parameter %s is undefined", key)
    }
    return 0, fmt.Errorf("parameter %s is undefined", key)
    t, ok = param.(T)
    if !ok {
        return t, fmt.Errorf("parameter %s has the wrong type %T", key, param)
    }
    return t, nil
}

// unusedParams returns the names of unusedParams
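The float64 detour in get() above follows from standard encoding/json behavior:
when unmarshaling into map[string]any, JSON numbers always become float64, never
int. A small standalone illustration of that fact (not part of the commit):

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	var params map[string]any
	if err := json.Unmarshal([]byte(`{"initPods": 50, "duration": "10s"}`), &params); err != nil {
		panic(err)
	}

	// Numbers land as float64, so an integer parameter has to be read as
	// float64 first and then converted, which is what get() does.
	fmt.Printf("%T %v\n", params["initPods"], params["initPods"]) // float64 50
	fmt.Printf("%T %v\n", params["duration"], params["duration"]) // string 10s

	// A plain int type assertion would fail:
	if _, ok := params["initPods"].(int); !ok {
		fmt.Println("type assertion to int fails, float64 is required")
	}
}
```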
@@ -576,6 +596,27 @@ type createPodsOp struct {
    Count int
    // Template parameter for Count.
    CountParam string
    // If false, Count pods get created rapidly. This can be used to
    // measure how quickly the scheduler can fill up a cluster.
    //
    // If true, Count pods get created, the operation waits for
    // a pod to get scheduled, deletes it and then creates another.
    // This continues until the configured Duration is over.
    // Metrics collection, if enabled, runs in parallel.
    //
    // This mode can be used to measure how the scheduler behaves
    // in a steady state where the cluster is always at roughly the
    // same level of utilization. Pods can be created in a separate,
    // earlier operation to simulate non-empty clusters.
    //
    // Note that the operation will delete any scheduled pod in
    // the namespace, so use different namespaces for pods that
    // are supposed to be kept running.
    SteadyState bool
    // How long to keep the cluster in a steady state.
    Duration metav1.Duration
    // Template parameter for Duration.
    DurationParam string
    // Whether or not to enable metrics collection for this createPodsOp.
    // Optional. Both CollectMetrics and SkipWaitToCompletion cannot be true at
    // the same time for a particular createPodsOp.
@@ -608,6 +649,9 @@ func (cpo *createPodsOp) isValid(allowParameterization bool) error {
        // use-cases right now.
        return fmt.Errorf("collectMetrics and skipWaitToCompletion cannot be true at the same time")
    }
    if cpo.SkipWaitToCompletion && cpo.SteadyState {
        return errors.New("skipWaitToCompletion and steadyState cannot be true at the same time")
    }
    return nil
}

@@ -623,6 +667,15 @@ func (cpo createPodsOp) patchParams(w *workload) (realOp, error) {
            return nil, err
        }
    }
    if cpo.DurationParam != "" {
        durationStr, err := getParam[string](w.Params, cpo.DurationParam[1:])
        if err != nil {
            return nil, err
        }
        if cpo.Duration.Duration, err = time.ParseDuration(durationStr); err != nil {
            return nil, fmt.Errorf("parsing duration parameter %s: %w", cpo.DurationParam, err)
        }
    }
    return &cpo, (&cpo).isValid(false)
}

@@ -1298,14 +1351,19 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
            collectorCtx, collectors = startCollectingMetrics(tCtx, &collectorWG, podInformer, tc.MetricsCollectorConfig, throughputErrorMargin, opIndex, namespace, []string{namespace})
            defer collectorCtx.Cancel("cleaning up")
        }
        if err := createPods(tCtx, namespace, concreteOp); err != nil {
        if err := createPodsRapidly(tCtx, namespace, concreteOp); err != nil {
            tCtx.Fatalf("op %d: %v", opIndex, err)
        }
        if concreteOp.SkipWaitToCompletion {
        switch {
        case concreteOp.SkipWaitToCompletion:
            // Only record those namespaces that may potentially require barriers
            // in the future.
            numPodsScheduledPerNamespace[namespace] += concreteOp.Count
        } else {
        case concreteOp.SteadyState:
            if err := createPodsSteadily(tCtx, namespace, podInformer, concreteOp); err != nil {
                tCtx.Fatalf("op %d: %v", opIndex, err)
            }
        default:
            if err := waitUntilPodsScheduledInNamespace(tCtx, podInformer, namespace, concreteOp.Count); err != nil {
                tCtx.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err)
            }
@@ -1588,7 +1646,12 @@ func getNodePreparer(prefix string, cno *createNodesOp, clientset clientset.Inte
    ), nil
}

func createPods(tCtx ktesting.TContext, namespace string, cpo *createPodsOp) error {
// createPodsRapidly implements the "create pods rapidly" mode of [createPodsOp].
// It's a nop when cpo.SteadyState is true.
func createPodsRapidly(tCtx ktesting.TContext, namespace string, cpo *createPodsOp) error {
    if cpo.SteadyState {
        return nil
    }
    strategy, err := getPodStrategy(cpo)
    if err != nil {
        return err
@@ -1600,6 +1663,147 @@ func createPods(tCtx ktesting.TContext, namespace string, cpo *createPodsOp) err
    return podCreator.CreatePods(tCtx)
}

// createPodsSteadily implements the "create pods and delete pods" mode of [createPodsOp].
// It's a nop when cpo.SteadyState is false.
func createPodsSteadily(tCtx ktesting.TContext, namespace string, podInformer coreinformers.PodInformer, cpo *createPodsOp) error {
    if !cpo.SteadyState {
        return nil
    }
    strategy, err := getPodStrategy(cpo)
    if err != nil {
        return err
    }
    tCtx.Logf("creating pods in namespace %q for %s", namespace, cpo.Duration)
    tCtx = ktesting.WithTimeout(tCtx, cpo.Duration.Duration, fmt.Sprintf("the operation ran for the configured %s", cpo.Duration.Duration))

    // Start watching pods in the namespace. Any pod which is seen as being scheduled
    // gets deleted.
    scheduledPods := make(chan *v1.Pod, cpo.Count)
    scheduledPodsClosed := false
    var mutex sync.Mutex
    defer func() {
        mutex.Lock()
        defer mutex.Unlock()
        close(scheduledPods)
        scheduledPodsClosed = true
    }()

    existingPods := 0
    runningPods := 0
    onPodChange := func(oldObj, newObj any) {
        oldPod, newPod, err := schedutil.As[*v1.Pod](oldObj, newObj)
        if err != nil {
            tCtx.Errorf("unexpected pod events: %v", err)
            return
        }

        mutex.Lock()
        defer mutex.Unlock()
        if oldPod == nil {
            existingPods++
        }
        if (oldPod == nil || oldPod.Spec.NodeName == "") && newPod.Spec.NodeName != "" {
            // Got scheduled.
            runningPods++

            // Only ask for deletion in our namespace.
            if newPod.Namespace != namespace {
                return
            }
            if !scheduledPodsClosed {
                select {
                case <-tCtx.Done():
                case scheduledPods <- newPod:
                }
            }
        }
    }
    handle, err := podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj any) {
            onPodChange(nil, obj)
        },
        UpdateFunc: func(oldObj, newObj any) {
            onPodChange(oldObj, newObj)
        },
        DeleteFunc: func(obj any) {
            pod, _, err := schedutil.As[*v1.Pod](obj, nil)
            if err != nil {
                tCtx.Errorf("unexpected pod events: %v", err)
                return
            }

            existingPods--
            if pod.Spec.NodeName != "" {
                runningPods--
            }
        },
    })
    if err != nil {
        return fmt.Errorf("register event handler: %w", err)
    }
    defer func() {
        tCtx.ExpectNoError(podInformer.Informer().RemoveEventHandler(handle), "remove event handler")
    }()

    // Seed the namespace with the initial number of pods.
    if err := strategy(tCtx, tCtx.Client(), namespace, cpo.Count); err != nil {
        return fmt.Errorf("create initial %d pods: %w", cpo.Count, err)
    }

    // Now loop until we are done. Report periodically how many pods were scheduled.
    countScheduledPods := 0
    lastCountScheduledPods := 0
    logPeriod := time.Second
    ticker := time.NewTicker(logPeriod)
    defer ticker.Stop()
    for {
        select {
        case <-tCtx.Done():
            tCtx.Logf("Completed after seeing %d scheduled pod: %v", countScheduledPods, context.Cause(tCtx))
            return nil
        case <-scheduledPods:
            countScheduledPods++
            if countScheduledPods%cpo.Count == 0 {
                // All scheduled. Start over with a new batch.
                err := tCtx.Client().CoreV1().Pods(namespace).DeleteCollection(tCtx, metav1.DeleteOptions{
                    GracePeriodSeconds: ptr.To(int64(0)),
                    PropagationPolicy:  ptr.To(metav1.DeletePropagationBackground), // Foreground will block.
                }, metav1.ListOptions{})
                // Ignore errors when the time is up. errors.Is(context.Canceled) would
                // be more precise, but doesn't work because client-go doesn't reliably
                // propagate it. Instead, this was seen:
                //    client rate limiter Wait returned an error: rate: Wait(n=1) would exceed context deadline
                if tCtx.Err() != nil {
                    continue
                }
                if err != nil {
                    return fmt.Errorf("delete scheduled pods: %w", err)
                }
                err = strategy(tCtx, tCtx.Client(), namespace, cpo.Count)
                if tCtx.Err() != nil {
                    continue
                }
                if err != nil {
                    return fmt.Errorf("create next batch of pods: %w", err)
                }
            }
        case <-ticker.C:
            delta := countScheduledPods - lastCountScheduledPods
            lastCountScheduledPods = countScheduledPods
            func() {
                mutex.Lock()
                defer mutex.Unlock()

                tCtx.Logf("%d pods got scheduled in total in namespace %q, overall %d out of %d pods scheduled: %f pods/s in last interval",
                    countScheduledPods, namespace,
                    runningPods, existingPods,
                    float64(delta)/logPeriod.Seconds(),
                )
            }()
        }
    }
}

// waitUntilPodsScheduledInNamespace blocks until all pods in the given
// namespace are scheduled. Times out after 10 minutes because even at the
// lowest observed QPS of ~10 pods/sec, a 5000-node test should complete.
@@ -26,6 +26,7 @@ import (
    "path"
    "sort"
    "strings"
    "sync"
    "time"

    v1 "k8s.io/api/core/v1"
@@ -35,6 +36,7 @@ import (
    "k8s.io/client-go/informers"
    coreinformers "k8s.io/client-go/informers/core/v1"
    restclient "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/cache"
    "k8s.io/component-base/featuregate"
    "k8s.io/component-base/metrics/legacyregistry"
    "k8s.io/component-base/metrics/testutil"
@@ -45,6 +47,7 @@ import (
    "k8s.io/kubernetes/pkg/scheduler/apis/config"
    kubeschedulerscheme "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
    frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
    schedutil "k8s.io/kubernetes/pkg/scheduler/util"
    "k8s.io/kubernetes/test/integration/framework"
    "k8s.io/kubernetes/test/integration/util"
    testutils "k8s.io/kubernetes/test/utils"
@@ -378,7 +381,7 @@ type throughputCollector struct {
    podInformer           coreinformers.PodInformer
    schedulingThroughputs []float64
    labels                map[string]string
    namespaces            []string
    namespaces            sets.Set[string]
    errorMargin           float64
}

@@ -386,7 +389,7 @@ func newThroughputCollector(podInformer coreinformers.PodInformer, labels map[st
    return &throughputCollector{
        podInformer: podInformer,
        labels:      labels,
        namespaces:  namespaces,
        namespaces:  sets.New(namespaces...),
        errorMargin: errorMargin,
    }
}
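For context, the set type that replaces the namespace slice above comes from
k8s.io/apimachinery/pkg/util/sets. A minimal standalone illustration of the
calls this commit relies on (Has for the per-event namespace filter, List for
stable log output); the namespace names here are just examples:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	// Storing namespaces as a set lets the pod event handler do a quick
	// membership check instead of scanning a slice on every event.
	namespaces := sets.New("init", "test")

	fmt.Println(namespaces.Has("test"))  // true
	fmt.Println(namespaces.Has("other")) // false
	fmt.Println(sets.List(namespaces))   // [init test], sorted for deterministic logs
}
```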
@@ -396,11 +399,75 @@ func (tc *throughputCollector) init() error {
}

func (tc *throughputCollector) run(tCtx ktesting.TContext) {
    podsScheduled, _, err := getScheduledPods(tc.podInformer, tc.namespaces...)
    if err != nil {
        klog.Fatalf("%v", err)
    // The collector is based on informer cache events instead of periodically listing pods because:
    // - polling causes more overhead
    // - it does not work when pods get created, scheduled and deleted quickly
    //
    // Normally, informers cannot be used to observe state changes reliably.
    // They only guarantee that the *some* updates get reported, but not *all*.
    // But in scheduler_perf, the scheduler and the test share the same informer,
    // therefore we are guaranteed to see a new pod without NodeName (because
    // that is what the scheduler needs to see to schedule it) and then the updated
    // pod with NodeName (because nothing makes further changes to it).
    var mutex sync.Mutex
    scheduledPods := 0
    getScheduledPods := func() int {
        mutex.Lock()
        defer mutex.Unlock()
        return scheduledPods
    }
    lastScheduledCount := len(podsScheduled)
    onPodChange := func(oldObj, newObj any) {
        oldPod, newPod, err := schedutil.As[*v1.Pod](oldObj, newObj)
        if err != nil {
            tCtx.Errorf("unexpected pod events: %v", err)
            return
        }

        if !tc.namespaces.Has(newPod.Namespace) {
            return
        }

        mutex.Lock()
        defer mutex.Unlock()
        if (oldPod == nil || oldPod.Spec.NodeName == "") && newPod.Spec.NodeName != "" {
            // Got scheduled.
            scheduledPods++
        }
    }
    handle, err := tc.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj any) {
            onPodChange(nil, obj)
        },
        UpdateFunc: func(oldObj, newObj any) {
            onPodChange(oldObj, newObj)
        },
    })
    if err != nil {
        tCtx.Fatalf("register pod event handler: %v", err)
    }
    defer func() {
        tCtx.ExpectNoError(tc.podInformer.Informer().RemoveEventHandler(handle), "remove event handler")
    }()

    // Waiting for the initial sync didn't work, `handle.HasSynced` always returned
    // false - perhaps because the event handlers get added to a running informer.
    // That's okay(ish), throughput is typically measured within an empty namespace.
    //
    // syncTicker := time.NewTicker(time.Millisecond)
    // defer syncTicker.Stop()
    // for {
    //     select {
    //     case <-syncTicker.C:
    //         if handle.HasSynced() {
    //             break
    //         }
    //     case <-tCtx.Done():
    //         return
    //     }
    // }
    tCtx.Logf("Started pod throughput collector for namespace(s) %s, %d pods scheduled so far", sets.List(tc.namespaces), getScheduledPods())

    lastScheduledCount := getScheduledPods()
    ticker := time.NewTicker(throughputSampleInterval)
    defer ticker.Stop()
    lastSampleTime := time.Now()
@@ -413,12 +480,8 @@ func (tc *throughputCollector) run(tCtx ktesting.TContext) {
            return
        case <-ticker.C:
            now := time.Now()
            podsScheduled, _, err := getScheduledPods(tc.podInformer, tc.namespaces...)
            if err != nil {
                klog.Fatalf("%v", err)
            }

            scheduled := len(podsScheduled)
            scheduled := getScheduledPods()
            // Only do sampling if number of scheduled pods is greater than zero.
            if scheduled == 0 {
                continue