scheduler_perf + DRA: load up cluster by allocating claims

Having to schedule 4999 pods to simulate a "full" cluster is slow. Creating
claims and then allocating them more or less like the scheduler would when
scheduling pods is much faster and in practice has the same effect on the
dynamicresources plugin because it looks at claims, not pods.

This allows defining the "steady state" workloads with higher number of
devices ("claimsPerNode") again. This was prohibitively slow before.
This commit is contained in:
Patrick Ohly 2024-09-13 09:51:27 +02:00
parent 385599f0a8
commit ded96042f7
3 changed files with 162 additions and 24 deletions

View File

@ -1236,12 +1236,13 @@
measurePods: 2500
maxClaimsPerNode: 10
# SteadyStateResourceClaimTemplateStructured uses a ResourceClaimTemplate
# and dynamically creates ResourceClaim instances for each pod, but never
# more than 10 at a time. Then it waits for a pod to get scheduled
# before deleting it and creating another one.
# SteadyStateResourceClaimTemplateStructured uses a ResourceClaimTemplate and
# dynamically creates ResourceClaim instances for each pod. It creates ten
# pods, waits for them to be scheduled, deletes them, and starts again,
# so the cluster remains at the same level of utilization.
#
# The workload determines whether there are other pods in the cluster.
# The number of already allocated claims can be varied, thus simulating
# various degrees of pre-existing resource utilization.
#
# The driver uses structured parameters.
- name: SteadyStateClusterResourceClaimTemplateStructured
@ -1262,12 +1263,11 @@
- opcode: createAny
templatePath: config/dra/deviceclass-structured.yaml
- opcode: createAny
templatePath: config/dra/resourceclaimtemplate-structured.yaml
templatePath: config/dra/resourceclaim-structured.yaml
countParam: $initClaims
namespace: init
- opcode: createPods
- opcode: allocResourceClaims
namespace: init
countParam: $initPods
podTemplatePath: config/dra/pod-with-claim-template.yaml
- opcode: createAny
templatePath: config/dra/resourceclaimtemplate-structured.yaml
namespace: test
@ -1286,52 +1286,73 @@
# taking too long overall.
nodesWithDRA: 1
nodesWithoutDRA: 1
initPods: 0
initClaims: 0
maxClaimsPerNode: 10
duration: 2s
- name: empty_100nodes
params:
nodesWithDRA: 100
nodesWithoutDRA: 0
initPods: 0
maxClaimsPerNode: 2
initClaims: 0
maxClaimsPerNode: 10
duration: 10s
- name: empty_200nodes
params:
nodesWithDRA: 200
nodesWithoutDRA: 0
initPods: 0
maxClaimsPerNode: 2
initClaims: 0
maxClaimsPerNode: 10
duration: 10s
- name: empty_500nodes
params:
nodesWithDRA: 500
nodesWithoutDRA: 0
initPods: 0
maxClaimsPerNode: 2
initClaims: 0
maxClaimsPerNode: 10
duration: 10s
# In the "full" scenarios, the cluster can accommodate exactly one additional pod.
# These are slower because scheduling the initial pods takes time.
# In the "half" scenarios, half of the devices are in use.
- name: half_100nodes
params:
nodesWithDRA: 100
nodesWithoutDRA: 0
initClaims: 500
maxClaimsPerNode: 10
duration: 10s
- name: half_200nodes
params:
nodesWithDRA: 200
nodesWithoutDRA: 0
initClaims: 1000
maxClaimsPerNode: 10
duration: 10s
- name: half_500nodes
params:
nodesWithDRA: 500
nodesWithoutDRA: 0
initClaims: 2500
maxClaimsPerNode: 10
duration: 10s
# In the "full" scenarios, the cluster can accommodate exactly 10 additional pods.
- name: full_100nodes
params:
nodesWithDRA: 100
nodesWithoutDRA: 0
initPods: 199
maxClaimsPerNode: 2
initClaims: 990
maxClaimsPerNode: 10
duration: 10s
- name: full_200nodes
params:
nodesWithDRA: 200
nodesWithoutDRA: 0
initPods: 399
maxClaimsPerNode: 2
initClaims: 1990
maxClaimsPerNode: 10
duration: 10s
- name: full_500nodes
params:
nodesWithDRA: 500
nodesWithoutDRA: 0
initPods: 999
maxClaimsPerNode: 2
initClaims: 4990
maxClaimsPerNode: 10
duration: 10s
# SchedulingWithResourceClaimTemplate uses ResourceClaims

View File

@ -19,14 +19,23 @@ package benchmark
import (
"context"
"fmt"
"math/rand/v2"
"path/filepath"
"reflect"
"sync"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1alpha3"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/util/workqueue"
"k8s.io/dynamic-resource-allocation/structured"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
draapp "k8s.io/kubernetes/test/e2e/dra/test-driver/app"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/ptr"
@ -261,3 +270,109 @@ func resourceSlice(driverName, nodeName string, capacity int) *resourceapi.Resou
return slice
}
// allocResourceClaimsOp defines an op where resource claims with structured
// parameters get allocated without being associated with a pod.
type allocResourceClaimsOp struct {
// Must be allocResourceClaimsOpcode.
Opcode operationCode
// Namespace where claims are to be allocated, all namespaces if empty.
Namespace string
}
var _ realOp = &allocResourceClaimsOp{}
var _ runnableOp = &allocResourceClaimsOp{}
func (op *allocResourceClaimsOp) isValid(allowParameterization bool) error {
return nil
}
func (op *allocResourceClaimsOp) collectsMetrics() bool {
return false
}
func (op *allocResourceClaimsOp) patchParams(w *workload) (realOp, error) {
return op, op.isValid(false)
}
func (op *allocResourceClaimsOp) requiredNamespaces() []string { return nil }
func (op *allocResourceClaimsOp) run(tCtx ktesting.TContext) {
claims, err := tCtx.Client().ResourceV1alpha3().ResourceClaims(op.Namespace).List(tCtx, metav1.ListOptions{})
tCtx.ExpectNoError(err, "list claims")
tCtx.Logf("allocating %d ResourceClaims", len(claims.Items))
tCtx = ktesting.WithCancel(tCtx)
defer tCtx.Cancel("allocResourceClaimsOp.run is done")
// Track cluster state.
informerFactory := informers.NewSharedInformerFactory(tCtx.Client(), 0)
claimInformer := informerFactory.Resource().V1alpha3().ResourceClaims().Informer()
classLister := informerFactory.Resource().V1alpha3().DeviceClasses().Lister()
sliceLister := informerFactory.Resource().V1alpha3().ResourceSlices().Lister()
nodeLister := informerFactory.Core().V1().Nodes().Lister()
claimCache := assumecache.NewAssumeCache(tCtx.Logger(), claimInformer, "ResourceClaim", "", nil)
claimLister := claimLister{cache: claimCache}
informerFactory.Start(tCtx.Done())
defer func() {
tCtx.Cancel("allocResourceClaimsOp.run is shutting down")
informerFactory.Shutdown()
}()
syncedInformers := informerFactory.WaitForCacheSync(tCtx.Done())
expectSyncedInformers := map[reflect.Type]bool{
reflect.TypeOf(&resourceapi.DeviceClass{}): true,
reflect.TypeOf(&resourceapi.ResourceClaim{}): true,
reflect.TypeOf(&resourceapi.ResourceSlice{}): true,
reflect.TypeOf(&v1.Node{}): true,
}
require.Equal(tCtx, expectSyncedInformers, syncedInformers, "synced informers")
// The set of nodes is assumed to be fixed at this point.
nodes, err := nodeLister.List(labels.Everything())
tCtx.ExpectNoError(err, "list nodes")
// Allocate one claim at a time, picking nodes randomly. Each
// allocation is stored immediately, using the claim cache to avoid
// having to wait for the actual informer update.
claims:
for i := range claims.Items {
claim := &claims.Items[i]
if claim.Status.Allocation != nil {
continue
}
allocator, err := structured.NewAllocator(tCtx, []*resourceapi.ResourceClaim{claim}, claimLister, classLister, sliceLister)
tCtx.ExpectNoError(err, "create allocator")
rand.Shuffle(len(nodes), func(i, j int) {
nodes[i], nodes[j] = nodes[j], nodes[i]
})
for _, node := range nodes {
result, err := allocator.Allocate(tCtx, node)
tCtx.ExpectNoError(err, "allocate claim")
if result != nil {
claim = claim.DeepCopy()
claim.Status.Allocation = result[0]
claim, err := tCtx.Client().ResourceV1alpha3().ResourceClaims(claim.Namespace).UpdateStatus(tCtx, claim, metav1.UpdateOptions{})
tCtx.ExpectNoError(err, "update claim status with allocation")
tCtx.ExpectNoError(claimCache.Assume(claim), "assume claim")
continue claims
}
}
tCtx.Fatalf("Could not allocate claim %d out of %d", i, len(claims.Items))
}
}
type claimLister struct {
cache *assumecache.AssumeCache
}
func (c claimLister) ListAllAllocated() ([]*resourceapi.ResourceClaim, error) {
objs := c.cache.List(nil)
allocatedClaims := make([]*resourceapi.ResourceClaim, 0, len(objs))
for _, obj := range objs {
claim := obj.(*resourceapi.ResourceClaim)
if claim.Status.Allocation != nil {
allocatedClaims = append(allocatedClaims, claim)
}
}
return allocatedClaims, nil
}

View File

@ -76,6 +76,7 @@ import (
type operationCode string
const (
allocResourceClaimsOpcode operationCode = "allocResourceClaims"
createAnyOpcode operationCode = "createAny"
createNodesOpcode operationCode = "createNodes"
createNamespacesOpcode operationCode = "createNamespaces"
@ -426,6 +427,7 @@ type op struct {
// which op we're decoding at runtime.
func (op *op) UnmarshalJSON(b []byte) error {
possibleOps := map[operationCode]realOp{
allocResourceClaimsOpcode: &allocResourceClaimsOp{},
createAnyOpcode: &createAny{},
createNodesOpcode: &createNodesOp{},
createNamespacesOpcode: &createNamespacesOp{},