diff --git a/test/integration/scheduler_perf/config/performance-config.yaml b/test/integration/scheduler_perf/config/performance-config.yaml
index 8f29ab49599..c01f32bf717 100644
--- a/test/integration/scheduler_perf/config/performance-config.yaml
+++ b/test/integration/scheduler_perf/config/performance-config.yaml
@@ -1236,12 +1236,13 @@
       measurePods: 2500
       maxClaimsPerNode: 10
 
-# SteadyStateResourceClaimTemplateStructured uses a ResourceClaimTemplate
-# and dynamically creates ResourceClaim instances for each pod, but never
-# more than 10 at a time. Then it waits for a pod to get scheduled
-# before deleting it and creating another one.
+# SteadyStateResourceClaimTemplateStructured uses a ResourceClaimTemplate and
+# dynamically creates ResourceClaim instances for each pod. It creates ten
+# pods, waits for them to be scheduled, deletes them, and starts again,
+# so the cluster remains at the same level of utilization.
 #
-# The workload determines whether there are other pods in the cluster.
+# The number of already allocated claims can be varied, thus simulating
+# various degrees of pre-existing resource utilization.
 #
 # The driver uses structured parameters.
 - name: SteadyStateClusterResourceClaimTemplateStructured
@@ -1262,12 +1263,11 @@
   - opcode: createAny
     templatePath: config/dra/deviceclass-structured.yaml
   - opcode: createAny
-    templatePath: config/dra/resourceclaimtemplate-structured.yaml
+    templatePath: config/dra/resourceclaim-structured.yaml
+    countParam: $initClaims
     namespace: init
-  - opcode: createPods
+  - opcode: allocResourceClaims
     namespace: init
-    countParam: $initPods
-    podTemplatePath: config/dra/pod-with-claim-template.yaml
   - opcode: createAny
     templatePath: config/dra/resourceclaimtemplate-structured.yaml
     namespace: test
@@ -1286,52 +1286,73 @@
       # taking too long overall.
       nodesWithDRA: 1
       nodesWithoutDRA: 1
-      initPods: 0
+      initClaims: 0
       maxClaimsPerNode: 10
       duration: 2s
   - name: empty_100nodes
     params:
       nodesWithDRA: 100
       nodesWithoutDRA: 0
-      initPods: 0
-      maxClaimsPerNode: 2
+      initClaims: 0
+      maxClaimsPerNode: 10
       duration: 10s
   - name: empty_200nodes
     params:
       nodesWithDRA: 200
       nodesWithoutDRA: 0
-      initPods: 0
-      maxClaimsPerNode: 2
+      initClaims: 0
+      maxClaimsPerNode: 10
       duration: 10s
   - name: empty_500nodes
     params:
       nodesWithDRA: 500
       nodesWithoutDRA: 0
-      initPods: 0
-      maxClaimsPerNode: 2
+      initClaims: 0
+      maxClaimsPerNode: 10
       duration: 10s
-  # In the "full" scenarios, the cluster can accommodate exactly one additional pod.
-  # These are slower because scheduling the initial pods takes time.
+  # In the "half" scenarios, half of the devices are in use.
+  - name: half_100nodes
+    params:
+      nodesWithDRA: 100
+      nodesWithoutDRA: 0
+      initClaims: 500
+      maxClaimsPerNode: 10
+      duration: 10s
+  - name: half_200nodes
+    params:
+      nodesWithDRA: 200
+      nodesWithoutDRA: 0
+      initClaims: 1000
+      maxClaimsPerNode: 10
+      duration: 10s
+  - name: half_500nodes
+    params:
+      nodesWithDRA: 500
+      nodesWithoutDRA: 0
+      initClaims: 2500
+      maxClaimsPerNode: 10
+      duration: 10s
+  # In the "full" scenarios, the cluster can accommodate exactly 10 additional pods.
   - name: full_100nodes
     params:
       nodesWithDRA: 100
       nodesWithoutDRA: 0
-      initPods: 199
-      maxClaimsPerNode: 2
+      initClaims: 990
+      maxClaimsPerNode: 10
       duration: 10s
   - name: full_200nodes
     params:
       nodesWithDRA: 200
       nodesWithoutDRA: 0
-      initPods: 399
-      maxClaimsPerNode: 2
+      initClaims: 1990
+      maxClaimsPerNode: 10
       duration: 10s
   - name: full_500nodes
     params:
       nodesWithDRA: 500
       nodesWithoutDRA: 0
-      initPods: 999
-      maxClaimsPerNode: 2
+      initClaims: 4990
+      maxClaimsPerNode: 10
       duration: 10s
 
 # SchedulingWithResourceClaimTemplate uses ResourceClaims
diff --git a/test/integration/scheduler_perf/dra.go b/test/integration/scheduler_perf/dra.go
index d4fa422c7f0..8bf0d93e9c6 100644
--- a/test/integration/scheduler_perf/dra.go
+++ b/test/integration/scheduler_perf/dra.go
@@ -19,14 +19,23 @@ package benchmark
 import (
 	"context"
 	"fmt"
+	"math/rand/v2"
 	"path/filepath"
+	"reflect"
 	"sync"
 
+	"github.com/stretchr/testify/require"
+
+	v1 "k8s.io/api/core/v1"
 	resourceapi "k8s.io/api/resource/v1alpha3"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/client-go/informers"
 	"k8s.io/client-go/util/workqueue"
+	"k8s.io/dynamic-resource-allocation/structured"
 	"k8s.io/klog/v2"
+	"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
 	draapp "k8s.io/kubernetes/test/e2e/dra/test-driver/app"
 	"k8s.io/kubernetes/test/utils/ktesting"
 	"k8s.io/utils/ptr"
@@ -261,3 +270,109 @@ func resourceSlice(driverName, nodeName string, capacity int) *resourceapi.Resou
 
 	return slice
 }
+
+// allocResourceClaimsOp defines an op where resource claims with structured
+// parameters get allocated without being associated with a pod.
+type allocResourceClaimsOp struct {
+	// Must be allocResourceClaimsOpcode.
+	Opcode operationCode
+	// Namespace where claims are to be allocated, all namespaces if empty.
+	Namespace string
+}
+
+var _ realOp = &allocResourceClaimsOp{}
+var _ runnableOp = &allocResourceClaimsOp{}
+
+func (op *allocResourceClaimsOp) isValid(allowParameterization bool) error {
+	return nil
+}
+
+func (op *allocResourceClaimsOp) collectsMetrics() bool {
+	return false
+}
+func (op *allocResourceClaimsOp) patchParams(w *workload) (realOp, error) {
+	return op, op.isValid(false)
+}
+
+func (op *allocResourceClaimsOp) requiredNamespaces() []string { return nil }
+
+func (op *allocResourceClaimsOp) run(tCtx ktesting.TContext) {
+	claims, err := tCtx.Client().ResourceV1alpha3().ResourceClaims(op.Namespace).List(tCtx, metav1.ListOptions{})
+	tCtx.ExpectNoError(err, "list claims")
+	tCtx.Logf("allocating %d ResourceClaims", len(claims.Items))
+	tCtx = ktesting.WithCancel(tCtx)
+	defer tCtx.Cancel("allocResourceClaimsOp.run is done")
+
+	// Track cluster state.
+	informerFactory := informers.NewSharedInformerFactory(tCtx.Client(), 0)
+	claimInformer := informerFactory.Resource().V1alpha3().ResourceClaims().Informer()
+	classLister := informerFactory.Resource().V1alpha3().DeviceClasses().Lister()
+	sliceLister := informerFactory.Resource().V1alpha3().ResourceSlices().Lister()
+	nodeLister := informerFactory.Core().V1().Nodes().Lister()
+	claimCache := assumecache.NewAssumeCache(tCtx.Logger(), claimInformer, "ResourceClaim", "", nil)
+	claimLister := claimLister{cache: claimCache}
+	informerFactory.Start(tCtx.Done())
+	defer func() {
+		tCtx.Cancel("allocResourceClaimsOp.run is shutting down")
+		informerFactory.Shutdown()
+	}()
+	syncedInformers := informerFactory.WaitForCacheSync(tCtx.Done())
+	expectSyncedInformers := map[reflect.Type]bool{
+		reflect.TypeOf(&resourceapi.DeviceClass{}):   true,
+		reflect.TypeOf(&resourceapi.ResourceClaim{}): true,
+		reflect.TypeOf(&resourceapi.ResourceSlice{}): true,
+		reflect.TypeOf(&v1.Node{}):                   true,
+	}
+	require.Equal(tCtx, expectSyncedInformers, syncedInformers, "synced informers")
+
+	// The set of nodes is assumed to be fixed at this point.
+	nodes, err := nodeLister.List(labels.Everything())
+	tCtx.ExpectNoError(err, "list nodes")
+
+	// Allocate one claim at a time, picking nodes randomly. Each
+	// allocation is stored immediately, using the claim cache to avoid
+	// having to wait for the actual informer update.
+claims:
+	for i := range claims.Items {
+		claim := &claims.Items[i]
+		if claim.Status.Allocation != nil {
+			continue
+		}
+
+		allocator, err := structured.NewAllocator(tCtx, []*resourceapi.ResourceClaim{claim}, claimLister, classLister, sliceLister)
+		tCtx.ExpectNoError(err, "create allocator")
+
+		rand.Shuffle(len(nodes), func(i, j int) {
+			nodes[i], nodes[j] = nodes[j], nodes[i]
+		})
+		for _, node := range nodes {
+			result, err := allocator.Allocate(tCtx, node)
+			tCtx.ExpectNoError(err, "allocate claim")
+			if result != nil {
+				claim = claim.DeepCopy()
+				claim.Status.Allocation = result[0]
+				claim, err := tCtx.Client().ResourceV1alpha3().ResourceClaims(claim.Namespace).UpdateStatus(tCtx, claim, metav1.UpdateOptions{})
+				tCtx.ExpectNoError(err, "update claim status with allocation")
+				tCtx.ExpectNoError(claimCache.Assume(claim), "assume claim")
+				continue claims
+			}
+		}
+		tCtx.Fatalf("Could not allocate claim %d out of %d", i, len(claims.Items))
+	}
+}
+
+type claimLister struct {
+	cache *assumecache.AssumeCache
+}
+
+func (c claimLister) ListAllAllocated() ([]*resourceapi.ResourceClaim, error) {
+	objs := c.cache.List(nil)
+	allocatedClaims := make([]*resourceapi.ResourceClaim, 0, len(objs))
+	for _, obj := range objs {
+		claim := obj.(*resourceapi.ResourceClaim)
+		if claim.Status.Allocation != nil {
+			allocatedClaims = append(allocatedClaims, claim)
+		}
+	}
+	return allocatedClaims, nil
+}
diff --git a/test/integration/scheduler_perf/scheduler_perf.go b/test/integration/scheduler_perf/scheduler_perf.go
index 94da182036f..2f97f953385 100644
--- a/test/integration/scheduler_perf/scheduler_perf.go
+++ b/test/integration/scheduler_perf/scheduler_perf.go
@@ -76,6 +76,7 @@ import (
 type operationCode string
 
 const (
+	allocResourceClaimsOpcode operationCode = "allocResourceClaims"
 	createAnyOpcode           operationCode = "createAny"
 	createNodesOpcode         operationCode = "createNodes"
 	createNamespacesOpcode    operationCode = "createNamespaces"
@@ -426,6 +427,7 @@ type op struct {
 // which op we're decoding at runtime.
 func (op *op) UnmarshalJSON(b []byte) error {
 	possibleOps := map[operationCode]realOp{
+		allocResourceClaimsOpcode: &allocResourceClaimsOp{},
 		createAnyOpcode:           &createAny{},
 		createNodesOpcode:         &createNodesOp{},
 		createNamespacesOpcode:    &createNamespacesOp{},
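
For orientation, here is a minimal sketch of how the two new init steps of SteadyStateClusterResourceClaimTemplateStructured fit together. It is assembled from the hunks above rather than copied verbatim from the full test case; the opcode, template path, and $initClaims parameter are taken directly from the diff:

  workloadTemplate:
  # Create plain ResourceClaims that represent pre-existing utilization.
  - opcode: createAny
    templatePath: config/dra/resourceclaim-structured.yaml
    countParam: $initClaims
    namespace: init
  # Allocate them directly with the structured allocator, without any pods.
  - opcode: allocResourceClaims
    namespace: init

Because no placeholder pods have to be scheduled for the initial claims, the "half" and "full" workloads can start from a partially or almost fully utilized cluster without the slow setup that the old initPods approach required.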