From ad22b74c60940d0c2332884e37c8996ccbb0a165 Mon Sep 17 00:00:00 2001
From: Patrick Ohly
Date: Mon, 9 Sep 2024 18:11:34 +0200
Subject: [PATCH 01/13] DRA scheduler: fix match attribute names in test

FullyQualifiedNames must include a domain. The current code doesn't care,
but once it does, the tests had better behave correctly.
---
 .../structured/allocator_test.go | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go
index f16c38c9cee..d90ad29db6f 100644
--- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go
+++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go
@@ -353,11 +353,11 @@ func sliceWithOneDevice(name string, nodeSelection any, pool, driver string) *re
 }
 
 func TestAllocator(t *testing.T) {
-	nonExistentAttribute := resourceapi.FullyQualifiedName("NonExistentAttribute")
-	boolAttribute := resourceapi.FullyQualifiedName("boolAttribute")
-	stringAttribute := resourceapi.FullyQualifiedName("stringAttribute")
-	versionAttribute := resourceapi.FullyQualifiedName("driverVersion")
-	intAttribute := resourceapi.FullyQualifiedName("numa")
+	nonExistentAttribute := resourceapi.FullyQualifiedName(driverA + "/" + "NonExistentAttribute")
+	boolAttribute := resourceapi.FullyQualifiedName(driverA + "/" + "boolAttribute")
+	stringAttribute := resourceapi.FullyQualifiedName(driverA + "/" + "stringAttribute")
+	versionAttribute := resourceapi.FullyQualifiedName(driverA + "/" + "driverVersion")
+	intAttribute := resourceapi.FullyQualifiedName(driverA + "/" + "numa")
 
 	testcases := map[string]struct {
 		adminAccess      bool

From 7de6d070f202aaf5dd3ce10a8fdb85666c3cde62 Mon Sep 17 00:00:00 2001
From: Patrick Ohly
Date: Tue, 3 Sep 2024 16:41:29 +0200
Subject: [PATCH 02/13] DRA scheduler: avoid listing claims during Filter
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The Allocate call used to call back into the claim lister for each node.
This was significant work which showed up at the top of the CPU profile.

It's okay to list the allocated claims only once, during PreFilter, because
Filter does not change the claim status between Allocate calls.
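As a minimal sketch of the pattern (simplified, not the actual plugin code;
the snapshotAllocated helper is made up for illustration): the allocated
claims get listed once per scheduling cycle and that snapshot is handed to
the allocator, instead of the allocator calling back into a live lister for
every node.

    package sketch

    import (
        resourceapi "k8s.io/api/resource/v1alpha3"
    )

    // staticClaimLister serves a snapshot of allocated claims taken once in
    // PreFilter, so Filter/Allocate never has to query the live lister again.
    type staticClaimLister []*resourceapi.ResourceClaim

    func (cl staticClaimLister) ListAllAllocated() ([]*resourceapi.ResourceClaim, error) {
        return cl, nil
    }

    // snapshotAllocated is a hypothetical helper: PreFilter calls it once with
    // the live lister and passes the result to structured.NewAllocator, which
    // then reuses it for every node that Filter evaluates.
    func snapshotAllocated(live interface {
        ListAllAllocated() ([]*resourceapi.ResourceClaim, error)
    }) (staticClaimLister, error) {
        claims, err := live.ListAllAllocated()
        if err != nil {
            return nil, err
        }
        return staticClaimLister(claims), nil
    }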
goos: linux goarch: amd64 pkg: k8s.io/kubernetes/test/integration/scheduler_perf cpu: Intel(R) Core(TM) i9-7980XE CPU @ 2.60GHz │ before │ after │ │ SchedulingThroughput/Average │ SchedulingThroughput/Average vs base │ PerfScheduling/SchedulingWithResourceClaimTemplateStructured/5000pods_500nodes-36 15.04 ± 0% 18.06 ± 2% +20.07% (p=0.002 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/empty_100nodes-36 105.5 ± 1% 104.7 ± 2% ~ (p=0.485 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/empty_500nodes-36 95.83 ± 1% 96.62 ± 1% ~ (p=0.063 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/half_100nodes-36 79.67 ± 3% 83.00 ± 2% +4.18% (p=0.002 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/half_500nodes-36 27.11 ± 5% 32.45 ± 7% +19.68% (p=0.002 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/full_100nodes-36 84.00 ± 3% 95.22 ± 7% +13.36% (p=0.002 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/full_500nodes-36 7.110 ± 6% 9.111 ± 10% +28.15% (p=0.002 n=6) geomean 41.05 45.86 +11.73% --- .../plugins/dynamicresources/dynamicresources.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 92842efc253..1f0aaa1f395 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -528,8 +528,14 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl // persistently. // // Claims are treated as "allocated" if they are in the assume cache - // or currently their allocation is in-flight. - allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, allocateClaims, &claimListerForAssumeCache{assumeCache: pl.claimAssumeCache, inFlightAllocations: &pl.inFlightAllocations}, pl.classLister, pl.sliceLister) + // or currently their allocation is in-flight. This does not change + // during filtering, so we can determine that once. + claimLister := &claimListerForAssumeCache{assumeCache: pl.claimAssumeCache, inFlightAllocations: &pl.inFlightAllocations} + allocatedClaims, err := claimLister.ListAllAllocated() + if err != nil { + return nil, statusError(logger, err) + } + allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, allocateClaims, staticClaimLister(allocatedClaims), pl.classLister, pl.sliceLister) if err != nil { return nil, statusError(logger, err) } @@ -562,6 +568,12 @@ func (cl *claimListerForAssumeCache) ListAllAllocated() ([]*resourceapi.Resource return allocated, nil } +type staticClaimLister []*resourceapi.ResourceClaim + +func (cl staticClaimLister) ListAllAllocated() ([]*resourceapi.ResourceClaim, error) { + return cl, nil +} + // PreFilterExtensions returns prefilter extensions, pod add and remove. 
func (pl *DynamicResources) PreFilterExtensions() framework.PreFilterExtensions { return nil From 124689831531804eaaf754e8f72f14935dc33856 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Wed, 4 Sep 2024 14:34:35 +0200 Subject: [PATCH 03/13] DRA scheduler: ResourceSlice with unique strings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Using unique strings instead of normal strings speeds up allocation with structured parameters because maps that use those strings as key no longer need to build hashes of the string content. However, care must be taken to call unique.Make as little as possible because it is costly. Pre-allocating the map of allocated devices reduces the need to grow the map when adding devices. goos: linux goarch: amd64 pkg: k8s.io/kubernetes/test/integration/scheduler_perf cpu: Intel(R) Core(TM) i9-7980XE CPU @ 2.60GHz │ before │ after │ │ SchedulingThroughput/Average │ SchedulingThroughput/Average vs base │ PerfScheduling/SchedulingWithResourceClaimTemplateStructured/5000pods_500nodes-36 18.06 ± 2% 33.30 ± 2% +84.31% (p=0.002 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/empty_100nodes-36 104.7 ± 2% 105.3 ± 2% ~ (p=0.818 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/empty_500nodes-36 96.62 ± 1% 100.75 ± 1% +4.28% (p=0.002 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/half_100nodes-36 83.00 ± 2% 90.96 ± 2% +9.59% (p=0.002 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/half_500nodes-36 32.45 ± 7% 49.84 ± 4% +53.60% (p=0.002 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/full_100nodes-36 95.22 ± 7% 103.80 ± 1% +9.00% (p=0.002 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/full_500nodes-36 9.111 ± 10% 27.215 ± 7% +198.69% (p=0.002 n=6) geomean 45.86 64.26 +40.12% --- .../dynamicresources/dynamicresources.go | 49 +-- .../api/conversion.go | 39 +++ .../dynamic-resource-allocation/api/doc.go | 22 ++ .../dynamic-resource-allocation/api/types.go | 64 ++++ .../api/uniquestring.go | 41 +++ .../api/zz_generated.conversion.go | 298 ++++++++++++++++++ .../structured/allocator.go | 130 ++++---- .../structured/allocator_test.go | 81 +---- .../structured/pools.go | 60 ++-- test/integration/scheduler_perf/dra.go | 33 +- 10 files changed, 621 insertions(+), 196 deletions(-) create mode 100644 staging/src/k8s.io/dynamic-resource-allocation/api/conversion.go create mode 100644 staging/src/k8s.io/dynamic-resource-allocation/api/doc.go create mode 100644 staging/src/k8s.io/dynamic-resource-allocation/api/types.go create mode 100644 staging/src/k8s.io/dynamic-resource-allocation/api/uniquestring.go create mode 100644 staging/src/k8s.io/dynamic-resource-allocation/api/zz_generated.conversion.go diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 1f0aaa1f395..2a1bd028807 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -30,6 +30,7 @@ import ( apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" @@ -45,6 +46,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" 
schedutil "k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/kubernetes/pkg/scheduler/util/assumecache" + "k8s.io/utils/ptr" ) const ( @@ -530,12 +532,15 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl // Claims are treated as "allocated" if they are in the assume cache // or currently their allocation is in-flight. This does not change // during filtering, so we can determine that once. - claimLister := &claimListerForAssumeCache{assumeCache: pl.claimAssumeCache, inFlightAllocations: &pl.inFlightAllocations} - allocatedClaims, err := claimLister.ListAllAllocated() + allocatedDevices := pl.listAllAllocatedDevices() if err != nil { return nil, statusError(logger, err) } - allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, allocateClaims, staticClaimLister(allocatedClaims), pl.classLister, pl.sliceLister) + slices, err := pl.sliceLister.List(labels.Everything()) + if err != nil { + return nil, statusError(logger, err) + } + allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, allocateClaims, allocatedDevices, pl.classLister, slices) if err != nil { return nil, statusError(logger, err) } @@ -547,31 +552,33 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl return nil, nil } -type claimListerForAssumeCache struct { - assumeCache *assumecache.AssumeCache - inFlightAllocations *sync.Map -} - -func (cl *claimListerForAssumeCache) ListAllAllocated() ([]*resourceapi.ResourceClaim, error) { +func (pl *DynamicResources) listAllAllocatedDevices() []structured.DeviceID { // Probably not worth adding an index for? - objs := cl.assumeCache.List(nil) - allocated := make([]*resourceapi.ResourceClaim, 0, len(objs)) + objs := pl.claimAssumeCache.List(nil) + var allocated []structured.DeviceID for _, obj := range objs { claim := obj.(*resourceapi.ResourceClaim) - if obj, ok := cl.inFlightAllocations.Load(claim.UID); ok { + if obj, ok := pl.inFlightAllocations.Load(claim.UID); ok { claim = obj.(*resourceapi.ResourceClaim) } - if claim.Status.Allocation != nil { - allocated = append(allocated, claim) + if claim.Status.Allocation == nil { + continue + } + for _, result := range claim.Status.Allocation.Devices.Results { + // Kubernetes 1.31 did not set this, 1.32 always does. + // Supporting 1.31 is not worth the additional code that + // would have to be written (= looking up in request) because + // it is extremely unlikely that there really is a result + // that still exists in a cluster from 1.31 where this matters. + if ptr.Deref(result.AdminAccess, false) { + // Is not considered as allocated. + continue + } + deviceID := structured.MakeDeviceID(result.Driver, result.Pool, result.Device) + allocated = append(allocated, deviceID) } } - return allocated, nil -} - -type staticClaimLister []*resourceapi.ResourceClaim - -func (cl staticClaimLister) ListAllAllocated() ([]*resourceapi.ResourceClaim, error) { - return cl, nil + return allocated } // PreFilterExtensions returns prefilter extensions, pod add and remove. diff --git a/staging/src/k8s.io/dynamic-resource-allocation/api/conversion.go b/staging/src/k8s.io/dynamic-resource-allocation/api/conversion.go new file mode 100644 index 00000000000..28186e6baca --- /dev/null +++ b/staging/src/k8s.io/dynamic-resource-allocation/api/conversion.go @@ -0,0 +1,39 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package api + +import ( + "unique" + + conversion "k8s.io/apimachinery/pkg/conversion" + "k8s.io/apimachinery/pkg/runtime" +) + +var ( + localSchemeBuilder runtime.SchemeBuilder + AddToScheme = localSchemeBuilder.AddToScheme +) + +func Convert_api_UniqueString_To_string(in *UniqueString, out *string, s conversion.Scope) error { + *out = in.String() + return nil +} + +func Convert_string_To_api_UniqueString(in *string, out *UniqueString, s conversion.Scope) error { + *out = UniqueString(unique.Make(*in)) + return nil +} diff --git a/staging/src/k8s.io/dynamic-resource-allocation/api/doc.go b/staging/src/k8s.io/dynamic-resource-allocation/api/doc.go new file mode 100644 index 00000000000..9e28284ff3f --- /dev/null +++ b/staging/src/k8s.io/dynamic-resource-allocation/api/doc.go @@ -0,0 +1,22 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package api contains a variant of the API where strings are unique. These +// unique strings are faster to compare and more efficient when used as key in +// a map. +// +// +k8s:conversion-gen=k8s.io/api/resource/v1alpha3 +package api diff --git a/staging/src/k8s.io/dynamic-resource-allocation/api/types.go b/staging/src/k8s.io/dynamic-resource-allocation/api/types.go new file mode 100644 index 00000000000..3d4928323fd --- /dev/null +++ b/staging/src/k8s.io/dynamic-resource-allocation/api/types.go @@ -0,0 +1,64 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package api + +import ( + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +type ResourceSlice struct { + metav1.TypeMeta + metav1.ObjectMeta + Spec ResourceSliceSpec +} + +type ResourceSliceSpec struct { + Driver UniqueString + Pool ResourcePool + NodeName string + NodeSelector *v1.NodeSelector + AllNodes bool + Devices []Device +} + +type ResourcePool struct { + Name UniqueString + Generation int64 + ResourceSliceCount int64 +} +type Device struct { + Name UniqueString + Basic *BasicDevice +} + +type BasicDevice struct { + Attributes map[QualifiedName]DeviceAttribute + Capacity map[QualifiedName]resource.Quantity +} + +type QualifiedName string + +type FullyQualifiedName string + +type DeviceAttribute struct { + IntValue *int64 + BoolValue *bool + StringValue *string + VersionValue *string +} diff --git a/staging/src/k8s.io/dynamic-resource-allocation/api/uniquestring.go b/staging/src/k8s.io/dynamic-resource-allocation/api/uniquestring.go new file mode 100644 index 00000000000..34d413743ac --- /dev/null +++ b/staging/src/k8s.io/dynamic-resource-allocation/api/uniquestring.go @@ -0,0 +1,41 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package api + +import ( + "unique" +) + +// NullUniqueString is a UniqueString which contains no string. +var NullUniqueString UniqueString + +// UniqueString is a wrapper around [unique.Handle[string]]. +type UniqueString unique.Handle[string] + +// Returns the string that is stored in the UniqueString. +// If the UniqueString is null, the empty string is returned. +func (us UniqueString) String() string { + if us == NullUniqueString { + return "" + } + return unique.Handle[string](us).Value() +} + +// MakeUniqueString constructs a new unique string. +func MakeUniqueString(str string) UniqueString { + return UniqueString(unique.Make(str)) +} diff --git a/staging/src/k8s.io/dynamic-resource-allocation/api/zz_generated.conversion.go b/staging/src/k8s.io/dynamic-resource-allocation/api/zz_generated.conversion.go new file mode 100644 index 00000000000..e12091e987f --- /dev/null +++ b/staging/src/k8s.io/dynamic-resource-allocation/api/zz_generated.conversion.go @@ -0,0 +1,298 @@ +//go:build !ignore_autogenerated +// +build !ignore_autogenerated + +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by conversion-gen. DO NOT EDIT. 
+ +package api + +import ( + unsafe "unsafe" + + v1 "k8s.io/api/core/v1" + v1alpha3 "k8s.io/api/resource/v1alpha3" + resource "k8s.io/apimachinery/pkg/api/resource" + conversion "k8s.io/apimachinery/pkg/conversion" + runtime "k8s.io/apimachinery/pkg/runtime" +) + +func init() { + localSchemeBuilder.Register(RegisterConversions) +} + +// RegisterConversions adds conversion functions to the given scheme. +// Public to allow building arbitrary schemes. +func RegisterConversions(s *runtime.Scheme) error { + if err := s.AddGeneratedConversionFunc((*BasicDevice)(nil), (*v1alpha3.BasicDevice)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_api_BasicDevice_To_v1alpha3_BasicDevice(a.(*BasicDevice), b.(*v1alpha3.BasicDevice), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*v1alpha3.BasicDevice)(nil), (*BasicDevice)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha3_BasicDevice_To_api_BasicDevice(a.(*v1alpha3.BasicDevice), b.(*BasicDevice), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*Device)(nil), (*v1alpha3.Device)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_api_Device_To_v1alpha3_Device(a.(*Device), b.(*v1alpha3.Device), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*v1alpha3.Device)(nil), (*Device)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha3_Device_To_api_Device(a.(*v1alpha3.Device), b.(*Device), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*DeviceAttribute)(nil), (*v1alpha3.DeviceAttribute)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_api_DeviceAttribute_To_v1alpha3_DeviceAttribute(a.(*DeviceAttribute), b.(*v1alpha3.DeviceAttribute), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*v1alpha3.DeviceAttribute)(nil), (*DeviceAttribute)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha3_DeviceAttribute_To_api_DeviceAttribute(a.(*v1alpha3.DeviceAttribute), b.(*DeviceAttribute), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*ResourcePool)(nil), (*v1alpha3.ResourcePool)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_api_ResourcePool_To_v1alpha3_ResourcePool(a.(*ResourcePool), b.(*v1alpha3.ResourcePool), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*v1alpha3.ResourcePool)(nil), (*ResourcePool)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha3_ResourcePool_To_api_ResourcePool(a.(*v1alpha3.ResourcePool), b.(*ResourcePool), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*ResourceSlice)(nil), (*v1alpha3.ResourceSlice)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_api_ResourceSlice_To_v1alpha3_ResourceSlice(a.(*ResourceSlice), b.(*v1alpha3.ResourceSlice), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*v1alpha3.ResourceSlice)(nil), (*ResourceSlice)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha3_ResourceSlice_To_api_ResourceSlice(a.(*v1alpha3.ResourceSlice), b.(*ResourceSlice), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*ResourceSliceSpec)(nil), (*v1alpha3.ResourceSliceSpec)(nil), 
func(a, b interface{}, scope conversion.Scope) error { + return Convert_api_ResourceSliceSpec_To_v1alpha3_ResourceSliceSpec(a.(*ResourceSliceSpec), b.(*v1alpha3.ResourceSliceSpec), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*v1alpha3.ResourceSliceSpec)(nil), (*ResourceSliceSpec)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha3_ResourceSliceSpec_To_api_ResourceSliceSpec(a.(*v1alpha3.ResourceSliceSpec), b.(*ResourceSliceSpec), scope) + }); err != nil { + return err + } + if err := s.AddConversionFunc((*UniqueString)(nil), (*string)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_api_UniqueString_To_string(a.(*UniqueString), b.(*string), scope) + }); err != nil { + return err + } + if err := s.AddConversionFunc((*string)(nil), (*UniqueString)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_string_To_api_UniqueString(a.(*string), b.(*UniqueString), scope) + }); err != nil { + return err + } + return nil +} + +func autoConvert_api_BasicDevice_To_v1alpha3_BasicDevice(in *BasicDevice, out *v1alpha3.BasicDevice, s conversion.Scope) error { + out.Attributes = *(*map[v1alpha3.QualifiedName]v1alpha3.DeviceAttribute)(unsafe.Pointer(&in.Attributes)) + out.Capacity = *(*map[v1alpha3.QualifiedName]resource.Quantity)(unsafe.Pointer(&in.Capacity)) + return nil +} + +// Convert_api_BasicDevice_To_v1alpha3_BasicDevice is an autogenerated conversion function. +func Convert_api_BasicDevice_To_v1alpha3_BasicDevice(in *BasicDevice, out *v1alpha3.BasicDevice, s conversion.Scope) error { + return autoConvert_api_BasicDevice_To_v1alpha3_BasicDevice(in, out, s) +} + +func autoConvert_v1alpha3_BasicDevice_To_api_BasicDevice(in *v1alpha3.BasicDevice, out *BasicDevice, s conversion.Scope) error { + out.Attributes = *(*map[QualifiedName]DeviceAttribute)(unsafe.Pointer(&in.Attributes)) + out.Capacity = *(*map[QualifiedName]resource.Quantity)(unsafe.Pointer(&in.Capacity)) + return nil +} + +// Convert_v1alpha3_BasicDevice_To_api_BasicDevice is an autogenerated conversion function. +func Convert_v1alpha3_BasicDevice_To_api_BasicDevice(in *v1alpha3.BasicDevice, out *BasicDevice, s conversion.Scope) error { + return autoConvert_v1alpha3_BasicDevice_To_api_BasicDevice(in, out, s) +} + +func autoConvert_api_Device_To_v1alpha3_Device(in *Device, out *v1alpha3.Device, s conversion.Scope) error { + if err := Convert_api_UniqueString_To_string(&in.Name, &out.Name, s); err != nil { + return err + } + out.Basic = (*v1alpha3.BasicDevice)(unsafe.Pointer(in.Basic)) + return nil +} + +// Convert_api_Device_To_v1alpha3_Device is an autogenerated conversion function. +func Convert_api_Device_To_v1alpha3_Device(in *Device, out *v1alpha3.Device, s conversion.Scope) error { + return autoConvert_api_Device_To_v1alpha3_Device(in, out, s) +} + +func autoConvert_v1alpha3_Device_To_api_Device(in *v1alpha3.Device, out *Device, s conversion.Scope) error { + if err := Convert_string_To_api_UniqueString(&in.Name, &out.Name, s); err != nil { + return err + } + out.Basic = (*BasicDevice)(unsafe.Pointer(in.Basic)) + return nil +} + +// Convert_v1alpha3_Device_To_api_Device is an autogenerated conversion function. 
+func Convert_v1alpha3_Device_To_api_Device(in *v1alpha3.Device, out *Device, s conversion.Scope) error { + return autoConvert_v1alpha3_Device_To_api_Device(in, out, s) +} + +func autoConvert_api_DeviceAttribute_To_v1alpha3_DeviceAttribute(in *DeviceAttribute, out *v1alpha3.DeviceAttribute, s conversion.Scope) error { + out.IntValue = (*int64)(unsafe.Pointer(in.IntValue)) + out.BoolValue = (*bool)(unsafe.Pointer(in.BoolValue)) + out.StringValue = (*string)(unsafe.Pointer(in.StringValue)) + out.VersionValue = (*string)(unsafe.Pointer(in.VersionValue)) + return nil +} + +// Convert_api_DeviceAttribute_To_v1alpha3_DeviceAttribute is an autogenerated conversion function. +func Convert_api_DeviceAttribute_To_v1alpha3_DeviceAttribute(in *DeviceAttribute, out *v1alpha3.DeviceAttribute, s conversion.Scope) error { + return autoConvert_api_DeviceAttribute_To_v1alpha3_DeviceAttribute(in, out, s) +} + +func autoConvert_v1alpha3_DeviceAttribute_To_api_DeviceAttribute(in *v1alpha3.DeviceAttribute, out *DeviceAttribute, s conversion.Scope) error { + out.IntValue = (*int64)(unsafe.Pointer(in.IntValue)) + out.BoolValue = (*bool)(unsafe.Pointer(in.BoolValue)) + out.StringValue = (*string)(unsafe.Pointer(in.StringValue)) + out.VersionValue = (*string)(unsafe.Pointer(in.VersionValue)) + return nil +} + +// Convert_v1alpha3_DeviceAttribute_To_api_DeviceAttribute is an autogenerated conversion function. +func Convert_v1alpha3_DeviceAttribute_To_api_DeviceAttribute(in *v1alpha3.DeviceAttribute, out *DeviceAttribute, s conversion.Scope) error { + return autoConvert_v1alpha3_DeviceAttribute_To_api_DeviceAttribute(in, out, s) +} + +func autoConvert_api_ResourcePool_To_v1alpha3_ResourcePool(in *ResourcePool, out *v1alpha3.ResourcePool, s conversion.Scope) error { + if err := Convert_api_UniqueString_To_string(&in.Name, &out.Name, s); err != nil { + return err + } + out.Generation = in.Generation + out.ResourceSliceCount = in.ResourceSliceCount + return nil +} + +// Convert_api_ResourcePool_To_v1alpha3_ResourcePool is an autogenerated conversion function. +func Convert_api_ResourcePool_To_v1alpha3_ResourcePool(in *ResourcePool, out *v1alpha3.ResourcePool, s conversion.Scope) error { + return autoConvert_api_ResourcePool_To_v1alpha3_ResourcePool(in, out, s) +} + +func autoConvert_v1alpha3_ResourcePool_To_api_ResourcePool(in *v1alpha3.ResourcePool, out *ResourcePool, s conversion.Scope) error { + if err := Convert_string_To_api_UniqueString(&in.Name, &out.Name, s); err != nil { + return err + } + out.Generation = in.Generation + out.ResourceSliceCount = in.ResourceSliceCount + return nil +} + +// Convert_v1alpha3_ResourcePool_To_api_ResourcePool is an autogenerated conversion function. +func Convert_v1alpha3_ResourcePool_To_api_ResourcePool(in *v1alpha3.ResourcePool, out *ResourcePool, s conversion.Scope) error { + return autoConvert_v1alpha3_ResourcePool_To_api_ResourcePool(in, out, s) +} + +func autoConvert_api_ResourceSlice_To_v1alpha3_ResourceSlice(in *ResourceSlice, out *v1alpha3.ResourceSlice, s conversion.Scope) error { + out.ObjectMeta = in.ObjectMeta + if err := Convert_api_ResourceSliceSpec_To_v1alpha3_ResourceSliceSpec(&in.Spec, &out.Spec, s); err != nil { + return err + } + return nil +} + +// Convert_api_ResourceSlice_To_v1alpha3_ResourceSlice is an autogenerated conversion function. 
+func Convert_api_ResourceSlice_To_v1alpha3_ResourceSlice(in *ResourceSlice, out *v1alpha3.ResourceSlice, s conversion.Scope) error { + return autoConvert_api_ResourceSlice_To_v1alpha3_ResourceSlice(in, out, s) +} + +func autoConvert_v1alpha3_ResourceSlice_To_api_ResourceSlice(in *v1alpha3.ResourceSlice, out *ResourceSlice, s conversion.Scope) error { + out.ObjectMeta = in.ObjectMeta + if err := Convert_v1alpha3_ResourceSliceSpec_To_api_ResourceSliceSpec(&in.Spec, &out.Spec, s); err != nil { + return err + } + return nil +} + +// Convert_v1alpha3_ResourceSlice_To_api_ResourceSlice is an autogenerated conversion function. +func Convert_v1alpha3_ResourceSlice_To_api_ResourceSlice(in *v1alpha3.ResourceSlice, out *ResourceSlice, s conversion.Scope) error { + return autoConvert_v1alpha3_ResourceSlice_To_api_ResourceSlice(in, out, s) +} + +func autoConvert_api_ResourceSliceSpec_To_v1alpha3_ResourceSliceSpec(in *ResourceSliceSpec, out *v1alpha3.ResourceSliceSpec, s conversion.Scope) error { + if err := Convert_api_UniqueString_To_string(&in.Driver, &out.Driver, s); err != nil { + return err + } + if err := Convert_api_ResourcePool_To_v1alpha3_ResourcePool(&in.Pool, &out.Pool, s); err != nil { + return err + } + out.NodeName = in.NodeName + out.NodeSelector = (*v1.NodeSelector)(unsafe.Pointer(in.NodeSelector)) + out.AllNodes = in.AllNodes + if in.Devices != nil { + in, out := &in.Devices, &out.Devices + *out = make([]v1alpha3.Device, len(*in)) + for i := range *in { + if err := Convert_api_Device_To_v1alpha3_Device(&(*in)[i], &(*out)[i], s); err != nil { + return err + } + } + } else { + out.Devices = nil + } + return nil +} + +// Convert_api_ResourceSliceSpec_To_v1alpha3_ResourceSliceSpec is an autogenerated conversion function. +func Convert_api_ResourceSliceSpec_To_v1alpha3_ResourceSliceSpec(in *ResourceSliceSpec, out *v1alpha3.ResourceSliceSpec, s conversion.Scope) error { + return autoConvert_api_ResourceSliceSpec_To_v1alpha3_ResourceSliceSpec(in, out, s) +} + +func autoConvert_v1alpha3_ResourceSliceSpec_To_api_ResourceSliceSpec(in *v1alpha3.ResourceSliceSpec, out *ResourceSliceSpec, s conversion.Scope) error { + if err := Convert_string_To_api_UniqueString(&in.Driver, &out.Driver, s); err != nil { + return err + } + if err := Convert_v1alpha3_ResourcePool_To_api_ResourcePool(&in.Pool, &out.Pool, s); err != nil { + return err + } + out.NodeName = in.NodeName + out.NodeSelector = (*v1.NodeSelector)(unsafe.Pointer(in.NodeSelector)) + out.AllNodes = in.AllNodes + if in.Devices != nil { + in, out := &in.Devices, &out.Devices + *out = make([]Device, len(*in)) + for i := range *in { + if err := Convert_v1alpha3_Device_To_api_Device(&(*in)[i], &(*out)[i], s); err != nil { + return err + } + } + } else { + out.Devices = nil + } + return nil +} + +// Convert_v1alpha3_ResourceSliceSpec_To_api_ResourceSliceSpec is an autogenerated conversion function. 
+func Convert_v1alpha3_ResourceSliceSpec_To_api_ResourceSliceSpec(in *v1alpha3.ResourceSliceSpec, out *ResourceSliceSpec, s conversion.Scope) error { + return autoConvert_v1alpha3_ResourceSliceSpec_To_api_ResourceSliceSpec(in, out, s) +} diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go index dc19c8aab18..d32e960e536 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go @@ -27,18 +27,12 @@ import ( resourceapi "k8s.io/api/resource/v1alpha3" "k8s.io/apimachinery/pkg/util/sets" resourcelisters "k8s.io/client-go/listers/resource/v1alpha3" + draapi "k8s.io/dynamic-resource-allocation/api" "k8s.io/dynamic-resource-allocation/cel" "k8s.io/klog/v2" "k8s.io/utils/ptr" ) -// ClaimLister returns a subset of the claims that a -// resourcelisters.ResourceClaimLister would return. -type ClaimLister interface { - // ListAllAllocated returns only claims which are allocated. - ListAllAllocated() ([]*resourceapi.ResourceClaim, error) -} - // Allocator calculates how to allocate a set of unallocated claims which use // structured parameters. // @@ -48,26 +42,29 @@ type ClaimLister interface { type Allocator struct { adminAccessEnabled bool claimsToAllocate []*resourceapi.ResourceClaim - claimLister ClaimLister + allocatedDevices []DeviceID classLister resourcelisters.DeviceClassLister - sliceLister resourcelisters.ResourceSliceLister + slices []*resourceapi.ResourceSlice } // NewAllocator returns an allocator for a certain set of claims or an error if // some problem was detected which makes it impossible to allocate claims. +// +// The returned Allocator is stateless. It calls the listers anew for each +// Allocate call. func NewAllocator(ctx context.Context, adminAccessEnabled bool, claimsToAllocate []*resourceapi.ResourceClaim, - claimLister ClaimLister, + allocatedDevices []DeviceID, classLister resourcelisters.DeviceClassLister, - sliceLister resourcelisters.ResourceSliceLister, + slices []*resourceapi.ResourceSlice, ) (*Allocator, error) { return &Allocator{ adminAccessEnabled: adminAccessEnabled, claimsToAllocate: claimsToAllocate, - claimLister: claimLister, + allocatedDevices: allocatedDevices, classLister: classLister, - sliceLister: sliceLister, + slices: slices, }, nil } @@ -107,14 +104,13 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult [] deviceMatchesRequest: make(map[matchKey]bool), constraints: make([][]constraint, len(a.claimsToAllocate)), requestData: make(map[requestIndices]requestData), - allocated: make(map[DeviceID]bool), result: make([]*resourceapi.AllocationResult, len(a.claimsToAllocate)), } alloc.logger.V(5).Info("Starting allocation", "numClaims", len(alloc.claimsToAllocate)) defer alloc.logger.V(5).Info("Done with allocation", "success", len(finalResult) == len(alloc.claimsToAllocate), "err", finalErr) // First determine all eligible pools. - pools, err := GatherPools(ctx, alloc.sliceLister, node) + pools, err := GatherPools(ctx, alloc.slices, node) if err != nil { return nil, fmt.Errorf("gather pool information: %w", err) } @@ -144,8 +140,9 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult [] // and their requests. For each claim we determine how many devices // need to be allocated. If not all can be stored in the result, the // claim cannot be allocated. 
+ numDevicesTotal := 0 for claimIndex, claim := range alloc.claimsToAllocate { - numDevices := 0 + numDevicesPerClaim := 0 // If we have any any request that wants "all" devices, we need to // figure out how much "all" is. If some pool is incomplete, we stop @@ -220,20 +217,20 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult [] return nil, fmt.Errorf("claim %s, request %s: unsupported count mode %s", klog.KObj(claim), request.Name, request.AllocationMode) } alloc.requestData[requestKey] = requestData - numDevices += requestData.numDevices + numDevicesPerClaim += requestData.numDevices } - alloc.logger.V(6).Info("Checked claim", "claim", klog.KObj(claim), "numDevices", numDevices) + alloc.logger.V(6).Info("Checked claim", "claim", klog.KObj(claim), "numDevices", numDevicesPerClaim) // Check that we don't end up with too many results. - if numDevices > resourceapi.AllocationResultsMaxSize { - return nil, fmt.Errorf("claim %s: number of requested devices %d exceeds the claim limit of %d", klog.KObj(claim), numDevices, resourceapi.AllocationResultsMaxSize) + if numDevicesPerClaim > resourceapi.AllocationResultsMaxSize { + return nil, fmt.Errorf("claim %s: number of requested devices %d exceeds the claim limit of %d", klog.KObj(claim), numDevicesPerClaim, resourceapi.AllocationResultsMaxSize) } // If we don't, then we can pre-allocate the result slices for // appending the actual results later. alloc.result[claimIndex] = &resourceapi.AllocationResult{ Devices: resourceapi.DeviceAllocationResult{ - Results: make([]resourceapi.DeviceRequestAllocationResult, 0, numDevices), + Results: make([]resourceapi.DeviceRequestAllocationResult, 0, numDevicesPerClaim), }, } @@ -244,15 +241,16 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult [] for i, constraint := range claim.Spec.Devices.Constraints { switch { case constraint.MatchAttribute != nil: + matchAttribute := draapi.FullyQualifiedName(*constraint.MatchAttribute) logger := alloc.logger if loggerV := alloc.logger.V(6); loggerV.Enabled() { logger = klog.LoggerWithName(logger, "matchAttributeConstraint") - logger = klog.LoggerWithValues(logger, "matchAttribute", *constraint.MatchAttribute) + logger = klog.LoggerWithValues(logger, "matchAttribute", matchAttribute) } m := &matchAttributeConstraint{ logger: logger, requestNames: sets.New(constraint.Requests...), - attributeName: *constraint.MatchAttribute, + attributeName: matchAttribute, } constraints[i] = m default: @@ -261,6 +259,7 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult [] } } alloc.constraints[claimIndex] = constraints + numDevicesTotal += numDevicesPerClaim } // Selecting a device for a request is independent of what has been @@ -272,30 +271,14 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult [] // Some of the existing devices are probably already allocated by // claims... - claims, err := alloc.claimLister.ListAllAllocated() - numAllocated := 0 - if err != nil { - return nil, fmt.Errorf("list allocated claims: %w", err) - } - for _, claim := range claims { - // Sanity check.. - if claim.Status.Allocation == nil { - continue - } - for _, result := range claim.Status.Allocation.Devices.Results { - // Kubernetes 1.31 did not set this, 1.32 always does. 
- // Supporting 1.31 is not worth the additional code that - // would have to be written (= looking up in request) because - // it is extremely unlikely that there really is a result - // that still exists in a cluster from 1.31 where this matters. - if ptr.Deref(result.AdminAccess, false) { - // Ignore, it's not considered allocated. - continue - } - deviceID := DeviceID{Driver: result.Driver, Pool: result.Pool, Device: result.Device} - alloc.allocated[deviceID] = true - numAllocated++ - } + numAllocated := len(alloc.allocatedDevices) + + // Pre-allocating the map avoids growing it later. We can estimate + // the size based on what is already allocated and what we need to + // allocate. + alloc.allocated = make(map[DeviceID]bool, numAllocated+numDevicesTotal) + for _, deviceID := range alloc.allocatedDevices { + alloc.allocated[deviceID] = true } alloc.logger.V(6).Info("Gathered information about allocated devices", "numAllocated", numAllocated) @@ -404,18 +387,18 @@ type requestData struct { type deviceWithID struct { DeviceID - device *resourceapi.BasicDevice + device *draapi.BasicDevice } type constraint interface { // add is called whenever a device is about to be allocated. It must // check whether the device matches the constraint and if yes, // track that it is allocated. - add(requestName string, device *resourceapi.BasicDevice, deviceID DeviceID) bool + add(requestName string, device *draapi.BasicDevice, deviceID DeviceID) bool // For every successful add there is exactly one matching removed call // with the exact same parameters. - remove(requestName string, device *resourceapi.BasicDevice, deviceID DeviceID) + remove(requestName string, device *draapi.BasicDevice, deviceID DeviceID) } // matchAttributeConstraint compares an attribute value across devices. @@ -428,13 +411,13 @@ type constraint interface { type matchAttributeConstraint struct { logger klog.Logger // Includes name and attribute name, so no need to repeat in log messages. requestNames sets.Set[string] - attributeName resourceapi.FullyQualifiedName + attributeName draapi.FullyQualifiedName - attribute *resourceapi.DeviceAttribute + attribute *draapi.DeviceAttribute numDevices int } -func (m *matchAttributeConstraint) add(requestName string, device *resourceapi.BasicDevice, deviceID DeviceID) bool { +func (m *matchAttributeConstraint) add(requestName string, device *draapi.BasicDevice, deviceID DeviceID) bool { if m.requestNames.Len() > 0 && !m.requestNames.Has(requestName) { // Device not affected by constraint. m.logger.V(7).Info("Constraint does not apply to request", "request", requestName) @@ -491,7 +474,7 @@ func (m *matchAttributeConstraint) add(requestName string, device *resourceapi.B return true } -func (m *matchAttributeConstraint) remove(requestName string, device *resourceapi.BasicDevice, deviceID DeviceID) { +func (m *matchAttributeConstraint) remove(requestName string, device *draapi.BasicDevice, deviceID DeviceID) { if m.requestNames.Len() > 0 && !m.requestNames.Has(requestName) { // Device not affected by constraint. 
return @@ -501,9 +484,9 @@ func (m *matchAttributeConstraint) remove(requestName string, device *resourceap m.logger.V(7).Info("Device removed from constraint set", "device", deviceID, "numDevices", m.numDevices) } -func lookupAttribute(device *resourceapi.BasicDevice, deviceID DeviceID, attributeName resourceapi.FullyQualifiedName) *resourceapi.DeviceAttribute { +func lookupAttribute(device *draapi.BasicDevice, deviceID DeviceID, attributeName draapi.FullyQualifiedName) *draapi.DeviceAttribute { // Fully-qualified match? - if attr, ok := device.Attributes[resourceapi.QualifiedName(attributeName)]; ok { + if attr, ok := device.Attributes[draapi.QualifiedName(attributeName)]; ok { return &attr } index := strings.Index(string(attributeName), "/") @@ -512,14 +495,14 @@ func lookupAttribute(device *resourceapi.BasicDevice, deviceID DeviceID, attribu return nil } - if string(attributeName[0:index]) != deviceID.Driver { + if string(attributeName[0:index]) != deviceID.Driver.String() { // Not an attribute of the driver and not found above, // so it is not available. return nil } // Domain matches the driver, so let's check just the ID. - if attr, ok := device.Attributes[resourceapi.QualifiedName(attributeName[index+1:])]; ok { + if attr, ok := device.Attributes[draapi.QualifiedName(attributeName[index+1:])]; ok { return &attr } @@ -640,7 +623,7 @@ func (alloc *allocator) allocateOne(r deviceIndices) (bool, error) { } // isSelectable checks whether a device satisfies the request and class selectors. -func (alloc *allocator) isSelectable(r requestIndices, slice *resourceapi.ResourceSlice, deviceIndex int) (bool, error) { +func (alloc *allocator) isSelectable(r requestIndices, slice *draapi.ResourceSlice, deviceIndex int) (bool, error) { // This is the only supported device type at the moment. device := slice.Spec.Devices[deviceIndex].Basic if device == nil { @@ -682,7 +665,7 @@ func (alloc *allocator) isSelectable(r requestIndices, slice *resourceapi.Resour } -func (alloc *allocator) selectorsMatch(r requestIndices, device *resourceapi.BasicDevice, deviceID DeviceID, class *resourceapi.DeviceClass, selectors []resourceapi.DeviceSelector) (bool, error) { +func (alloc *allocator) selectorsMatch(r requestIndices, device *draapi.BasicDevice, deviceID DeviceID, class *resourceapi.DeviceClass, selectors []resourceapi.DeviceSelector) (bool, error) { for i, selector := range selectors { expr := cel.GetCompiler().CompileCELExpression(selector.CEL.Expression, cel.Options{}) if expr.Error != nil { @@ -697,7 +680,13 @@ func (alloc *allocator) selectorsMatch(r requestIndices, device *resourceapi.Bas return false, fmt.Errorf("claim %s: selector #%d: CEL compile error: %w", klog.KObj(alloc.claimsToAllocate[r.claimIndex]), i, expr.Error) } - matches, details, err := expr.DeviceMatches(alloc.ctx, cel.Device{Driver: deviceID.Driver, Attributes: device.Attributes, Capacity: device.Capacity}) + // If this conversion turns out to be expensive, the CEL package could be converted + // to use unique strings. 
+ var d resourceapi.BasicDevice + if err := draapi.Convert_api_BasicDevice_To_v1alpha3_BasicDevice(device, &d, nil); err != nil { + return false, fmt.Errorf("convert BasicDevice: %w", err) + } + matches, details, err := expr.DeviceMatches(alloc.ctx, cel.Device{Driver: deviceID.Driver.String(), Attributes: d.Attributes, Capacity: d.Capacity}) if class != nil { alloc.logger.V(7).Info("CEL result", "device", deviceID, "class", klog.KObj(class), "selector", i, "expression", selector.CEL.Expression, "matches", matches, "actualCost", ptr.Deref(details.ActualCost(), 0), "err", err) } else { @@ -727,7 +716,7 @@ func (alloc *allocator) selectorsMatch(r requestIndices, device *resourceapi.Bas // as if that candidate had been allocated. If allocation cannot continue later // and must try something else, then the rollback function can be invoked to // restore the previous state. -func (alloc *allocator) allocateDevice(r deviceIndices, device *resourceapi.BasicDevice, deviceID DeviceID, must bool) (bool, func(), error) { +func (alloc *allocator) allocateDevice(r deviceIndices, device *draapi.BasicDevice, deviceID DeviceID, must bool) (bool, func(), error) { claim := alloc.claimsToAllocate[r.claimIndex] request := &claim.Spec.Devices.Requests[r.requestIndex] adminAccess := ptr.Deref(request.AdminAccess, false) @@ -762,9 +751,9 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device *resourceapi.Basi } result := resourceapi.DeviceRequestAllocationResult{ Request: request.Name, - Driver: deviceID.Driver, - Pool: deviceID.Pool, - Device: deviceID.Device, + Driver: deviceID.Driver.String(), + Pool: deviceID.Pool.String(), + Device: deviceID.Device.String(), } if adminAccess { result.AdminAccess = &adminAccess @@ -836,15 +825,18 @@ func (alloc *allocator) createNodeSelector(allocation *resourceapi.AllocationRes return nil, nil } -func (alloc *allocator) findSlice(deviceAllocation resourceapi.DeviceRequestAllocationResult) *resourceapi.ResourceSlice { +func (alloc *allocator) findSlice(deviceAllocation resourceapi.DeviceRequestAllocationResult) *draapi.ResourceSlice { + driverName := draapi.MakeUniqueString(deviceAllocation.Driver) + poolName := draapi.MakeUniqueString(deviceAllocation.Pool) + deviceName := draapi.MakeUniqueString(deviceAllocation.Device) for _, pool := range alloc.pools { - if pool.Driver != deviceAllocation.Driver || - pool.Pool != deviceAllocation.Pool { + if pool.Driver != driverName || + pool.Pool != poolName { continue } for _, slice := range pool.Slices { for _, device := range slice.Spec.Devices { - if device.Name == deviceAllocation.Device { + if device.Name == deviceName { return slice } } diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go index d90ad29db6f..9b2008dce8d 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go @@ -20,6 +20,7 @@ import ( "errors" "flag" "fmt" + "slices" "testing" "github.com/onsi/gomega" @@ -182,17 +183,6 @@ func claimWithDeviceConfig(name, request, class, driver, attribute string) *reso return claim } -// generate allocated ResourceClaim object -func allocatedClaim(name, request, class string, results ...resourceapi.DeviceRequestAllocationResult) *resourceapi.ResourceClaim { - claim := claim(name, request, class) - claim.Status.Allocation = &resourceapi.AllocationResult{ - Devices: 
resourceapi.DeviceAllocationResult{ - Results: results, - }, - } - return claim -} - // generate a Device object with the given name, capacity and attributes. func device(name string, capacity map[resourceapi.QualifiedName]resource.Quantity, attributes map[resourceapi.QualifiedName]resourceapi.DeviceAttribute) resourceapi.Device { return resourceapi.Device{ @@ -362,7 +352,7 @@ func TestAllocator(t *testing.T) { testcases := map[string]struct { adminAccess bool claimsToAllocate []*resourceapi.ResourceClaim - allocatedClaims []*resourceapi.ResourceClaim + allocatedDevices []DeviceID classes []*resourceapi.DeviceClass slices []*resourceapi.ResourceSlice node *v1.Node @@ -943,12 +933,10 @@ func TestAllocator(t *testing.T) { }, "already-allocated-devices": { claimsToAllocate: objects(claim(claim0, req0, classA)), - allocatedClaims: objects( - allocatedClaim(claim0, req0, classA, - deviceAllocationResult(req0, driverA, pool1, device1, false), - deviceAllocationResult(req1, driverA, pool1, device2, false), - ), - ), + allocatedDevices: []DeviceID{ + MakeDeviceID(driverA, pool1, device1), + MakeDeviceID(driverA, pool1, device2), + }, classes: objects(class(classA, driverA)), slices: objects(sliceWithOneDevice(slice1, node1, pool1, driverA)), node: node(node1, region1), @@ -976,12 +964,10 @@ func TestAllocator(t *testing.T) { c.Spec.Devices.Requests[0].AdminAccess = ptr.To(true) return []*resourceapi.ResourceClaim{c} }(), - allocatedClaims: objects( - allocatedClaim(claim0, req0, classA, - deviceAllocationResult(req0, driverA, pool1, device1, false), - deviceAllocationResult(req1, driverA, pool1, device2, false), - ), - ), + allocatedDevices: []DeviceID{ + MakeDeviceID(driverA, pool1, device1), + MakeDeviceID(driverA, pool1, device2), + }, classes: objects(class(classA, driverA)), slices: objects(sliceWithOneDevice(slice1, node1, pool1, driverA)), node: node(node1, region1), @@ -990,22 +976,6 @@ func TestAllocator(t *testing.T) { allocationResult(localNodeSelector(node1), deviceAllocationResult(req0, driverA, pool1, device1, true)), }, }, - "already-allocated-for-admin-access": { - claimsToAllocate: objects(claim(claim0, req0, classA)), - allocatedClaims: objects( - allocatedClaim(claim0, req0, classA, - deviceAllocationResult(req0, driverA, pool1, device1, true), - deviceAllocationResult(req1, driverA, pool1, device2, true), - ), - ), - classes: objects(class(classA, driverA)), - slices: objects(sliceWithOneDevice(slice1, node1, pool1, driverA)), - node: node(node1, region1), - - expectResults: []any{ - allocationResult(localNodeSelector(node1), deviceAllocationResult(req0, driverA, pool1, device1, false)), - }, - }, "with-constraint": { claimsToAllocate: objects(claimWithRequests( claim0, @@ -1397,23 +1367,15 @@ func TestAllocator(t *testing.T) { // Listing objects is deterministic and returns them in the same // order as in the test case. That makes the allocation result // also deterministic. 
- var allocated, toAllocate claimLister var classLister informerLister[resourceapi.DeviceClass] - var sliceLister informerLister[resourceapi.ResourceSlice] - for _, claim := range tc.claimsToAllocate { - toAllocate.claims = append(toAllocate.claims, claim.DeepCopy()) - } - for _, claim := range tc.allocatedClaims { - allocated.claims = append(allocated.claims, claim.DeepCopy()) - } - for _, slice := range tc.slices { - sliceLister.objs = append(sliceLister.objs, slice.DeepCopy()) - } for _, class := range tc.classes { classLister.objs = append(classLister.objs, class.DeepCopy()) } + claimsToAllocate := slices.Clone(tc.claimsToAllocate) + allocatedDevices := slices.Clone(tc.allocatedDevices) + slices := slices.Clone(tc.slices) - allocator, err := NewAllocator(ctx, tc.adminAccess, toAllocate.claims, allocated, classLister, sliceLister) + allocator, err := NewAllocator(ctx, tc.adminAccess, claimsToAllocate, allocatedDevices, classLister, slices) g.Expect(err).ToNot(gomega.HaveOccurred()) results, err := allocator.Allocate(ctx, tc.node) @@ -1425,23 +1387,14 @@ func TestAllocator(t *testing.T) { g.Expect(results).To(gomega.ConsistOf(tc.expectResults...)) // Objects that the allocator had access to should not have been modified. - g.Expect(toAllocate.claims).To(gomega.HaveExactElements(tc.claimsToAllocate)) - g.Expect(allocated.claims).To(gomega.HaveExactElements(tc.allocatedClaims)) - g.Expect(sliceLister.objs).To(gomega.ConsistOf(tc.slices)) + g.Expect(claimsToAllocate).To(gomega.HaveExactElements(tc.claimsToAllocate)) + g.Expect(allocatedDevices).To(gomega.HaveExactElements(tc.allocatedDevices)) + g.Expect(slices).To(gomega.ConsistOf(tc.slices)) g.Expect(classLister.objs).To(gomega.ConsistOf(tc.classes)) }) } } -type claimLister struct { - claims []*resourceapi.ResourceClaim - err error -} - -func (l claimLister) ListAllAllocated() ([]*resourceapi.ResourceClaim, error) { - return l.claims, l.err -} - type informerLister[T any] struct { objs []*T err error diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/pools.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/pools.go index 64bb46c7b8e..8e3ba269e2e 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/pools.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/pools.go @@ -22,10 +22,9 @@ import ( v1 "k8s.io/api/core/v1" resourceapi "k8s.io/api/resource/v1alpha3" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" - resourcelisters "k8s.io/client-go/listers/resource/v1alpha3" "k8s.io/component-helpers/scheduling/corev1/nodeaffinity" + draapi "k8s.io/dynamic-resource-allocation/api" ) // GatherPools collects information about all resource pools which provide @@ -34,31 +33,30 @@ import ( // Out-dated slices are silently ignored. Pools may be incomplete (not all // required slices available) or invalid (for example, device names not unique). // Both is recorded in the result. -func GatherPools(ctx context.Context, sliceLister resourcelisters.ResourceSliceLister, node *v1.Node) ([]*Pool, error) { +func GatherPools(ctx context.Context, slices []*resourceapi.ResourceSlice, node *v1.Node) ([]*Pool, error) { pools := make(map[PoolID]*Pool) - // TODO (future): use a custom lister interface and implement it with - // and indexer on the node name field. Then here we can ask for only - // slices with the right node name and those with no node name. 
- slices, err := sliceLister.List(labels.Everything()) - if err != nil { - return nil, fmt.Errorf("list resource slices: %w", err) - } for _, slice := range slices { switch { case slice.Spec.NodeName != "": if slice.Spec.NodeName == node.Name { - addSlice(pools, slice) + if err := addSlice(pools, slice); err != nil { + return nil, fmt.Errorf("add node slice %s: %w", slice.Name, err) + } } case slice.Spec.AllNodes: - addSlice(pools, slice) + if err := addSlice(pools, slice); err != nil { + return nil, fmt.Errorf("add cluster slice %s: %w", slice.Name, err) + } case slice.Spec.NodeSelector != nil: selector, err := nodeaffinity.NewNodeSelector(slice.Spec.NodeSelector) if err != nil { return nil, fmt.Errorf("node selector in resource slice %s: %w", slice.Name, err) } if selector.Match(node) { - addSlice(pools, slice) + if err := addSlice(pools, slice); err != nil { + return nil, fmt.Errorf("add matching slice %s: %w", slice.Name, err) + } } default: // Nothing known was set. This must be some future, unknown extension, @@ -84,22 +82,27 @@ func GatherPools(ctx context.Context, sliceLister resourcelisters.ResourceSliceL return result, nil } -func addSlice(pools map[PoolID]*Pool, slice *resourceapi.ResourceSlice) { +func addSlice(pools map[PoolID]*Pool, s *resourceapi.ResourceSlice) error { + var slice draapi.ResourceSlice + if err := draapi.Convert_v1alpha3_ResourceSlice_To_api_ResourceSlice(s, &slice, nil); err != nil { + return fmt.Errorf("convert ResourceSlice: %w", err) + } + id := PoolID{Driver: slice.Spec.Driver, Pool: slice.Spec.Pool.Name} pool := pools[id] if pool == nil { // New pool. pool = &Pool{ PoolID: id, - Slices: []*resourceapi.ResourceSlice{slice}, + Slices: []*draapi.ResourceSlice{&slice}, } pools[id] = pool - return + return nil } if slice.Spec.Pool.Generation < pool.Slices[0].Spec.Pool.Generation { // Out-dated. - return + return nil } if slice.Spec.Pool.Generation > pool.Slices[0].Spec.Pool.Generation { @@ -108,11 +111,12 @@ func addSlice(pools map[PoolID]*Pool, slice *resourceapi.ResourceSlice) { } // Add to pool. 
- pool.Slices = append(pool.Slices, slice) + pool.Slices = append(pool.Slices, &slice) + return nil } func poolIsInvalid(pool *Pool) (bool, string) { - devices := sets.New[string]() + devices := sets.New[draapi.UniqueString]() for _, slice := range pool.Slices { for _, device := range slice.Spec.Devices { if devices.Has(device.Name) { @@ -129,21 +133,29 @@ type Pool struct { IsIncomplete bool IsInvalid bool InvalidReason string - Slices []*resourceapi.ResourceSlice + Slices []*draapi.ResourceSlice } type PoolID struct { - Driver, Pool string + Driver, Pool draapi.UniqueString } func (p PoolID) String() string { - return p.Driver + "/" + p.Pool + return p.Driver.String() + "/" + p.Pool.String() } type DeviceID struct { - Driver, Pool, Device string + Driver, Pool, Device draapi.UniqueString } func (d DeviceID) String() string { - return d.Driver + "/" + d.Pool + "/" + d.Device + return d.Driver.String() + "/" + d.Pool.String() + "/" + d.Device.String() +} + +func MakeDeviceID(driver, pool, device string) DeviceID { + return DeviceID{ + Driver: draapi.MakeUniqueString(driver), + Pool: draapi.MakeUniqueString(pool), + Device: draapi.MakeUniqueString(device), + } } diff --git a/test/integration/scheduler_perf/dra.go b/test/integration/scheduler_perf/dra.go index d85e624309b..e6063889276 100644 --- a/test/integration/scheduler_perf/dra.go +++ b/test/integration/scheduler_perf/dra.go @@ -277,7 +277,6 @@ func (op *allocResourceClaimsOp) run(tCtx ktesting.TContext) { sliceLister := informerFactory.Resource().V1alpha3().ResourceSlices().Lister() nodeLister := informerFactory.Core().V1().Nodes().Lister() claimCache := assumecache.NewAssumeCache(tCtx.Logger(), claimInformer, "ResourceClaim", "", nil) - claimLister := claimLister{cache: claimCache} informerFactory.Start(tCtx.Done()) defer func() { tCtx.Cancel("allocResourceClaimsOp.run is shutting down") @@ -295,6 +294,8 @@ func (op *allocResourceClaimsOp) run(tCtx ktesting.TContext) { // The set of nodes is assumed to be fixed at this point. nodes, err := nodeLister.List(labels.Everything()) tCtx.ExpectNoError(err, "list nodes") + slices, err := sliceLister.List(labels.Everything()) + tCtx.ExpectNoError(err, "list slices") // Allocate one claim at a time, picking nodes randomly. 
Each // allocation is stored immediately, using the claim cache to avoid @@ -306,7 +307,19 @@ claims: continue } - allocator, err := structured.NewAllocator(tCtx, utilfeature.DefaultFeatureGate.Enabled(features.DRAAdminAccess), []*resourceapi.ResourceClaim{claim}, claimLister, classLister, sliceLister) + objs := claimCache.List(nil) + allocatedDevices := make([]structured.DeviceID, 0, len(objs)) + for _, obj := range objs { + claim := obj.(*resourceapi.ResourceClaim) + if claim.Status.Allocation == nil { + continue + } + for _, result := range claim.Status.Allocation.Devices.Results { + allocatedDevices = append(allocatedDevices, structured.MakeDeviceID(result.Driver, result.Pool, result.Device)) + } + } + + allocator, err := structured.NewAllocator(tCtx, utilfeature.DefaultFeatureGate.Enabled(features.DRAAdminAccess), []*resourceapi.ResourceClaim{claim}, allocatedDevices, classLister, slices) tCtx.ExpectNoError(err, "create allocator") rand.Shuffle(len(nodes), func(i, j int) { @@ -327,19 +340,3 @@ claims: tCtx.Fatalf("Could not allocate claim %d out of %d", i, len(claims.Items)) } } - -type claimLister struct { - cache *assumecache.AssumeCache -} - -func (c claimLister) ListAllAllocated() ([]*resourceapi.ResourceClaim, error) { - objs := c.cache.List(nil) - allocatedClaims := make([]*resourceapi.ResourceClaim, 0, len(objs)) - for _, obj := range objs { - claim := obj.(*resourceapi.ResourceClaim) - if claim.Status.Allocation != nil { - allocatedClaims = append(allocatedClaims, claim) - } - } - return allocatedClaims, nil -} From 941d17b3b8ccac385f1143a91ee80df3c5764648 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Wed, 4 Sep 2024 16:42:03 +0200 Subject: [PATCH 04/13] DRA scheduler: code cleanups MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Looking up the slice can be avoided by storing it when allocating a device. The AllocationResult struct is small enough that it can be copied by value. 
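For illustration, a minimal, self-contained sketch of the idea of remembering the slice at allocation time (type and field names are simplified stand-ins, not the ones used in this patch):

    package main

    import "fmt"

    // Simplified stand-ins for the real API types, for illustration only.
    type Slice struct{ NodeName string }
    type Device struct{ Name string }

    // Instead of recording only the device and later searching every pool and
    // slice for it again, remember the slice together with the device.
    type allocatedDevice struct {
        device Device
        slice  *Slice // makes a later findSlice() pass unnecessary
    }

    func main() {
        slice := &Slice{NodeName: "node-1"}
        a := allocatedDevice{device: Device{Name: "gpu-0"}, slice: slice}
        // The node selector can be derived directly from the remembered slice.
        fmt.Println(a.device.Name, "is local to", a.slice.NodeName)
    }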
goos: linux goarch: amd64 pkg: k8s.io/kubernetes/test/integration/scheduler_perf cpu: Intel(R) Core(TM) i9-7980XE CPU @ 2.60GHz │ before │ after │ │ SchedulingThroughput/Average │ SchedulingThroughput/Average vs base │ PerfScheduling/SchedulingWithResourceClaimTemplateStructured/5000pods_500nodes-36 33.30 ± 2% 33.95 ± 4% ~ (p=0.288 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/empty_100nodes-36 105.3 ± 2% 105.8 ± 2% ~ (p=0.524 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/empty_500nodes-36 100.8 ± 1% 100.7 ± 1% ~ (p=0.738 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/half_100nodes-36 90.96 ± 2% 90.78 ± 1% ~ (p=0.952 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/half_500nodes-36 49.84 ± 4% 50.51 ± 7% ~ (p=0.485 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/full_100nodes-36 103.8 ± 1% 103.7 ± 5% ~ (p=0.582 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/full_500nodes-36 27.21 ± 7% 28.50 ± 2% ~ (p=0.065 n=6) geomean 64.26 64.99 +1.14% --- .../dynamicresources/dynamicresources.go | 8 +- .../structured/allocator.go | 132 +++++++++--------- .../structured/allocator_test.go | 8 +- test/integration/scheduler_perf/dra.go | 2 +- 4 files changed, 78 insertions(+), 72 deletions(-) diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 2a1bd028807..d1a08fc1081 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -87,7 +87,7 @@ type stateData struct { informationsForClaim []informationForClaim // nodeAllocations caches the result of Filter for the nodes. - nodeAllocations map[string][]*resourceapi.AllocationResult + nodeAllocations map[string][]resourceapi.AllocationResult } func (d *stateData) Clone() framework.StateData { @@ -545,7 +545,7 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl return nil, statusError(logger, err) } s.allocator = allocator - s.nodeAllocations = make(map[string][]*resourceapi.AllocationResult) + s.nodeAllocations = make(map[string][]resourceapi.AllocationResult) } s.claims = claims @@ -634,7 +634,7 @@ func (pl *DynamicResources) Filter(ctx context.Context, cs *framework.CycleState } // Use allocator to check the node and cache the result in case that the node is picked. - var allocations []*resourceapi.AllocationResult + var allocations []resourceapi.AllocationResult if state.allocator != nil { allocCtx := ctx if loggerV := logger.V(5); loggerV.Enabled() { @@ -782,7 +782,7 @@ func (pl *DynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat if index < 0 { return statusError(logger, fmt.Errorf("internal error, claim %s with allocation not found", claim.Name)) } - allocation := allocations[i] + allocation := &allocations[i] state.informationsForClaim[index].allocation = allocation // Strictly speaking, we don't need to store the full modified object. 
diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go index d32e960e536..dd55602cf8c 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go @@ -96,7 +96,7 @@ func (a *Allocator) ClaimsToAllocate() []*resourceapi.ResourceClaim { // additional value. A name can also be useful because log messages do not // have a common prefix. V(5) is used for one-time log entries, V(6) for important // progress reports, and V(7) for detailed debug output. -func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []*resourceapi.AllocationResult, finalErr error) { +func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []resourceapi.AllocationResult, finalErr error) { alloc := &allocator{ Allocator: a, ctx: ctx, // all methods share the same a and thus ctx @@ -104,7 +104,7 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult [] deviceMatchesRequest: make(map[matchKey]bool), constraints: make([][]constraint, len(a.claimsToAllocate)), requestData: make(map[requestIndices]requestData), - result: make([]*resourceapi.AllocationResult, len(a.claimsToAllocate)), + result: make([]internalAllocationResult, len(a.claimsToAllocate)), } alloc.logger.V(5).Info("Starting allocation", "numClaims", len(alloc.claimsToAllocate)) defer alloc.logger.V(5).Info("Done with allocation", "success", len(finalResult) == len(alloc.claimsToAllocate), "err", finalErr) @@ -206,7 +206,12 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult [] return nil, err } if selectable { - requestData.allDevices = append(requestData.allDevices, deviceWithID{device: slice.Spec.Devices[deviceIndex].Basic, DeviceID: DeviceID{Driver: slice.Spec.Driver, Pool: slice.Spec.Pool.Name, Device: slice.Spec.Devices[deviceIndex].Name}}) + device := deviceWithID{ + id: DeviceID{Driver: slice.Spec.Driver, Pool: slice.Spec.Pool.Name, Device: slice.Spec.Devices[deviceIndex].Name}, + basic: slice.Spec.Devices[deviceIndex].Basic, + slice: slice, + } + requestData.allDevices = append(requestData.allDevices, device) } } } @@ -228,16 +233,12 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult [] // If we don't, then we can pre-allocate the result slices for // appending the actual results later. - alloc.result[claimIndex] = &resourceapi.AllocationResult{ - Devices: resourceapi.DeviceAllocationResult{ - Results: make([]resourceapi.DeviceRequestAllocationResult, 0, numDevicesPerClaim), - }, - } + alloc.result[claimIndex].devices = make([]internalDeviceResult, 0, numDevicesPerClaim) // Constraints are assumed to be monotonic: once a constraint returns // false, adding more devices will not cause it to return true. This // allows the search to stop early once a constraint returns false. 
- var constraints = make([]constraint, len(claim.Spec.Devices.Constraints)) + constraints := make([]constraint, len(claim.Spec.Devices.Constraints)) for i, constraint := range claim.Spec.Devices.Constraints { switch { case constraint.MatchAttribute != nil: @@ -306,8 +307,20 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult [] return nil, nil } - for claimIndex, allocationResult := range alloc.result { + result := make([]resourceapi.AllocationResult, len(alloc.result)) + for claimIndex, internalResult := range alloc.result { claim := alloc.claimsToAllocate[claimIndex] + allocationResult := &result[claimIndex] + allocationResult.Devices.Results = make([]resourceapi.DeviceRequestAllocationResult, len(internalResult.devices)) + for i, internal := range internalResult.devices { + allocationResult.Devices.Results[i] = resourceapi.DeviceRequestAllocationResult{ + Request: internal.request, + Driver: internal.id.Driver.String(), + Pool: internal.id.Pool.String(), + Device: internal.id.Device.String(), + AdminAccess: internal.adminAccess, + } + } // Populate configs. for requestIndex := range claim.Spec.Devices.Requests { @@ -331,14 +344,14 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult [] } // Determine node selector. - nodeSelector, err := alloc.createNodeSelector(allocationResult) + nodeSelector, err := alloc.createNodeSelector(internalResult.devices) if err != nil { return nil, fmt.Errorf("create NodeSelector for claim %s: %w", claim.Name, err) } allocationResult.NodeSelector = nodeSelector } - return alloc.result, nil + return result, nil } // errStop is a special error that gets returned by allocateOne if it detects @@ -356,7 +369,7 @@ type allocator struct { constraints [][]constraint // one list of constraints per claim requestData map[requestIndices]requestData // one entry per request allocated map[DeviceID]bool - result []*resourceapi.AllocationResult + result []internalAllocationResult } // matchKey identifies a device/request pair. @@ -386,8 +399,20 @@ type requestData struct { } type deviceWithID struct { - DeviceID - device *draapi.BasicDevice + id DeviceID + basic *draapi.BasicDevice + slice *draapi.ResourceSlice +} + +type internalAllocationResult struct { + devices []internalDeviceResult +} + +type internalDeviceResult struct { + request string + id DeviceID + slice *draapi.ResourceSlice + adminAccess *bool } type constraint interface { @@ -542,7 +567,7 @@ func (alloc *allocator) allocateOne(r deviceIndices) (bool, error) { // For "all" devices we already know which ones we need. We // just need to check whether we can use them. deviceWithID := requestData.allDevices[r.deviceIndex] - success, _, err := alloc.allocateDevice(r, deviceWithID.device, deviceWithID.DeviceID, true) + success, _, err := alloc.allocateDevice(r, deviceWithID, true) if err != nil { return false, err } @@ -593,7 +618,12 @@ func (alloc *allocator) allocateOne(r deviceIndices) (bool, error) { } // Finally treat as allocated and move on to the next device. - allocated, deallocate, err := alloc.allocateDevice(r, slice.Spec.Devices[deviceIndex].Basic, deviceID, false) + device := deviceWithID{ + id: deviceID, + basic: slice.Spec.Devices[deviceIndex].Basic, + slice: slice, + } + allocated, deallocate, err := alloc.allocateDevice(r, device, false) if err != nil { return false, err } @@ -716,28 +746,28 @@ func (alloc *allocator) selectorsMatch(r requestIndices, device *draapi.BasicDev // as if that candidate had been allocated. 
If allocation cannot continue later // and must try something else, then the rollback function can be invoked to // restore the previous state. -func (alloc *allocator) allocateDevice(r deviceIndices, device *draapi.BasicDevice, deviceID DeviceID, must bool) (bool, func(), error) { +func (alloc *allocator) allocateDevice(r deviceIndices, device deviceWithID, must bool) (bool, func(), error) { claim := alloc.claimsToAllocate[r.claimIndex] request := &claim.Spec.Devices.Requests[r.requestIndex] adminAccess := ptr.Deref(request.AdminAccess, false) - if !adminAccess && alloc.allocated[deviceID] { - alloc.logger.V(7).Info("Device in use", "device", deviceID) + if !adminAccess && alloc.allocated[device.id] { + alloc.logger.V(7).Info("Device in use", "device", device.id) return false, nil, nil } // It's available. Now check constraints. for i, constraint := range alloc.constraints[r.claimIndex] { - added := constraint.add(request.Name, device, deviceID) + added := constraint.add(request.Name, device.basic, device.id) if !added { if must { // It does not make sense to declare a claim where a constraint prevents getting // all devices. Treat this as an error. - return false, nil, fmt.Errorf("claim %s, request %s: cannot add device %s because a claim constraint would not be satisfied", klog.KObj(claim), request.Name, deviceID) + return false, nil, fmt.Errorf("claim %s, request %s: cannot add device %s because a claim constraint would not be satisfied", klog.KObj(claim), request.Name, device.id) } // Roll back for all previous constraints before we return. for e := 0; e < i; e++ { - alloc.constraints[r.claimIndex][e].remove(request.Name, device, deviceID) + alloc.constraints[r.claimIndex][e].remove(request.Name, device.basic, device.id) } return false, nil, nil } @@ -745,49 +775,45 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device *draapi.BasicDevi // All constraints satisfied. Mark as in use (unless we do admin access) // and record the result. - alloc.logger.V(7).Info("Device allocated", "device", deviceID) + alloc.logger.V(7).Info("Device allocated", "device", device.id) if !adminAccess { - alloc.allocated[deviceID] = true + alloc.allocated[device.id] = true } - result := resourceapi.DeviceRequestAllocationResult{ - Request: request.Name, - Driver: deviceID.Driver.String(), - Pool: deviceID.Pool.String(), - Device: deviceID.Device.String(), + result := internalDeviceResult{ + request: request.Name, + id: device.id, + slice: device.slice, } if adminAccess { - result.AdminAccess = &adminAccess + result.adminAccess = &adminAccess } - previousNumResults := len(alloc.result[r.claimIndex].Devices.Results) - alloc.result[r.claimIndex].Devices.Results = append(alloc.result[r.claimIndex].Devices.Results, result) + previousNumResults := len(alloc.result[r.claimIndex].devices) + alloc.result[r.claimIndex].devices = append(alloc.result[r.claimIndex].devices, result) return true, func() { for _, constraint := range alloc.constraints[r.claimIndex] { - constraint.remove(request.Name, device, deviceID) + constraint.remove(request.Name, device.basic, device.id) } if !adminAccess { - alloc.allocated[deviceID] = false + alloc.allocated[device.id] = false } // Truncate, but keep the underlying slice. 
- alloc.result[r.claimIndex].Devices.Results = alloc.result[r.claimIndex].Devices.Results[:previousNumResults] - alloc.logger.V(7).Info("Device deallocated", "device", deviceID) + alloc.result[r.claimIndex].devices = alloc.result[r.claimIndex].devices[:previousNumResults] + alloc.logger.V(7).Info("Device deallocated", "device", device.id) }, nil } // createNodeSelector constructs a node selector for the allocation, if needed, // otherwise it returns nil. -func (alloc *allocator) createNodeSelector(allocation *resourceapi.AllocationResult) (*v1.NodeSelector, error) { +func (alloc *allocator) createNodeSelector(result []internalDeviceResult) (*v1.NodeSelector, error) { // Selector with one term. That term gets extended with additional // requirements from the different devices. nodeSelector := &v1.NodeSelector{ NodeSelectorTerms: []v1.NodeSelectorTerm{{}}, } - for _, deviceAllocation := range allocation.Devices.Results { - slice := alloc.findSlice(deviceAllocation) - if slice == nil { - return nil, fmt.Errorf("internal error: device %+v not found in pools", deviceAllocation) - } + for i := range result { + slice := result[i].slice if slice.Spec.NodeName != "" { // At least one device is local to one node. This // restricts the allocation to that node. @@ -825,26 +851,6 @@ func (alloc *allocator) createNodeSelector(allocation *resourceapi.AllocationRes return nil, nil } -func (alloc *allocator) findSlice(deviceAllocation resourceapi.DeviceRequestAllocationResult) *draapi.ResourceSlice { - driverName := draapi.MakeUniqueString(deviceAllocation.Driver) - poolName := draapi.MakeUniqueString(deviceAllocation.Pool) - deviceName := draapi.MakeUniqueString(deviceAllocation.Device) - for _, pool := range alloc.pools { - if pool.Driver != driverName || - pool.Pool != poolName { - continue - } - for _, slice := range pool.Slices { - for _, device := range slice.Spec.Devices { - if device.Name == deviceName { - return slice - } - } - } - } - return nil -} - func addNewNodeSelectorRequirements(from []v1.NodeSelectorRequirement, to *[]v1.NodeSelectorRequirement) { for _, requirement := range from { if !containsNodeSelectorRequirement(*to, requirement) { diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go index 9b2008dce8d..6b2816d47fa 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go @@ -266,13 +266,13 @@ func localNodeSelector(nodeName string) *v1.NodeSelector { // allocationResult returns a matcher for one AllocationResult pointer with a list of // embedded device allocation results. The order of those results doesn't matter. func allocationResult(selector *v1.NodeSelector, results ...resourceapi.DeviceRequestAllocationResult) types.GomegaMatcher { - return gstruct.PointTo(gstruct.MatchFields(0, gstruct.Fields{ + return gstruct.MatchFields(0, gstruct.Fields{ "Devices": gstruct.MatchFields(0, gstruct.Fields{ "Results": gomega.ConsistOf(results), // Order is irrelevant. "Config": gomega.BeNil(), }), "NodeSelector": matchNodeSelector(selector), - })) + }) } // matchNodeSelector returns a matcher for a node selector. The order @@ -315,8 +315,8 @@ func matchNodeSelectorRequirement(requirement v1.NodeSelectorRequirement) types. 
}) } -func allocationResultWithConfig(selector *v1.NodeSelector, driver string, source resourceapi.AllocationConfigSource, attribute string, results ...resourceapi.DeviceRequestAllocationResult) *resourceapi.AllocationResult { - return &resourceapi.AllocationResult{ +func allocationResultWithConfig(selector *v1.NodeSelector, driver string, source resourceapi.AllocationConfigSource, attribute string, results ...resourceapi.DeviceRequestAllocationResult) resourceapi.AllocationResult { + return resourceapi.AllocationResult{ Devices: resourceapi.DeviceAllocationResult{ Results: results, Config: []resourceapi.DeviceAllocationConfiguration{ diff --git a/test/integration/scheduler_perf/dra.go b/test/integration/scheduler_perf/dra.go index e6063889276..8b25d9895dc 100644 --- a/test/integration/scheduler_perf/dra.go +++ b/test/integration/scheduler_perf/dra.go @@ -330,7 +330,7 @@ claims: tCtx.ExpectNoError(err, "allocate claim") if result != nil { claim = claim.DeepCopy() - claim.Status.Allocation = result[0] + claim.Status.Allocation = &result[0] claim, err := tCtx.Client().ResourceV1alpha3().ResourceClaims(claim.Namespace).UpdateStatus(tCtx, claim, metav1.UpdateOptions{}) tCtx.ExpectNoError(err, "update claim status with allocation") tCtx.ExpectNoError(claimCache.Assume(claim), "assume claim") From 814c9428fd56aa81c9bbad27dd49a7c2948bfd09 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Wed, 4 Sep 2024 18:52:58 +0200 Subject: [PATCH 05/13] DRA scheduler: cache compiled CEL expressions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DeviceClasses and different requests are very likely to contain the same expression string. We don't need to compile that over and over again. To avoid hanging onto that cache longer than necessary, it's currently tied to each PreFilter/Filter combination. It might make sense to move this up into the scheduler plugin and thus reuse compiled expressions for different pods. 
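The pattern is ordinary memoization keyed by the expression string. A minimal, self-contained sketch with a fake compile step (the real patch additionally bounds the cache with an LRU and serializes compilation per expression with a keyed mutex):

    package main

    import (
        "fmt"
        "strings"
        "sync"
    )

    // compiled is a stand-in for a CEL compilation result.
    type compiled struct{ expr string }

    type compileCache struct {
        mu    sync.RWMutex
        cache map[string]*compiled
    }

    func (c *compileCache) lookup(expr string) *compiled {
        c.mu.RLock()
        defer c.mu.RUnlock()
        return c.cache[expr]
    }

    func (c *compileCache) compile(expr string) *compiled {
        if hit := c.lookup(expr); hit != nil {
            return hit // same expression seen before: skip the expensive step
        }
        result := &compiled{expr: strings.TrimSpace(expr)} // pretend this is expensive
        c.mu.Lock()
        c.cache[expr] = result
        c.mu.Unlock()
        return result
    }

    func main() {
        c := &compileCache{cache: map[string]*compiled{}}
        a := c.compile(`device.driver == "example.com"`)
        b := c.compile(`device.driver == "example.com"`)
        fmt.Println(a == b) // true: compiled once, reused for the repeat
    }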
goos: linux goarch: amd64 pkg: k8s.io/kubernetes/test/integration/scheduler_perf cpu: Intel(R) Core(TM) i9-7980XE CPU @ 2.60GHz │ before │ after │ │ SchedulingThroughput/Average │ SchedulingThroughput/Average vs base │ PerfScheduling/SchedulingWithResourceClaimTemplateStructured/5000pods_500nodes-36 33.95 ± 4% 36.65 ± 2% +7.95% (p=0.002 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/empty_100nodes-36 105.8 ± 2% 106.7 ± 3% ~ (p=0.177 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/empty_500nodes-36 100.7 ± 1% 119.7 ± 3% +18.82% (p=0.002 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/half_100nodes-36 90.78 ± 1% 121.10 ± 4% +33.40% (p=0.002 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/half_500nodes-36 50.51 ± 7% 63.72 ± 3% +26.17% (p=0.002 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/full_100nodes-36 103.7 ± 5% 110.2 ± 2% +6.32% (p=0.002 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/full_500nodes-36 28.50 ± 2% 28.16 ± 5% ~ (p=0.102 n=6) geomean 64.99 73.15 +12.56% --- .../dynamicresources/dynamicresources.go | 8 ++- .../k8s.io/dynamic-resource-allocation/go.mod | 2 +- .../structured/allocator.go | 27 +++++++++- .../structured/allocator_test.go | 2 +- .../structured/celcache.go | 54 +++++++++++++++++++ test/integration/scheduler_perf/dra.go | 3 +- 6 files changed, 91 insertions(+), 5 deletions(-) create mode 100644 staging/src/k8s.io/dynamic-resource-allocation/structured/celcache.go diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index d1a08fc1081..bac18e416d4 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -112,6 +112,7 @@ type DynamicResources struct { clientset kubernetes.Interface classLister resourcelisters.DeviceClassLister sliceLister resourcelisters.ResourceSliceLister + celCache *structured.CELCache // claimAssumeCache enables temporarily storing a newer claim object // while the scheduler has allocated it and the corresponding object @@ -186,6 +187,11 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe classLister: fh.SharedInformerFactory().Resource().V1alpha3().DeviceClasses().Lister(), sliceLister: fh.SharedInformerFactory().Resource().V1alpha3().ResourceSlices().Lister(), claimAssumeCache: fh.ResourceClaimCache(), + + // This is a LRU cache for compiled CEL expressions. The most + // recent 10 of them get reused across different scheduling + // cycles. 
+ celCache: structured.NewCELCache(10), } return pl, nil @@ -540,7 +546,7 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl if err != nil { return nil, statusError(logger, err) } - allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, allocateClaims, allocatedDevices, pl.classLister, slices) + allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, allocateClaims, allocatedDevices, pl.classLister, slices, pl.celCache) if err != nil { return nil, statusError(logger, err) } diff --git a/staging/src/k8s.io/dynamic-resource-allocation/go.mod b/staging/src/k8s.io/dynamic-resource-allocation/go.mod index 8cbfbf386db..50a1a1b356d 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/go.mod +++ b/staging/src/k8s.io/dynamic-resource-allocation/go.mod @@ -8,6 +8,7 @@ godebug default=go1.23 require ( github.com/blang/semver/v4 v4.0.0 + github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da github.com/google/cel-go v0.21.0 github.com/google/go-cmp v0.6.0 github.com/onsi/gomega v1.33.1 @@ -36,7 +37,6 @@ require ( github.com/go-openapi/jsonreference v0.20.2 // indirect github.com/go-openapi/swag v0.23.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/gofuzz v1.2.0 // indirect diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go index dd55602cf8c..e64c807f95f 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go @@ -30,6 +30,7 @@ import ( draapi "k8s.io/dynamic-resource-allocation/api" "k8s.io/dynamic-resource-allocation/cel" "k8s.io/klog/v2" + "k8s.io/utils/keymutex" "k8s.io/utils/ptr" ) @@ -45,6 +46,8 @@ type Allocator struct { allocatedDevices []DeviceID classLister resourcelisters.DeviceClassLister slices []*resourceapi.ResourceSlice + celCache *CELCache + celMutex keymutex.KeyMutex } // NewAllocator returns an allocator for a certain set of claims or an error if @@ -58,6 +61,7 @@ func NewAllocator(ctx context.Context, allocatedDevices []DeviceID, classLister resourcelisters.DeviceClassLister, slices []*resourceapi.ResourceSlice, + celCache *CELCache, ) (*Allocator, error) { return &Allocator{ adminAccessEnabled: adminAccessEnabled, @@ -65,6 +69,8 @@ func NewAllocator(ctx context.Context, allocatedDevices: allocatedDevices, classLister: classLister, slices: slices, + celCache: celCache, + celMutex: keymutex.NewHashed(0), }, nil } @@ -73,6 +79,25 @@ func (a *Allocator) ClaimsToAllocate() []*resourceapi.ResourceClaim { return a.claimsToAllocate } +func (a *Allocator) compileCELExpression(expression string) cel.CompilationResult { + // Compiling a CEL expression is expensive enough that it is cheaper + // to lock a mutex than doing it several times in parallel. + a.celMutex.LockKey(expression) + //nolint:errcheck // Only returns an error for unknown keys, which isn't the case here. 
+ defer a.celMutex.UnlockKey(expression) + + cached := a.celCache.get(expression) + if cached != nil { + return *cached + } + + expr := cel.GetCompiler().CompileCELExpression(expression, cel.Options{}) + if expr.Error == nil { + a.celCache.add(expression, &expr) + } + return expr +} + // Allocate calculates the allocation(s) for one particular node. // // It returns an error only if some fatal problem occurred. These are errors @@ -697,7 +722,7 @@ func (alloc *allocator) isSelectable(r requestIndices, slice *draapi.ResourceSli func (alloc *allocator) selectorsMatch(r requestIndices, device *draapi.BasicDevice, deviceID DeviceID, class *resourceapi.DeviceClass, selectors []resourceapi.DeviceSelector) (bool, error) { for i, selector := range selectors { - expr := cel.GetCompiler().CompileCELExpression(selector.CEL.Expression, cel.Options{}) + expr := alloc.compileCELExpression(selector.CEL.Expression) if expr.Error != nil { // Could happen if some future apiserver accepted some // future expression and then got downgraded. Normally diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go index 6b2816d47fa..c9d87325bdb 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go @@ -1375,7 +1375,7 @@ func TestAllocator(t *testing.T) { allocatedDevices := slices.Clone(tc.allocatedDevices) slices := slices.Clone(tc.slices) - allocator, err := NewAllocator(ctx, tc.adminAccess, claimsToAllocate, allocatedDevices, classLister, slices) + allocator, err := NewAllocator(ctx, tc.adminAccess, claimsToAllocate, allocatedDevices, classLister, slices, NewCELCache(1)) g.Expect(err).ToNot(gomega.HaveOccurred()) results, err := allocator.Allocate(ctx, tc.node) diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/celcache.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/celcache.go new file mode 100644 index 00000000000..f4c84180fe5 --- /dev/null +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/celcache.go @@ -0,0 +1,54 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package structured + +import ( + "sync" + + "github.com/golang/groupcache/lru" + + "k8s.io/dynamic-resource-allocation/cel" +) + +// CELCache is an LRU cache for a compiled CEL expression. 
+type CELCache struct { + mutex sync.RWMutex + cache *lru.Cache +} + +// NewCELCache returns a CELCache +func NewCELCache(maxCacheEntries int) *CELCache { + return &CELCache{ + cache: lru.New(maxCacheEntries), + } +} + +func (c *CELCache) add(expression string, expr *cel.CompilationResult) { + c.mutex.Lock() + defer c.mutex.Unlock() + c.cache.Add(expression, expr) +} + +func (c *CELCache) get(expression string) *cel.CompilationResult { + c.mutex.RLock() + defer c.mutex.RUnlock() + expr, found := c.cache.Get(expression) + if !found { + return nil + } + return expr.(*cel.CompilationResult) +} diff --git a/test/integration/scheduler_perf/dra.go b/test/integration/scheduler_perf/dra.go index 8b25d9895dc..ce161911eab 100644 --- a/test/integration/scheduler_perf/dra.go +++ b/test/integration/scheduler_perf/dra.go @@ -290,6 +290,7 @@ func (op *allocResourceClaimsOp) run(tCtx ktesting.TContext) { reflect.TypeOf(&v1.Node{}): true, } require.Equal(tCtx, expectSyncedInformers, syncedInformers, "synced informers") + celCache := structured.NewCELCache(10) // The set of nodes is assumed to be fixed at this point. nodes, err := nodeLister.List(labels.Everything()) @@ -319,7 +320,7 @@ claims: } } - allocator, err := structured.NewAllocator(tCtx, utilfeature.DefaultFeatureGate.Enabled(features.DRAAdminAccess), []*resourceapi.ResourceClaim{claim}, allocatedDevices, classLister, slices) + allocator, err := structured.NewAllocator(tCtx, utilfeature.DefaultFeatureGate.Enabled(features.DRAAdminAccess), []*resourceapi.ResourceClaim{claim}, allocatedDevices, classLister, slices, celCache) tCtx.ExpectNoError(err, "create allocator") rand.Shuffle(len(nodes), func(i, j int) { From f070dd760cfa65aacef1e9ebfa40cc709dd2a2ed Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Thu, 5 Sep 2024 18:41:34 +0200 Subject: [PATCH 06/13] DRA scheduler: also pre-compute the unique ResourceSlice.NodeName MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Converting a node's name once to a unique string and then converting to many unique names is faster than memory comparisons. 
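draapi.UniqueString is built on Go's unique package, so an equality check is effectively a handle comparison rather than a byte-wise string comparison. A minimal sketch of that effect (assuming Go 1.23+, with made-up node names):

    package main

    import (
        "fmt"
        "unique"
    )

    func main() {
        // Intern the scheduled node's name once per Allocate call ...
        nodeName := unique.Make("worker-42")

        // ... and intern each ResourceSlice.NodeName once during conversion.
        sliceNodeNames := []unique.Handle[string]{
            unique.Make("worker-7"),
            unique.Make("worker-42"),
        }

        // Comparing handles does not touch the string bytes, so checking
        // many slices against the node stays cheap.
        for _, n := range sliceNodeNames {
            fmt.Println(n.Value(), "matches:", n == nodeName)
        }
    }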
goos: linux goarch: amd64 pkg: k8s.io/kubernetes/test/integration/scheduler_perf cpu: Intel(R) Core(TM) i9-7980XE CPU @ 2.60GHz │ before │ after │ │ SchedulingThroughput/Average │ SchedulingThroughput/Average vs base │ PerfScheduling/SchedulingWithResourceClaimTemplateStructured/5000pods_500nodes-36 36.65 ± 2% 36.89 ± 2% ~ (p=0.452 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/empty_100nodes-36 106.7 ± 3% 105.7 ± 5% ~ (p=0.701 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/empty_500nodes-36 119.7 ± 3% 117.8 ± 3% ~ (p=0.084 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/half_100nodes-36 121.1 ± 4% 119.5 ± 4% ~ (p=0.297 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/half_500nodes-36 63.72 ± 3% 63.22 ± 2% ~ (p=0.485 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/full_100nodes-36 110.2 ± 2% 109.5 ± 2% ~ (p=0.258 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/full_500nodes-36 28.16 ± 5% 27.56 ± 5% ~ (p=0.513 n=6) geomean 73.15 72.44 -0.98% --- .../k8s.io/dynamic-resource-allocation/api/conversion.go | 8 ++++++++ .../src/k8s.io/dynamic-resource-allocation/api/types.go | 2 +- .../api/zz_generated.conversion.go | 8 ++++++-- .../dynamic-resource-allocation/structured/allocator.go | 4 ++-- .../dynamic-resource-allocation/structured/pools.go | 7 ++++++- 5 files changed, 23 insertions(+), 6 deletions(-) diff --git a/staging/src/k8s.io/dynamic-resource-allocation/api/conversion.go b/staging/src/k8s.io/dynamic-resource-allocation/api/conversion.go index 28186e6baca..077fe2bc423 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/api/conversion.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/api/conversion.go @@ -29,11 +29,19 @@ var ( ) func Convert_api_UniqueString_To_string(in *UniqueString, out *string, s conversion.Scope) error { + if *in == NullUniqueString { + *out = "" + return nil + } *out = in.String() return nil } func Convert_string_To_api_UniqueString(in *string, out *UniqueString, s conversion.Scope) error { + if *in == "" { + *out = NullUniqueString + return nil + } *out = UniqueString(unique.Make(*in)) return nil } diff --git a/staging/src/k8s.io/dynamic-resource-allocation/api/types.go b/staging/src/k8s.io/dynamic-resource-allocation/api/types.go index 3d4928323fd..282a1e9c0c1 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/api/types.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/api/types.go @@ -31,7 +31,7 @@ type ResourceSlice struct { type ResourceSliceSpec struct { Driver UniqueString Pool ResourcePool - NodeName string + NodeName UniqueString NodeSelector *v1.NodeSelector AllNodes bool Devices []Device diff --git a/staging/src/k8s.io/dynamic-resource-allocation/api/zz_generated.conversion.go b/staging/src/k8s.io/dynamic-resource-allocation/api/zz_generated.conversion.go index e12091e987f..65fbfba1726 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/api/zz_generated.conversion.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/api/zz_generated.conversion.go @@ -246,7 +246,9 @@ func autoConvert_api_ResourceSliceSpec_To_v1alpha3_ResourceSliceSpec(in *Resourc if err := Convert_api_ResourcePool_To_v1alpha3_ResourcePool(&in.Pool, &out.Pool, s); err != nil { return err } - out.NodeName = in.NodeName + if err := Convert_api_UniqueString_To_string(&in.NodeName, &out.NodeName, s); err != nil { + return err + } out.NodeSelector = (*v1.NodeSelector)(unsafe.Pointer(in.NodeSelector)) out.AllNodes = 
in.AllNodes if in.Devices != nil { @@ -275,7 +277,9 @@ func autoConvert_v1alpha3_ResourceSliceSpec_To_api_ResourceSliceSpec(in *v1alpha if err := Convert_v1alpha3_ResourcePool_To_api_ResourcePool(&in.Pool, &out.Pool, s); err != nil { return err } - out.NodeName = in.NodeName + if err := Convert_string_To_api_UniqueString(&in.NodeName, &out.NodeName, s); err != nil { + return err + } out.NodeSelector = (*v1.NodeSelector)(unsafe.Pointer(in.NodeSelector)) out.AllNodes = in.AllNodes if in.Devices != nil { diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go index e64c807f95f..e12f29aeb3a 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go @@ -839,7 +839,7 @@ func (alloc *allocator) createNodeSelector(result []internalDeviceResult) (*v1.N for i := range result { slice := result[i].slice - if slice.Spec.NodeName != "" { + if slice.Spec.NodeName != draapi.NullUniqueString { // At least one device is local to one node. This // restricts the allocation to that node. return &v1.NodeSelector{ @@ -847,7 +847,7 @@ func (alloc *allocator) createNodeSelector(result []internalDeviceResult) (*v1.N MatchFields: []v1.NodeSelectorRequirement{{ Key: "metadata.name", Operator: v1.NodeSelectorOpIn, - Values: []string{slice.Spec.NodeName}, + Values: []string{slice.Spec.NodeName.String()}, }}, }}, }, nil diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/pools.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/pools.go index 8e3ba269e2e..37cabaddc65 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/pools.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/pools.go @@ -35,11 +35,15 @@ import ( // Both is recorded in the result. func GatherPools(ctx context.Context, slices []*resourceapi.ResourceSlice, node *v1.Node) ([]*Pool, error) { pools := make(map[PoolID]*Pool) + nodeName := "" + if node != nil { + nodeName = node.Name + } for _, slice := range slices { switch { case slice.Spec.NodeName != "": - if slice.Spec.NodeName == node.Name { + if slice.Spec.NodeName == nodeName { if err := addSlice(pools, slice); err != nil { return nil, fmt.Errorf("add node slice %s: %w", slice.Name, err) } @@ -49,6 +53,7 @@ func GatherPools(ctx context.Context, slices []*resourceapi.ResourceSlice, node return nil, fmt.Errorf("add cluster slice %s: %w", slice.Name, err) } case slice.Spec.NodeSelector != nil: + // TODO: move conversion into api. selector, err := nodeaffinity.NewNodeSelector(slice.Spec.NodeSelector) if err != nil { return nil, fmt.Errorf("node selector in resource slice %s: %w", slice.Name, err) From f0efb8a5fd09bb538bc2018d2a7da65e4d40a2ac Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Wed, 4 Sep 2024 18:55:39 +0200 Subject: [PATCH 07/13] DRA scheduler: populate set of allocated devices only once MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The devices which are allocated before starting the allocation always remain allocated. They can be stored once in a set, then each Filter call for the different nodes can reuse that set instead of allocating it anew for each node. 
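A minimal sketch of the resulting split between the shared, read-only set and the per-node scratch state (hypothetical helper and names, not the exact fields from the patch):

    package main

    import (
        "fmt"

        "k8s.io/apimachinery/pkg/util/sets"
    )

    type deviceID string

    // filterNode only reads the shared set; tentative picks for this node go
    // into a private map that is thrown away afterwards.
    func filterNode(allocated sets.Set[deviceID], candidates []deviceID) []deviceID {
        allocating := map[deviceID]bool{}
        var picked []deviceID
        for _, id := range candidates {
            if allocated.Has(id) || allocating[id] {
                continue // in use before allocation started, or already picked here
            }
            allocating[id] = true
            picked = append(picked, id)
        }
        return picked
    }

    func main() {
        // Built once per scheduling cycle, never mutated while Filter runs.
        allocated := sets.New[deviceID]("driverA/pool-1/dev-0")

        fmt.Println(filterNode(allocated, []deviceID{"driverA/pool-1/dev-0", "driverA/pool-1/dev-1"}))
        fmt.Println(filterNode(allocated, []deviceID{"driverA/pool-1/dev-1"}))
    }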
goos: linux goarch: amd64 pkg: k8s.io/kubernetes/test/integration/scheduler_perf cpu: Intel(R) Core(TM) i9-7980XE CPU @ 2.60GHz │ before │ after │ │ SchedulingThroughput/Average │ SchedulingThroughput/Average vs base │ PerfScheduling/SchedulingWithResourceClaimTemplateStructured/5000pods_500nodes-36 36.89 ± 2% 54.70 ± 6% +48.26% (p=0.002 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/empty_100nodes-36 105.7 ± 5% 106.4 ± 4% ~ (p=0.970 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/empty_500nodes-36 117.8 ± 3% 120.0 ± 4% ~ (p=0.134 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/half_100nodes-36 119.5 ± 4% 112.5 ± 4% -5.86% (p=0.009 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/half_500nodes-36 63.22 ± 2% 87.13 ± 4% +37.82% (p=0.002 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/full_100nodes-36 109.5 ± 2% 113.4 ± 2% +3.65% (p=0.006 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/full_500nodes-36 27.56 ± 5% 65.55 ± 3% +137.84% (p=0.002 n=6) geomean 72.44 90.81 +25.37% --- .../structured/allocator.go | 37 ++++++++----------- 1 file changed, 15 insertions(+), 22 deletions(-) diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go index e12f29aeb3a..51f950fb132 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go @@ -43,7 +43,7 @@ import ( type Allocator struct { adminAccessEnabled bool claimsToAllocate []*resourceapi.ResourceClaim - allocatedDevices []DeviceID + allocatedDevices sets.Set[DeviceID] classLister resourcelisters.DeviceClassLister slices []*resourceapi.ResourceSlice celCache *CELCache @@ -66,11 +66,12 @@ func NewAllocator(ctx context.Context, return &Allocator{ adminAccessEnabled: adminAccessEnabled, claimsToAllocate: claimsToAllocate, - allocatedDevices: allocatedDevices, - classLister: classLister, - slices: slices, - celCache: celCache, - celMutex: keymutex.NewHashed(0), + // This won't change, so build this set only once. + allocatedDevices: sets.New(allocatedDevices...), + classLister: classLister, + slices: slices, + celCache: celCache, + celMutex: keymutex.NewHashed(0), }, nil } @@ -295,18 +296,10 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult [] // the Allocate call and can be compared in Go. alloc.deviceMatchesRequest = make(map[matchKey]bool) - // Some of the existing devices are probably already allocated by - // claims... - numAllocated := len(alloc.allocatedDevices) + // We can estimate the size based on what we need to allocate. + alloc.allocatingDevices = make(map[DeviceID]bool, numDevicesTotal) - // Pre-allocating the map avoids growing it later. We can estimate - // the size based on what is already allocated and what we need to - // allocate. - alloc.allocated = make(map[DeviceID]bool, numAllocated+numDevicesTotal) - for _, deviceID := range alloc.allocatedDevices { - alloc.allocated[deviceID] = true - } - alloc.logger.V(6).Info("Gathered information about allocated devices", "numAllocated", numAllocated) + alloc.logger.V(6).Info("Gathered information about devices", "numAllocated", len(alloc.allocatedDevices), "toBeAllocated", numDevicesTotal) // In practice, there aren't going to be many different CEL // expressions. 
Most likely, there is going to be handful of different @@ -393,7 +386,7 @@ type allocator struct { deviceMatchesRequest map[matchKey]bool constraints [][]constraint // one list of constraints per claim requestData map[requestIndices]requestData // one entry per request - allocated map[DeviceID]bool + allocatingDevices map[DeviceID]bool result []internalAllocationResult } @@ -620,7 +613,7 @@ func (alloc *allocator) allocateOne(r deviceIndices) (bool, error) { deviceID := DeviceID{Driver: pool.Driver, Pool: pool.Pool, Device: slice.Spec.Devices[deviceIndex].Name} // Checking for "in use" is cheap and thus gets done first. - if !ptr.Deref(request.AdminAccess, false) && alloc.allocated[deviceID] { + if !ptr.Deref(request.AdminAccess, false) && (alloc.allocatedDevices.Has(deviceID) || alloc.allocatingDevices[deviceID]) { alloc.logger.V(7).Info("Device in use", "device", deviceID) continue } @@ -775,7 +768,7 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device deviceWithID, mus claim := alloc.claimsToAllocate[r.claimIndex] request := &claim.Spec.Devices.Requests[r.requestIndex] adminAccess := ptr.Deref(request.AdminAccess, false) - if !adminAccess && alloc.allocated[device.id] { + if !adminAccess && (alloc.allocatedDevices.Has(device.id) || alloc.allocatingDevices[device.id]) { alloc.logger.V(7).Info("Device in use", "device", device.id) return false, nil, nil } @@ -802,7 +795,7 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device deviceWithID, mus // and record the result. alloc.logger.V(7).Info("Device allocated", "device", device.id) if !adminAccess { - alloc.allocated[device.id] = true + alloc.allocatingDevices[device.id] = true } result := internalDeviceResult{ request: request.Name, @@ -820,7 +813,7 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device deviceWithID, mus constraint.remove(request.Name, device.basic, device.id) } if !adminAccess { - alloc.allocated[device.id] = false + alloc.allocatingDevices[device.id] = false } // Truncate, but keep the underlying slice. alloc.result[r.claimIndex].devices = alloc.result[r.claimIndex].devices[:previousNumResults] From bc55e826217b1ed9c9f26a9168d885c387082f1f Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Thu, 5 Sep 2024 18:44:21 +0200 Subject: [PATCH 08/13] DRA scheduler: maintain a set of allocated device IDs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reacting to events from the informer cache (indirectly, through the assume cache) is more efficient than repeatedly listing it's content and then converting to IDs with unique strings. 
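The replacement is the usual incremental index driven by informer-style callbacks: update a set on add/update/delete events and hand out a snapshot on demand. A minimal, self-contained sketch without the client-go wiring (names are simplified):

    package main

    import (
        "fmt"
        "sync"
    )

    type deviceIndex struct {
        mu  sync.RWMutex
        ids map[string]struct{}
    }

    // onAdd/onDelete would be registered as event handlers on the claim cache.
    func (ix *deviceIndex) onAdd(id string) {
        ix.mu.Lock()
        defer ix.mu.Unlock()
        ix.ids[id] = struct{}{}
    }

    func (ix *deviceIndex) onDelete(id string) {
        ix.mu.Lock()
        defer ix.mu.Unlock()
        delete(ix.ids, id)
    }

    // snapshot returns a copy so callers can use it without holding the lock.
    func (ix *deviceIndex) snapshot() map[string]struct{} {
        ix.mu.RLock()
        defer ix.mu.RUnlock()
        out := make(map[string]struct{}, len(ix.ids))
        for id := range ix.ids {
            out[id] = struct{}{}
        }
        return out
    }

    func main() {
        ix := &deviceIndex{ids: map[string]struct{}{}}
        ix.onAdd("driverA/pool-1/dev-0")    // allocation observed
        fmt.Println(len(ix.snapshot()))     // 1
        ix.onDelete("driverA/pool-1/dev-0") // deallocation observed
        fmt.Println(len(ix.snapshot()))     // 0
    }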
goos: linux goarch: amd64 pkg: k8s.io/kubernetes/test/integration/scheduler_perf cpu: Intel(R) Core(TM) i9-7980XE CPU @ 2.60GHz │ before │ after │ │ SchedulingThroughput/Average │ SchedulingThroughput/Average vs base │ PerfScheduling/SchedulingWithResourceClaimTemplateStructured/5000pods_500nodes-36 54.70 ± 6% 76.81 ± 6% +40.42% (p=0.002 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/empty_100nodes-36 106.4 ± 4% 105.6 ± 2% ~ (p=0.413 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/empty_500nodes-36 120.0 ± 4% 118.9 ± 7% ~ (p=0.117 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/half_100nodes-36 112.5 ± 4% 105.9 ± 4% -5.87% (p=0.002 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/half_500nodes-36 87.13 ± 4% 123.55 ± 4% +41.80% (p=0.002 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/full_100nodes-36 113.4 ± 2% 103.3 ± 2% -8.95% (p=0.002 n=6) PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/full_500nodes-36 65.55 ± 3% 121.30 ± 3% +85.05% (p=0.002 n=6) geomean 90.81 106.8 +17.57% --- .../dynamicresources/allocateddevices.go | 158 ++++++++++++++++++ .../dynamicresources/dynamicresources.go | 49 +++--- .../structured/allocator.go | 13 +- .../structured/allocator_test.go | 3 +- test/integration/scheduler_perf/dra.go | 5 +- 5 files changed, 195 insertions(+), 33 deletions(-) create mode 100644 pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go diff --git a/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go b/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go new file mode 100644 index 00000000000..b1629c0db6c --- /dev/null +++ b/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go @@ -0,0 +1,158 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dynamicresources + +import ( + "sync" + + resourceapi "k8s.io/api/resource/v1alpha3" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/tools/cache" + "k8s.io/dynamic-resource-allocation/structured" + "k8s.io/klog/v2" + schedutil "k8s.io/kubernetes/pkg/scheduler/util" + "k8s.io/utils/ptr" +) + +// allocatedDevices reacts to events in a cache and maintains a set of all allocated devices. +// This is cheaper than repeatedly calling List, making strings unique, and building the set +// each time PreFilter is called. +// +// All methods are thread-safe. Get returns a cloned set. 
+type allocatedDevices struct { + logger klog.Logger + + mutex sync.RWMutex + ids sets.Set[structured.DeviceID] +} + +func newAllocatedDevices(logger klog.Logger) *allocatedDevices { + return &allocatedDevices{ + logger: logger, + ids: sets.New[structured.DeviceID](), + } +} + +func (a *allocatedDevices) Get() sets.Set[structured.DeviceID] { + a.mutex.RLock() + defer a.mutex.RUnlock() + + return a.ids.Clone() +} + +func (a *allocatedDevices) handlers() cache.ResourceEventHandler { + return cache.ResourceEventHandlerFuncs{ + AddFunc: a.onAdd, + UpdateFunc: a.onUpdate, + DeleteFunc: a.onDelete, + } +} + +func (a *allocatedDevices) onAdd(obj any) { + claim, _, err := schedutil.As[*resourceapi.ResourceClaim](obj, nil) + if err != nil { + // Shouldn't happen. + a.logger.Error(err, "unexpected object in allocatedDevices.onAdd") + return + } + + if claim.Status.Allocation != nil { + a.addDevices(claim) + } +} + +func (a *allocatedDevices) onUpdate(oldObj, newObj any) { + originalClaim, modifiedClaim, err := schedutil.As[*resourceapi.ResourceClaim](oldObj, newObj) + if err != nil { + // Shouldn't happen. + a.logger.Error(err, "unexpected object in allocatedDevices.onUpdate") + return + } + + switch { + case originalClaim.Status.Allocation == nil && modifiedClaim.Status.Allocation != nil: + a.addDevices(modifiedClaim) + case originalClaim.Status.Allocation != nil && modifiedClaim.Status.Allocation == nil: + a.removeDevices(originalClaim) + default: + // Nothing to do. Either both nil or both non-nil, in which case the content + // also must be the same (immutable!). + } +} + +func (a *allocatedDevices) onDelete(obj any) { + claim, _, err := schedutil.As[*resourceapi.ResourceClaim](obj, nil) + if err != nil { + // Shouldn't happen. + a.logger.Error(err, "unexpected object in allocatedDevices.onDelete") + return + } + + a.removeDevices(claim) +} + +func (a *allocatedDevices) addDevices(claim *resourceapi.ResourceClaim) { + if claim.Status.Allocation == nil { + return + } + // Locking of the mutex gets minimized by pre-computing what needs to be done + // without holding the lock. + deviceIDs := make([]structured.DeviceID, 0, 20) + + for _, result := range claim.Status.Allocation.Devices.Results { + if ptr.Deref(result.AdminAccess, false) { + // Is not considered as allocated. + continue + } + deviceID := structured.MakeDeviceID(result.Driver, result.Pool, result.Device) + a.logger.V(6).Info("Device was allocated", "device", deviceID, "claim", klog.KObj(claim)) + deviceIDs = append(deviceIDs, deviceID) + } + + a.mutex.Lock() + defer a.mutex.Unlock() + for _, deviceID := range deviceIDs { + a.ids.Insert(deviceID) + } +} + +func (a *allocatedDevices) removeDevices(claim *resourceapi.ResourceClaim) { + if claim.Status.Allocation == nil { + return + } + + // Locking of the mutex gets minimized by pre-computing what needs to be done + // without holding the lock. + deviceIDs := make([]structured.DeviceID, 0, 20) + + for _, result := range claim.Status.Allocation.Devices.Results { + if ptr.Deref(result.AdminAccess, false) { + // Is not considered as allocated and thus does not need to be removed + // because of this claim. 
+ continue + } + deviceID := structured.MakeDeviceID(result.Driver, result.Pool, result.Device) + a.logger.V(6).Info("Device was deallocated", "device", deviceID, "claim", klog.KObj(claim)) + deviceIDs = append(deviceIDs, deviceID) + } + + a.mutex.Lock() + defer a.mutex.Unlock() + for _, deviceID := range deviceIDs { + a.ids.Delete(deviceID) + } +} diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index bac18e416d4..98a6ad75200 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -108,11 +108,12 @@ type DynamicResources struct { enableAdminAccess bool enableSchedulingQueueHint bool - fh framework.Handle - clientset kubernetes.Interface - classLister resourcelisters.DeviceClassLister - sliceLister resourcelisters.ResourceSliceLister - celCache *structured.CELCache + fh framework.Handle + clientset kubernetes.Interface + classLister resourcelisters.DeviceClassLister + sliceLister resourcelisters.ResourceSliceLister + celCache *structured.CELCache + allocatedDevices *allocatedDevices // claimAssumeCache enables temporarily storing a newer claim object // while the scheduler has allocated it and the corresponding object @@ -177,6 +178,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe return &DynamicResources{}, nil } + logger := klog.FromContext(ctx) pl := &DynamicResources{ enabled: true, enableAdminAccess: fts.EnableDRAAdminAccess, @@ -192,8 +194,14 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe // recent 10 of them get reused across different scheduling // cycles. celCache: structured.NewCELCache(10), + + allocatedDevices: newAllocatedDevices(logger), } + // Reacting to events is more efficient than iterating over the list + // repeatedly in PreFilter. + pl.claimAssumeCache.AddEventHandler(pl.allocatedDevices.handlers()) + return pl, nil } @@ -538,10 +546,7 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl // Claims are treated as "allocated" if they are in the assume cache // or currently their allocation is in-flight. This does not change // during filtering, so we can determine that once. - allocatedDevices := pl.listAllAllocatedDevices() - if err != nil { - return nil, statusError(logger, err) - } + allocatedDevices := pl.listAllAllocatedDevices(logger) slices, err := pl.sliceLister.List(labels.Everything()) if err != nil { return nil, statusError(logger, err) @@ -558,18 +563,14 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl return nil, nil } -func (pl *DynamicResources) listAllAllocatedDevices() []structured.DeviceID { - // Probably not worth adding an index for? - objs := pl.claimAssumeCache.List(nil) - var allocated []structured.DeviceID - for _, obj := range objs { - claim := obj.(*resourceapi.ResourceClaim) - if obj, ok := pl.inFlightAllocations.Load(claim.UID); ok { - claim = obj.(*resourceapi.ResourceClaim) - } - if claim.Status.Allocation == nil { - continue - } +func (pl *DynamicResources) listAllAllocatedDevices(logger klog.Logger) sets.Set[structured.DeviceID] { + // Start with a fresh set that matches the current known state of the + // world according to the informers. + allocated := pl.allocatedDevices.Get() + + // Whatever is in flight also has to be checked. 
+ pl.inFlightAllocations.Range(func(key, value any) bool { + claim := value.(*resourceapi.ResourceClaim) for _, result := range claim.Status.Allocation.Devices.Results { // Kubernetes 1.31 did not set this, 1.32 always does. // Supporting 1.31 is not worth the additional code that @@ -581,9 +582,11 @@ func (pl *DynamicResources) listAllAllocatedDevices() []structured.DeviceID { continue } deviceID := structured.MakeDeviceID(result.Driver, result.Pool, result.Device) - allocated = append(allocated, deviceID) + logger.V(6).Info("Device is in flight for allocation", "device", deviceID, "claim", klog.KObj(claim)) + allocated.Insert(deviceID) } - } + return true + }) return allocated } diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go index 51f950fb132..b79e7950e5a 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go @@ -58,7 +58,7 @@ type Allocator struct { func NewAllocator(ctx context.Context, adminAccessEnabled bool, claimsToAllocate []*resourceapi.ResourceClaim, - allocatedDevices []DeviceID, + allocatedDevices sets.Set[DeviceID], classLister resourcelisters.DeviceClassLister, slices []*resourceapi.ResourceSlice, celCache *CELCache, @@ -66,12 +66,11 @@ func NewAllocator(ctx context.Context, return &Allocator{ adminAccessEnabled: adminAccessEnabled, claimsToAllocate: claimsToAllocate, - // This won't change, so build this set only once. - allocatedDevices: sets.New(allocatedDevices...), - classLister: classLister, - slices: slices, - celCache: celCache, - celMutex: keymutex.NewHashed(0), + allocatedDevices: allocatedDevices, + classLister: classLister, + slices: slices, + celCache: celCache, + celMutex: keymutex.NewHashed(0), }, nil } diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go index c9d87325bdb..e647660a3fe 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go @@ -36,6 +36,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2/ktesting" "k8s.io/utils/ptr" ) @@ -1375,7 +1376,7 @@ func TestAllocator(t *testing.T) { allocatedDevices := slices.Clone(tc.allocatedDevices) slices := slices.Clone(tc.slices) - allocator, err := NewAllocator(ctx, tc.adminAccess, claimsToAllocate, allocatedDevices, classLister, slices, NewCELCache(1)) + allocator, err := NewAllocator(ctx, tc.adminAccess, claimsToAllocate, sets.New(allocatedDevices...), classLister, slices, NewCELCache(1)) g.Expect(err).ToNot(gomega.HaveOccurred()) results, err := allocator.Allocate(ctx, tc.node) diff --git a/test/integration/scheduler_perf/dra.go b/test/integration/scheduler_perf/dra.go index ce161911eab..143f269da9e 100644 --- a/test/integration/scheduler_perf/dra.go +++ b/test/integration/scheduler_perf/dra.go @@ -30,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" "k8s.io/client-go/util/workqueue" @@ -309,14 +310,14 @@ claims: } objs := claimCache.List(nil) - 
allocatedDevices := make([]structured.DeviceID, 0, len(objs))
+	allocatedDevices := sets.New[structured.DeviceID]()
 	for _, obj := range objs {
 		claim := obj.(*resourceapi.ResourceClaim)
 		if claim.Status.Allocation == nil {
 			continue
 		}
 		for _, result := range claim.Status.Allocation.Devices.Results {
-			allocatedDevices = append(allocatedDevices, structured.MakeDeviceID(result.Driver, result.Pool, result.Device))
+			allocatedDevices.Insert(structured.MakeDeviceID(result.Driver, result.Pool, result.Device))
 		}
 	}
 
From bd7ff9c4c75d2b4c3c459a4d9753840289c71e76 Mon Sep 17 00:00:00 2001
From: Patrick Ohly 
Date: Mon, 28 Oct 2024 18:42:52 +0100
Subject: [PATCH 09/13] DRA scheduler: update some log strings

---
 .../framework/plugins/dynamicresources/allocateddevices.go   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go b/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go
index b1629c0db6c..d013244433e 100644
--- a/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go
+++ b/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go
@@ -119,7 +119,7 @@ func (a *allocatedDevices) addDevices(claim *resourceapi.ResourceClaim) {
 			continue
 		}
 		deviceID := structured.MakeDeviceID(result.Driver, result.Pool, result.Device)
-		a.logger.V(6).Info("Device was allocated", "device", deviceID, "claim", klog.KObj(claim))
+		a.logger.V(6).Info("Observed device allocation", "device", deviceID, "claim", klog.KObj(claim))
 		deviceIDs = append(deviceIDs, deviceID)
 	}
 
@@ -146,7 +146,7 @@ func (a *allocatedDevices) removeDevices(claim *resourceapi.ResourceClaim) {
 			continue
 		}
 		deviceID := structured.MakeDeviceID(result.Driver, result.Pool, result.Device)
-		a.logger.V(6).Info("Device was deallocated", "device", deviceID, "claim", klog.KObj(claim))
+		a.logger.V(6).Info("Observed device deallocation", "device", deviceID, "claim", klog.KObj(claim))
 		deviceIDs = append(deviceIDs, deviceID)
 	}
 
From 0130ebba1d8a349c6ccea158141b01ba4f781e19 Mon Sep 17 00:00:00 2001
From: Patrick Ohly 
Date: Mon, 28 Oct 2024 19:00:20 +0100
Subject: [PATCH 10/13] DRA scheduler: refactor "allocated devices" lookup

The logic for skipping "admin access" was repeated in three different
places. A single foreachAllocatedDevice with a callback puts it into
one function.
---
 .../dynamicresources/allocateddevices.go      | 51 ++++++++++++-------
 .../dynamicresources/dynamicresources.go      | 15 +-----
 2 files changed, 36 insertions(+), 30 deletions(-)

diff --git a/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go b/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go
index d013244433e..a4a5a415972 100644
--- a/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go
+++ b/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go
@@ -28,6 +28,36 @@ import (
 	"k8s.io/utils/ptr"
 )
 
+// foreachAllocatedDevice invokes the provided callback for each
+// device in the claim's allocation result which was allocated
+// exclusively for the claim.
+//
+// Devices allocated with admin access can be shared with other
+// claims and are skipped without invoking the callback.
+//
+// foreachAllocatedDevice does nothing if the claim is not allocated. 
+func foreachAllocatedDevice(claim *resourceapi.ResourceClaim, cb func(deviceID structured.DeviceID)) { + if claim.Status.Allocation == nil { + return + } + for _, result := range claim.Status.Allocation.Devices.Results { + // Kubernetes 1.31 did not set this, 1.32 always does. + // Supporting 1.31 is not worth the additional code that + // would have to be written (= looking up in request) because + // it is extremely unlikely that there really is a result + // that still exists in a cluster from 1.31 where this matters. + if ptr.Deref(result.AdminAccess, false) { + // Is not considered as allocated. + continue + } + deviceID := structured.MakeDeviceID(result.Driver, result.Pool, result.Device) + + // None of the users of this helper need to abort iterating, + // therefore it's not supported as it only would add overhead. + cb(deviceID) + } +} + // allocatedDevices reacts to events in a cache and maintains a set of all allocated devices. // This is cheaper than repeatedly calling List, making strings unique, and building the set // each time PreFilter is called. @@ -112,16 +142,10 @@ func (a *allocatedDevices) addDevices(claim *resourceapi.ResourceClaim) { // Locking of the mutex gets minimized by pre-computing what needs to be done // without holding the lock. deviceIDs := make([]structured.DeviceID, 0, 20) - - for _, result := range claim.Status.Allocation.Devices.Results { - if ptr.Deref(result.AdminAccess, false) { - // Is not considered as allocated. - continue - } - deviceID := structured.MakeDeviceID(result.Driver, result.Pool, result.Device) + foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) { a.logger.V(6).Info("Observed device allocation", "device", deviceID, "claim", klog.KObj(claim)) deviceIDs = append(deviceIDs, deviceID) - } + }) a.mutex.Lock() defer a.mutex.Unlock() @@ -138,17 +162,10 @@ func (a *allocatedDevices) removeDevices(claim *resourceapi.ResourceClaim) { // Locking of the mutex gets minimized by pre-computing what needs to be done // without holding the lock. deviceIDs := make([]structured.DeviceID, 0, 20) - - for _, result := range claim.Status.Allocation.Devices.Results { - if ptr.Deref(result.AdminAccess, false) { - // Is not considered as allocated and thus does not need to be removed - // because of this claim. - continue - } - deviceID := structured.MakeDeviceID(result.Driver, result.Pool, result.Device) + foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) { a.logger.V(6).Info("Observed device deallocation", "device", deviceID, "claim", klog.KObj(claim)) deviceIDs = append(deviceIDs, deviceID) - } + }) a.mutex.Lock() defer a.mutex.Unlock() diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 98a6ad75200..18ed9c91812 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -46,7 +46,6 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" schedutil "k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/kubernetes/pkg/scheduler/util/assumecache" - "k8s.io/utils/ptr" ) const ( @@ -571,20 +570,10 @@ func (pl *DynamicResources) listAllAllocatedDevices(logger klog.Logger) sets.Set // Whatever is in flight also has to be checked. 
pl.inFlightAllocations.Range(func(key, value any) bool { claim := value.(*resourceapi.ResourceClaim) - for _, result := range claim.Status.Allocation.Devices.Results { - // Kubernetes 1.31 did not set this, 1.32 always does. - // Supporting 1.31 is not worth the additional code that - // would have to be written (= looking up in request) because - // it is extremely unlikely that there really is a result - // that still exists in a cluster from 1.31 where this matters. - if ptr.Deref(result.AdminAccess, false) { - // Is not considered as allocated. - continue - } - deviceID := structured.MakeDeviceID(result.Driver, result.Pool, result.Device) + foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) { logger.V(6).Info("Device is in flight for allocation", "device", deviceID, "claim", klog.KObj(claim)) allocated.Insert(deviceID) - } + }) return true }) return allocated From ae6b5522eac9e4b73ac5463434b95830adfa6fc1 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Mon, 28 Oct 2024 20:05:40 +0100 Subject: [PATCH 11/13] DRA scheduler: rename variable "Allocated devices" are the ones which can be observed from the informer. "All allocated devices" also includes those which are in flight and haven't been written back to the apiserver. --- .../framework/plugins/dynamicresources/dynamicresources.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 18ed9c91812..233fb183923 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -545,12 +545,12 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl // Claims are treated as "allocated" if they are in the assume cache // or currently their allocation is in-flight. This does not change // during filtering, so we can determine that once. - allocatedDevices := pl.listAllAllocatedDevices(logger) + allAllocatedDevices := pl.listAllAllocatedDevices(logger) slices, err := pl.sliceLister.List(labels.Everything()) if err != nil { return nil, statusError(logger, err) } - allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, allocateClaims, allocatedDevices, pl.classLister, slices, pl.celCache) + allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, allocateClaims, allAllocatedDevices, pl.classLister, slices, pl.celCache) if err != nil { return nil, statusError(logger, err) } From 6f07fa3a5e6ff3321b100d95cd4c0181055b4ca4 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Wed, 30 Oct 2024 10:10:12 +0100 Subject: [PATCH 12/13] DRA scheduler: update some stale comments --- .../framework/plugins/dynamicresources/dynamicresources.go | 2 +- .../k8s.io/dynamic-resource-allocation/structured/allocator.go | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 233fb183923..8bc5a44891e 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -542,7 +542,7 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl // expensive, we may have to maintain and update state more // persistently. 
// - // Claims are treated as "allocated" if they are in the assume cache + // Claims (and thus their devices) are treated as "allocated" if they are in the assume cache // or currently their allocation is in-flight. This does not change // during filtering, so we can determine that once. allAllocatedDevices := pl.listAllAllocatedDevices(logger) diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go index b79e7950e5a..e858c77f1b5 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go @@ -53,8 +53,7 @@ type Allocator struct { // NewAllocator returns an allocator for a certain set of claims or an error if // some problem was detected which makes it impossible to allocate claims. // -// The returned Allocator is stateless. It calls the listers anew for each -// Allocate call. +// The returned Allocator can be used multiple times and is thread-safe. func NewAllocator(ctx context.Context, adminAccessEnabled bool, claimsToAllocate []*resourceapi.ResourceClaim, From 7863d9a3819bfae228215d94982dab50748be56c Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Wed, 30 Oct 2024 10:36:56 +0100 Subject: [PATCH 13/13] DRA scheduler: refactor CEL compilation cache A better place is the cel package because a) the name can become shorter and b) it is tightly coupled with the compiler there. Moving the compilation into the cache simplifies the callers. --- .../dynamicresources/dynamicresources.go | 5 +- .../dynamic-resource-allocation/cel/cache.go | 79 +++++++++++++++ .../cel/cache_test.go | 98 +++++++++++++++++++ .../k8s.io/dynamic-resource-allocation/go.mod | 2 +- .../structured/allocator.go | 28 +----- .../structured/allocator_test.go | 3 +- .../structured/celcache.go | 54 ---------- test/integration/scheduler_perf/dra.go | 3 +- 8 files changed, 188 insertions(+), 84 deletions(-) create mode 100644 staging/src/k8s.io/dynamic-resource-allocation/cel/cache.go create mode 100644 staging/src/k8s.io/dynamic-resource-allocation/cel/cache_test.go delete mode 100644 staging/src/k8s.io/dynamic-resource-allocation/structured/celcache.go diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 8bc5a44891e..0427e6deb08 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -38,6 +38,7 @@ import ( resourcelisters "k8s.io/client-go/listers/resource/v1alpha3" "k8s.io/client-go/util/retry" "k8s.io/component-helpers/scheduling/corev1/nodeaffinity" + "k8s.io/dynamic-resource-allocation/cel" "k8s.io/dynamic-resource-allocation/resourceclaim" "k8s.io/dynamic-resource-allocation/structured" "k8s.io/klog/v2" @@ -111,7 +112,7 @@ type DynamicResources struct { clientset kubernetes.Interface classLister resourcelisters.DeviceClassLister sliceLister resourcelisters.ResourceSliceLister - celCache *structured.CELCache + celCache *cel.Cache allocatedDevices *allocatedDevices // claimAssumeCache enables temporarily storing a newer claim object @@ -192,7 +193,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe // This is a LRU cache for compiled CEL expressions. The most // recent 10 of them get reused across different scheduling // cycles. 
- celCache: structured.NewCELCache(10), + celCache: cel.NewCache(10), allocatedDevices: newAllocatedDevices(logger), } diff --git a/staging/src/k8s.io/dynamic-resource-allocation/cel/cache.go b/staging/src/k8s.io/dynamic-resource-allocation/cel/cache.go new file mode 100644 index 00000000000..2868886c5bb --- /dev/null +++ b/staging/src/k8s.io/dynamic-resource-allocation/cel/cache.go @@ -0,0 +1,79 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cel + +import ( + "sync" + + "k8s.io/utils/keymutex" + "k8s.io/utils/lru" +) + +// Cache is a thread-safe LRU cache for a compiled CEL expression. +type Cache struct { + compileMutex keymutex.KeyMutex + cacheMutex sync.RWMutex + cache *lru.Cache +} + +// NewCache creates a cache. The maximum number of entries determines +// how many entries are cached at most before dropping the oldest +// entry. +func NewCache(maxCacheEntries int) *Cache { + return &Cache{ + compileMutex: keymutex.NewHashed(0), + cache: lru.New(maxCacheEntries), + } +} + +// GetOrCompile checks whether the cache already has a compilation result +// and returns that if available. Otherwise it compiles, stores successful +// results and returns the new result. +func (c *Cache) GetOrCompile(expression string) CompilationResult { + // Compiling a CEL expression is expensive enough that it is cheaper + // to lock a mutex than doing it several times in parallel. + c.compileMutex.LockKey(expression) + //nolint:errcheck // Only returns an error for unknown keys, which isn't the case here. + defer c.compileMutex.UnlockKey(expression) + + cached := c.get(expression) + if cached != nil { + return *cached + } + + expr := GetCompiler().CompileCELExpression(expression, Options{}) + if expr.Error == nil { + c.add(expression, &expr) + } + return expr +} + +func (c *Cache) add(expression string, expr *CompilationResult) { + c.cacheMutex.Lock() + defer c.cacheMutex.Unlock() + c.cache.Add(expression, expr) +} + +func (c *Cache) get(expression string) *CompilationResult { + c.cacheMutex.RLock() + defer c.cacheMutex.RUnlock() + expr, found := c.cache.Get(expression) + if !found { + return nil + } + return expr.(*CompilationResult) +} diff --git a/staging/src/k8s.io/dynamic-resource-allocation/cel/cache_test.go b/staging/src/k8s.io/dynamic-resource-allocation/cel/cache_test.go new file mode 100644 index 00000000000..3c68a94db82 --- /dev/null +++ b/staging/src/k8s.io/dynamic-resource-allocation/cel/cache_test.go @@ -0,0 +1,98 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cel
+
+import (
+	"fmt"
+	"sync"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestCacheSemantic(t *testing.T) {
+	// Cache two entries.
+	//
+	// Entries are comparable structs with pointers inside. Each
+	// compilation leads to different pointers, so the entries can be
+	// compared by value to figure out whether an entry was cached or
+	// compiled anew.
+	cache := NewCache(2)
+
+	// Successful compilations get cached.
+	resultTrue := cache.GetOrCompile("true")
+	require.Nil(t, resultTrue.Error)
+	resultTrueAgain := cache.GetOrCompile("true")
+	if resultTrue != resultTrueAgain {
+		t.Fatal("result of compiling `true` should have been cached")
+	}
+
+	// Unsuccessful ones don't.
+	resultFailed := cache.GetOrCompile("no-such-variable")
+	require.NotNil(t, resultFailed.Error)
+	resultFailedAgain := cache.GetOrCompile("no-such-variable")
+	if resultFailed == resultFailedAgain {
+		t.Fatal("result of compiling `no-such-variable` should not have been cached")
+	}
+
+	// The cache can hold a second result.
+	resultFalse := cache.GetOrCompile("false")
+	require.Nil(t, resultFalse.Error)
+	resultFalseAgain := cache.GetOrCompile("false")
+	if resultFalse != resultFalseAgain {
+		t.Fatal("result of compiling `false` should have been cached")
+	}
+	resultTrueAgain = cache.GetOrCompile("true")
+	if resultTrue != resultTrueAgain {
+		t.Fatal("result of compiling `true` should still have been cached")
+	}
+
+	// A third result pushes out the least recently used one.
+	resultOther := cache.GetOrCompile("false && true")
+	require.Nil(t, resultOther.Error)
+	resultOtherAgain := cache.GetOrCompile("false && true")
+	if resultOther != resultOtherAgain {
+		t.Fatal("result of compiling `false && true` should have been cached")
+	}
+	resultFalseAgain = cache.GetOrCompile("false")
+	if resultFalse == resultFalseAgain {
+		t.Fatal("result of compiling `false` should have been evicted from the cache")
+	}
+}
+
+func TestCacheConcurrency(t *testing.T) {
+	// There's no guarantee that concurrent use of the cache would really
+	// trigger the race detector in `go test -race`, but in practice
+	// it does when not using the cacheMutex.
+	//
+	// The compileMutex only affects performance and thus cannot be tested
+	// without benchmarking. 
+ numWorkers := 10 + + cache := NewCache(2) + var wg sync.WaitGroup + wg.Add(numWorkers) + for i := 0; i < numWorkers; i++ { + go func(i int) { + defer wg.Done() + result := cache.GetOrCompile(fmt.Sprintf("%d == %d", i, i)) + assert.Nil(t, result.Error) + }(i) + } + wg.Wait() +} diff --git a/staging/src/k8s.io/dynamic-resource-allocation/go.mod b/staging/src/k8s.io/dynamic-resource-allocation/go.mod index 50a1a1b356d..8cbfbf386db 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/go.mod +++ b/staging/src/k8s.io/dynamic-resource-allocation/go.mod @@ -8,7 +8,6 @@ godebug default=go1.23 require ( github.com/blang/semver/v4 v4.0.0 - github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da github.com/google/cel-go v0.21.0 github.com/google/go-cmp v0.6.0 github.com/onsi/gomega v1.33.1 @@ -37,6 +36,7 @@ require ( github.com/go-openapi/jsonreference v0.20.2 // indirect github.com/go-openapi/swag v0.23.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/gofuzz v1.2.0 // indirect diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go index e858c77f1b5..f4c17487672 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go @@ -30,7 +30,6 @@ import ( draapi "k8s.io/dynamic-resource-allocation/api" "k8s.io/dynamic-resource-allocation/cel" "k8s.io/klog/v2" - "k8s.io/utils/keymutex" "k8s.io/utils/ptr" ) @@ -46,8 +45,7 @@ type Allocator struct { allocatedDevices sets.Set[DeviceID] classLister resourcelisters.DeviceClassLister slices []*resourceapi.ResourceSlice - celCache *CELCache - celMutex keymutex.KeyMutex + celCache *cel.Cache } // NewAllocator returns an allocator for a certain set of claims or an error if @@ -60,7 +58,7 @@ func NewAllocator(ctx context.Context, allocatedDevices sets.Set[DeviceID], classLister resourcelisters.DeviceClassLister, slices []*resourceapi.ResourceSlice, - celCache *CELCache, + celCache *cel.Cache, ) (*Allocator, error) { return &Allocator{ adminAccessEnabled: adminAccessEnabled, @@ -69,7 +67,6 @@ func NewAllocator(ctx context.Context, classLister: classLister, slices: slices, celCache: celCache, - celMutex: keymutex.NewHashed(0), }, nil } @@ -78,25 +75,6 @@ func (a *Allocator) ClaimsToAllocate() []*resourceapi.ResourceClaim { return a.claimsToAllocate } -func (a *Allocator) compileCELExpression(expression string) cel.CompilationResult { - // Compiling a CEL expression is expensive enough that it is cheaper - // to lock a mutex than doing it several times in parallel. - a.celMutex.LockKey(expression) - //nolint:errcheck // Only returns an error for unknown keys, which isn't the case here. - defer a.celMutex.UnlockKey(expression) - - cached := a.celCache.get(expression) - if cached != nil { - return *cached - } - - expr := cel.GetCompiler().CompileCELExpression(expression, cel.Options{}) - if expr.Error == nil { - a.celCache.add(expression, &expr) - } - return expr -} - // Allocate calculates the allocation(s) for one particular node. // // It returns an error only if some fatal problem occurred. 
These are errors @@ -713,7 +691,7 @@ func (alloc *allocator) isSelectable(r requestIndices, slice *draapi.ResourceSli func (alloc *allocator) selectorsMatch(r requestIndices, device *draapi.BasicDevice, deviceID DeviceID, class *resourceapi.DeviceClass, selectors []resourceapi.DeviceSelector) (bool, error) { for i, selector := range selectors { - expr := alloc.compileCELExpression(selector.CEL.Expression) + expr := alloc.celCache.GetOrCompile(selector.CEL.Expression) if expr.Error != nil { // Could happen if some future apiserver accepted some // future expression and then got downgraded. Normally diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go index e647660a3fe..8684108cdb2 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go @@ -37,6 +37,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/dynamic-resource-allocation/cel" "k8s.io/klog/v2/ktesting" "k8s.io/utils/ptr" ) @@ -1376,7 +1377,7 @@ func TestAllocator(t *testing.T) { allocatedDevices := slices.Clone(tc.allocatedDevices) slices := slices.Clone(tc.slices) - allocator, err := NewAllocator(ctx, tc.adminAccess, claimsToAllocate, sets.New(allocatedDevices...), classLister, slices, NewCELCache(1)) + allocator, err := NewAllocator(ctx, tc.adminAccess, claimsToAllocate, sets.New(allocatedDevices...), classLister, slices, cel.NewCache(1)) g.Expect(err).ToNot(gomega.HaveOccurred()) results, err := allocator.Allocate(ctx, tc.node) diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/celcache.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/celcache.go deleted file mode 100644 index f4c84180fe5..00000000000 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/celcache.go +++ /dev/null @@ -1,54 +0,0 @@ -/* -Copyright 2024 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package structured - -import ( - "sync" - - "github.com/golang/groupcache/lru" - - "k8s.io/dynamic-resource-allocation/cel" -) - -// CELCache is an LRU cache for a compiled CEL expression. 
-type CELCache struct { - mutex sync.RWMutex - cache *lru.Cache -} - -// NewCELCache returns a CELCache -func NewCELCache(maxCacheEntries int) *CELCache { - return &CELCache{ - cache: lru.New(maxCacheEntries), - } -} - -func (c *CELCache) add(expression string, expr *cel.CompilationResult) { - c.mutex.Lock() - defer c.mutex.Unlock() - c.cache.Add(expression, expr) -} - -func (c *CELCache) get(expression string) *cel.CompilationResult { - c.mutex.RLock() - defer c.mutex.RUnlock() - expr, found := c.cache.Get(expression) - if !found { - return nil - } - return expr.(*cel.CompilationResult) -} diff --git a/test/integration/scheduler_perf/dra.go b/test/integration/scheduler_perf/dra.go index 143f269da9e..a4cb1cfe905 100644 --- a/test/integration/scheduler_perf/dra.go +++ b/test/integration/scheduler_perf/dra.go @@ -34,6 +34,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" "k8s.io/client-go/util/workqueue" + "k8s.io/dynamic-resource-allocation/cel" "k8s.io/dynamic-resource-allocation/structured" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/util/assumecache" @@ -291,7 +292,7 @@ func (op *allocResourceClaimsOp) run(tCtx ktesting.TContext) { reflect.TypeOf(&v1.Node{}): true, } require.Equal(tCtx, expectSyncedInformers, syncedInformers, "synced informers") - celCache := structured.NewCELCache(10) + celCache := cel.NewCache(10) // The set of nodes is assumed to be fixed at this point. nodes, err := nodeLister.List(labels.Everything())