diff --git a/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go b/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go new file mode 100644 index 00000000000..a4a5a415972 --- /dev/null +++ b/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go @@ -0,0 +1,175 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dynamicresources + +import ( + "sync" + + resourceapi "k8s.io/api/resource/v1alpha3" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/tools/cache" + "k8s.io/dynamic-resource-allocation/structured" + "k8s.io/klog/v2" + schedutil "k8s.io/kubernetes/pkg/scheduler/util" + "k8s.io/utils/ptr" +) + +// foreachAllocatedDevice invokes the provided callback for each +// device in the claim's allocation result which was allocated +// exclusively for the claim. +// +// Devices allocated with admin access can be shared with other +// claims and are skipped without invoking the callback. +// +// foreachAllocatedDevice does nothing if the claim is not allocated. +func foreachAllocatedDevice(claim *resourceapi.ResourceClaim, cb func(deviceID structured.DeviceID)) { + if claim.Status.Allocation == nil { + return + } + for _, result := range claim.Status.Allocation.Devices.Results { + // Kubernetes 1.31 did not set this, 1.32 always does. + // Supporting 1.31 is not worth the additional code that + // would have to be written (= looking up in request) because + // it is extremely unlikely that there really is a result + // that still exists in a cluster from 1.31 where this matters. + if ptr.Deref(result.AdminAccess, false) { + // Is not considered as allocated. + continue + } + deviceID := structured.MakeDeviceID(result.Driver, result.Pool, result.Device) + + // None of the users of this helper need to abort iterating, + // therefore it's not supported as it only would add overhead. + cb(deviceID) + } +} + +// allocatedDevices reacts to events in a cache and maintains a set of all allocated devices. +// This is cheaper than repeatedly calling List, making strings unique, and building the set +// each time PreFilter is called. +// +// All methods are thread-safe. Get returns a cloned set. +type allocatedDevices struct { + logger klog.Logger + + mutex sync.RWMutex + ids sets.Set[structured.DeviceID] +} + +func newAllocatedDevices(logger klog.Logger) *allocatedDevices { + return &allocatedDevices{ + logger: logger, + ids: sets.New[structured.DeviceID](), + } +} + +func (a *allocatedDevices) Get() sets.Set[structured.DeviceID] { + a.mutex.RLock() + defer a.mutex.RUnlock() + + return a.ids.Clone() +} + +func (a *allocatedDevices) handlers() cache.ResourceEventHandler { + return cache.ResourceEventHandlerFuncs{ + AddFunc: a.onAdd, + UpdateFunc: a.onUpdate, + DeleteFunc: a.onDelete, + } +} + +func (a *allocatedDevices) onAdd(obj any) { + claim, _, err := schedutil.As[*resourceapi.ResourceClaim](obj, nil) + if err != nil { + // Shouldn't happen. 
+ a.logger.Error(err, "unexpected object in allocatedDevices.onAdd") + return + } + + if claim.Status.Allocation != nil { + a.addDevices(claim) + } +} + +func (a *allocatedDevices) onUpdate(oldObj, newObj any) { + originalClaim, modifiedClaim, err := schedutil.As[*resourceapi.ResourceClaim](oldObj, newObj) + if err != nil { + // Shouldn't happen. + a.logger.Error(err, "unexpected object in allocatedDevices.onUpdate") + return + } + + switch { + case originalClaim.Status.Allocation == nil && modifiedClaim.Status.Allocation != nil: + a.addDevices(modifiedClaim) + case originalClaim.Status.Allocation != nil && modifiedClaim.Status.Allocation == nil: + a.removeDevices(originalClaim) + default: + // Nothing to do. Either both nil or both non-nil, in which case the content + // also must be the same (immutable!). + } +} + +func (a *allocatedDevices) onDelete(obj any) { + claim, _, err := schedutil.As[*resourceapi.ResourceClaim](obj, nil) + if err != nil { + // Shouldn't happen. + a.logger.Error(err, "unexpected object in allocatedDevices.onDelete") + return + } + + a.removeDevices(claim) +} + +func (a *allocatedDevices) addDevices(claim *resourceapi.ResourceClaim) { + if claim.Status.Allocation == nil { + return + } + // Locking of the mutex gets minimized by pre-computing what needs to be done + // without holding the lock. + deviceIDs := make([]structured.DeviceID, 0, 20) + foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) { + a.logger.V(6).Info("Observed device allocation", "device", deviceID, "claim", klog.KObj(claim)) + deviceIDs = append(deviceIDs, deviceID) + }) + + a.mutex.Lock() + defer a.mutex.Unlock() + for _, deviceID := range deviceIDs { + a.ids.Insert(deviceID) + } +} + +func (a *allocatedDevices) removeDevices(claim *resourceapi.ResourceClaim) { + if claim.Status.Allocation == nil { + return + } + + // Locking of the mutex gets minimized by pre-computing what needs to be done + // without holding the lock. + deviceIDs := make([]structured.DeviceID, 0, 20) + foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) { + a.logger.V(6).Info("Observed device deallocation", "device", deviceID, "claim", klog.KObj(claim)) + deviceIDs = append(deviceIDs, deviceID) + }) + + a.mutex.Lock() + defer a.mutex.Unlock() + for _, deviceID := range deviceIDs { + a.ids.Delete(deviceID) + } +} diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 92842efc253..0427e6deb08 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -30,6 +30,7 @@ import ( apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" @@ -37,6 +38,7 @@ import ( resourcelisters "k8s.io/client-go/listers/resource/v1alpha3" "k8s.io/client-go/util/retry" "k8s.io/component-helpers/scheduling/corev1/nodeaffinity" + "k8s.io/dynamic-resource-allocation/cel" "k8s.io/dynamic-resource-allocation/resourceclaim" "k8s.io/dynamic-resource-allocation/structured" "k8s.io/klog/v2" @@ -85,7 +87,7 @@ type stateData struct { informationsForClaim []informationForClaim // nodeAllocations caches the result of Filter for the nodes. 
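+	// Stored by value rather than by pointer, matching the
+	// []resourceapi.AllocationResult slices returned by the allocator.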
- nodeAllocations map[string][]*resourceapi.AllocationResult + nodeAllocations map[string][]resourceapi.AllocationResult } func (d *stateData) Clone() framework.StateData { @@ -106,10 +108,12 @@ type DynamicResources struct { enableAdminAccess bool enableSchedulingQueueHint bool - fh framework.Handle - clientset kubernetes.Interface - classLister resourcelisters.DeviceClassLister - sliceLister resourcelisters.ResourceSliceLister + fh framework.Handle + clientset kubernetes.Interface + classLister resourcelisters.DeviceClassLister + sliceLister resourcelisters.ResourceSliceLister + celCache *cel.Cache + allocatedDevices *allocatedDevices // claimAssumeCache enables temporarily storing a newer claim object // while the scheduler has allocated it and the corresponding object @@ -174,6 +178,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe return &DynamicResources{}, nil } + logger := klog.FromContext(ctx) pl := &DynamicResources{ enabled: true, enableAdminAccess: fts.EnableDRAAdminAccess, @@ -184,8 +189,19 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe classLister: fh.SharedInformerFactory().Resource().V1alpha3().DeviceClasses().Lister(), sliceLister: fh.SharedInformerFactory().Resource().V1alpha3().ResourceSlices().Lister(), claimAssumeCache: fh.ResourceClaimCache(), + + // This is a LRU cache for compiled CEL expressions. The most + // recent 10 of them get reused across different scheduling + // cycles. + celCache: cel.NewCache(10), + + allocatedDevices: newAllocatedDevices(logger), } + // Reacting to events is more efficient than iterating over the list + // repeatedly in PreFilter. + pl.claimAssumeCache.AddEventHandler(pl.allocatedDevices.handlers()) + return pl, nil } @@ -527,39 +543,41 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl // expensive, we may have to maintain and update state more // persistently. // - // Claims are treated as "allocated" if they are in the assume cache - // or currently their allocation is in-flight. - allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, allocateClaims, &claimListerForAssumeCache{assumeCache: pl.claimAssumeCache, inFlightAllocations: &pl.inFlightAllocations}, pl.classLister, pl.sliceLister) + // Claims (and thus their devices) are treated as "allocated" if they are in the assume cache + // or currently their allocation is in-flight. This does not change + // during filtering, so we can determine that once. + allAllocatedDevices := pl.listAllAllocatedDevices(logger) + slices, err := pl.sliceLister.List(labels.Everything()) + if err != nil { + return nil, statusError(logger, err) + } + allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, allocateClaims, allAllocatedDevices, pl.classLister, slices, pl.celCache) if err != nil { return nil, statusError(logger, err) } s.allocator = allocator - s.nodeAllocations = make(map[string][]*resourceapi.AllocationResult) + s.nodeAllocations = make(map[string][]resourceapi.AllocationResult) } s.claims = claims return nil, nil } -type claimListerForAssumeCache struct { - assumeCache *assumecache.AssumeCache - inFlightAllocations *sync.Map -} +func (pl *DynamicResources) listAllAllocatedDevices(logger klog.Logger) sets.Set[structured.DeviceID] { + // Start with a fresh set that matches the current known state of the + // world according to the informers. 
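+	// Get returns a clone, so extending it below with the in-flight
+	// allocations does not modify the set maintained by the event handlers.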
+ allocated := pl.allocatedDevices.Get() -func (cl *claimListerForAssumeCache) ListAllAllocated() ([]*resourceapi.ResourceClaim, error) { - // Probably not worth adding an index for? - objs := cl.assumeCache.List(nil) - allocated := make([]*resourceapi.ResourceClaim, 0, len(objs)) - for _, obj := range objs { - claim := obj.(*resourceapi.ResourceClaim) - if obj, ok := cl.inFlightAllocations.Load(claim.UID); ok { - claim = obj.(*resourceapi.ResourceClaim) - } - if claim.Status.Allocation != nil { - allocated = append(allocated, claim) - } - } - return allocated, nil + // Whatever is in flight also has to be checked. + pl.inFlightAllocations.Range(func(key, value any) bool { + claim := value.(*resourceapi.ResourceClaim) + foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) { + logger.V(6).Info("Device is in flight for allocation", "device", deviceID, "claim", klog.KObj(claim)) + allocated.Insert(deviceID) + }) + return true + }) + return allocated } // PreFilterExtensions returns prefilter extensions, pod add and remove. @@ -615,7 +633,7 @@ func (pl *DynamicResources) Filter(ctx context.Context, cs *framework.CycleState } // Use allocator to check the node and cache the result in case that the node is picked. - var allocations []*resourceapi.AllocationResult + var allocations []resourceapi.AllocationResult if state.allocator != nil { allocCtx := ctx if loggerV := logger.V(5); loggerV.Enabled() { @@ -763,7 +781,7 @@ func (pl *DynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat if index < 0 { return statusError(logger, fmt.Errorf("internal error, claim %s with allocation not found", claim.Name)) } - allocation := allocations[i] + allocation := &allocations[i] state.informationsForClaim[index].allocation = allocation // Strictly speaking, we don't need to store the full modified object. diff --git a/staging/src/k8s.io/dynamic-resource-allocation/api/conversion.go b/staging/src/k8s.io/dynamic-resource-allocation/api/conversion.go new file mode 100644 index 00000000000..077fe2bc423 --- /dev/null +++ b/staging/src/k8s.io/dynamic-resource-allocation/api/conversion.go @@ -0,0 +1,47 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package api + +import ( + "unique" + + conversion "k8s.io/apimachinery/pkg/conversion" + "k8s.io/apimachinery/pkg/runtime" +) + +var ( + localSchemeBuilder runtime.SchemeBuilder + AddToScheme = localSchemeBuilder.AddToScheme +) + +func Convert_api_UniqueString_To_string(in *UniqueString, out *string, s conversion.Scope) error { + if *in == NullUniqueString { + *out = "" + return nil + } + *out = in.String() + return nil +} + +func Convert_string_To_api_UniqueString(in *string, out *UniqueString, s conversion.Scope) error { + if *in == "" { + *out = NullUniqueString + return nil + } + *out = UniqueString(unique.Make(*in)) + return nil +} diff --git a/staging/src/k8s.io/dynamic-resource-allocation/api/doc.go b/staging/src/k8s.io/dynamic-resource-allocation/api/doc.go new file mode 100644 index 00000000000..9e28284ff3f --- /dev/null +++ b/staging/src/k8s.io/dynamic-resource-allocation/api/doc.go @@ -0,0 +1,22 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package api contains a variant of the API where strings are unique. These +// unique strings are faster to compare and more efficient when used as key in +// a map. +// +// +k8s:conversion-gen=k8s.io/api/resource/v1alpha3 +package api diff --git a/staging/src/k8s.io/dynamic-resource-allocation/api/types.go b/staging/src/k8s.io/dynamic-resource-allocation/api/types.go new file mode 100644 index 00000000000..282a1e9c0c1 --- /dev/null +++ b/staging/src/k8s.io/dynamic-resource-allocation/api/types.go @@ -0,0 +1,64 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package api + +import ( + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +type ResourceSlice struct { + metav1.TypeMeta + metav1.ObjectMeta + Spec ResourceSliceSpec +} + +type ResourceSliceSpec struct { + Driver UniqueString + Pool ResourcePool + NodeName UniqueString + NodeSelector *v1.NodeSelector + AllNodes bool + Devices []Device +} + +type ResourcePool struct { + Name UniqueString + Generation int64 + ResourceSliceCount int64 +} +type Device struct { + Name UniqueString + Basic *BasicDevice +} + +type BasicDevice struct { + Attributes map[QualifiedName]DeviceAttribute + Capacity map[QualifiedName]resource.Quantity +} + +type QualifiedName string + +type FullyQualifiedName string + +type DeviceAttribute struct { + IntValue *int64 + BoolValue *bool + StringValue *string + VersionValue *string +} diff --git a/staging/src/k8s.io/dynamic-resource-allocation/api/uniquestring.go b/staging/src/k8s.io/dynamic-resource-allocation/api/uniquestring.go new file mode 100644 index 00000000000..34d413743ac --- /dev/null +++ b/staging/src/k8s.io/dynamic-resource-allocation/api/uniquestring.go @@ -0,0 +1,41 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package api + +import ( + "unique" +) + +// NullUniqueString is a UniqueString which contains no string. +var NullUniqueString UniqueString + +// UniqueString is a wrapper around [unique.Handle[string]]. +type UniqueString unique.Handle[string] + +// Returns the string that is stored in the UniqueString. +// If the UniqueString is null, the empty string is returned. +func (us UniqueString) String() string { + if us == NullUniqueString { + return "" + } + return unique.Handle[string](us).Value() +} + +// MakeUniqueString constructs a new unique string. +func MakeUniqueString(str string) UniqueString { + return UniqueString(unique.Make(str)) +} diff --git a/staging/src/k8s.io/dynamic-resource-allocation/api/zz_generated.conversion.go b/staging/src/k8s.io/dynamic-resource-allocation/api/zz_generated.conversion.go new file mode 100644 index 00000000000..65fbfba1726 --- /dev/null +++ b/staging/src/k8s.io/dynamic-resource-allocation/api/zz_generated.conversion.go @@ -0,0 +1,302 @@ +//go:build !ignore_autogenerated +// +build !ignore_autogenerated + +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by conversion-gen. DO NOT EDIT. 
+ +package api + +import ( + unsafe "unsafe" + + v1 "k8s.io/api/core/v1" + v1alpha3 "k8s.io/api/resource/v1alpha3" + resource "k8s.io/apimachinery/pkg/api/resource" + conversion "k8s.io/apimachinery/pkg/conversion" + runtime "k8s.io/apimachinery/pkg/runtime" +) + +func init() { + localSchemeBuilder.Register(RegisterConversions) +} + +// RegisterConversions adds conversion functions to the given scheme. +// Public to allow building arbitrary schemes. +func RegisterConversions(s *runtime.Scheme) error { + if err := s.AddGeneratedConversionFunc((*BasicDevice)(nil), (*v1alpha3.BasicDevice)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_api_BasicDevice_To_v1alpha3_BasicDevice(a.(*BasicDevice), b.(*v1alpha3.BasicDevice), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*v1alpha3.BasicDevice)(nil), (*BasicDevice)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha3_BasicDevice_To_api_BasicDevice(a.(*v1alpha3.BasicDevice), b.(*BasicDevice), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*Device)(nil), (*v1alpha3.Device)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_api_Device_To_v1alpha3_Device(a.(*Device), b.(*v1alpha3.Device), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*v1alpha3.Device)(nil), (*Device)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha3_Device_To_api_Device(a.(*v1alpha3.Device), b.(*Device), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*DeviceAttribute)(nil), (*v1alpha3.DeviceAttribute)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_api_DeviceAttribute_To_v1alpha3_DeviceAttribute(a.(*DeviceAttribute), b.(*v1alpha3.DeviceAttribute), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*v1alpha3.DeviceAttribute)(nil), (*DeviceAttribute)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha3_DeviceAttribute_To_api_DeviceAttribute(a.(*v1alpha3.DeviceAttribute), b.(*DeviceAttribute), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*ResourcePool)(nil), (*v1alpha3.ResourcePool)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_api_ResourcePool_To_v1alpha3_ResourcePool(a.(*ResourcePool), b.(*v1alpha3.ResourcePool), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*v1alpha3.ResourcePool)(nil), (*ResourcePool)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha3_ResourcePool_To_api_ResourcePool(a.(*v1alpha3.ResourcePool), b.(*ResourcePool), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*ResourceSlice)(nil), (*v1alpha3.ResourceSlice)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_api_ResourceSlice_To_v1alpha3_ResourceSlice(a.(*ResourceSlice), b.(*v1alpha3.ResourceSlice), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*v1alpha3.ResourceSlice)(nil), (*ResourceSlice)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha3_ResourceSlice_To_api_ResourceSlice(a.(*v1alpha3.ResourceSlice), b.(*ResourceSlice), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*ResourceSliceSpec)(nil), (*v1alpha3.ResourceSliceSpec)(nil), 
func(a, b interface{}, scope conversion.Scope) error { + return Convert_api_ResourceSliceSpec_To_v1alpha3_ResourceSliceSpec(a.(*ResourceSliceSpec), b.(*v1alpha3.ResourceSliceSpec), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*v1alpha3.ResourceSliceSpec)(nil), (*ResourceSliceSpec)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha3_ResourceSliceSpec_To_api_ResourceSliceSpec(a.(*v1alpha3.ResourceSliceSpec), b.(*ResourceSliceSpec), scope) + }); err != nil { + return err + } + if err := s.AddConversionFunc((*UniqueString)(nil), (*string)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_api_UniqueString_To_string(a.(*UniqueString), b.(*string), scope) + }); err != nil { + return err + } + if err := s.AddConversionFunc((*string)(nil), (*UniqueString)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_string_To_api_UniqueString(a.(*string), b.(*UniqueString), scope) + }); err != nil { + return err + } + return nil +} + +func autoConvert_api_BasicDevice_To_v1alpha3_BasicDevice(in *BasicDevice, out *v1alpha3.BasicDevice, s conversion.Scope) error { + out.Attributes = *(*map[v1alpha3.QualifiedName]v1alpha3.DeviceAttribute)(unsafe.Pointer(&in.Attributes)) + out.Capacity = *(*map[v1alpha3.QualifiedName]resource.Quantity)(unsafe.Pointer(&in.Capacity)) + return nil +} + +// Convert_api_BasicDevice_To_v1alpha3_BasicDevice is an autogenerated conversion function. +func Convert_api_BasicDevice_To_v1alpha3_BasicDevice(in *BasicDevice, out *v1alpha3.BasicDevice, s conversion.Scope) error { + return autoConvert_api_BasicDevice_To_v1alpha3_BasicDevice(in, out, s) +} + +func autoConvert_v1alpha3_BasicDevice_To_api_BasicDevice(in *v1alpha3.BasicDevice, out *BasicDevice, s conversion.Scope) error { + out.Attributes = *(*map[QualifiedName]DeviceAttribute)(unsafe.Pointer(&in.Attributes)) + out.Capacity = *(*map[QualifiedName]resource.Quantity)(unsafe.Pointer(&in.Capacity)) + return nil +} + +// Convert_v1alpha3_BasicDevice_To_api_BasicDevice is an autogenerated conversion function. +func Convert_v1alpha3_BasicDevice_To_api_BasicDevice(in *v1alpha3.BasicDevice, out *BasicDevice, s conversion.Scope) error { + return autoConvert_v1alpha3_BasicDevice_To_api_BasicDevice(in, out, s) +} + +func autoConvert_api_Device_To_v1alpha3_Device(in *Device, out *v1alpha3.Device, s conversion.Scope) error { + if err := Convert_api_UniqueString_To_string(&in.Name, &out.Name, s); err != nil { + return err + } + out.Basic = (*v1alpha3.BasicDevice)(unsafe.Pointer(in.Basic)) + return nil +} + +// Convert_api_Device_To_v1alpha3_Device is an autogenerated conversion function. +func Convert_api_Device_To_v1alpha3_Device(in *Device, out *v1alpha3.Device, s conversion.Scope) error { + return autoConvert_api_Device_To_v1alpha3_Device(in, out, s) +} + +func autoConvert_v1alpha3_Device_To_api_Device(in *v1alpha3.Device, out *Device, s conversion.Scope) error { + if err := Convert_string_To_api_UniqueString(&in.Name, &out.Name, s); err != nil { + return err + } + out.Basic = (*BasicDevice)(unsafe.Pointer(in.Basic)) + return nil +} + +// Convert_v1alpha3_Device_To_api_Device is an autogenerated conversion function. 
+func Convert_v1alpha3_Device_To_api_Device(in *v1alpha3.Device, out *Device, s conversion.Scope) error { + return autoConvert_v1alpha3_Device_To_api_Device(in, out, s) +} + +func autoConvert_api_DeviceAttribute_To_v1alpha3_DeviceAttribute(in *DeviceAttribute, out *v1alpha3.DeviceAttribute, s conversion.Scope) error { + out.IntValue = (*int64)(unsafe.Pointer(in.IntValue)) + out.BoolValue = (*bool)(unsafe.Pointer(in.BoolValue)) + out.StringValue = (*string)(unsafe.Pointer(in.StringValue)) + out.VersionValue = (*string)(unsafe.Pointer(in.VersionValue)) + return nil +} + +// Convert_api_DeviceAttribute_To_v1alpha3_DeviceAttribute is an autogenerated conversion function. +func Convert_api_DeviceAttribute_To_v1alpha3_DeviceAttribute(in *DeviceAttribute, out *v1alpha3.DeviceAttribute, s conversion.Scope) error { + return autoConvert_api_DeviceAttribute_To_v1alpha3_DeviceAttribute(in, out, s) +} + +func autoConvert_v1alpha3_DeviceAttribute_To_api_DeviceAttribute(in *v1alpha3.DeviceAttribute, out *DeviceAttribute, s conversion.Scope) error { + out.IntValue = (*int64)(unsafe.Pointer(in.IntValue)) + out.BoolValue = (*bool)(unsafe.Pointer(in.BoolValue)) + out.StringValue = (*string)(unsafe.Pointer(in.StringValue)) + out.VersionValue = (*string)(unsafe.Pointer(in.VersionValue)) + return nil +} + +// Convert_v1alpha3_DeviceAttribute_To_api_DeviceAttribute is an autogenerated conversion function. +func Convert_v1alpha3_DeviceAttribute_To_api_DeviceAttribute(in *v1alpha3.DeviceAttribute, out *DeviceAttribute, s conversion.Scope) error { + return autoConvert_v1alpha3_DeviceAttribute_To_api_DeviceAttribute(in, out, s) +} + +func autoConvert_api_ResourcePool_To_v1alpha3_ResourcePool(in *ResourcePool, out *v1alpha3.ResourcePool, s conversion.Scope) error { + if err := Convert_api_UniqueString_To_string(&in.Name, &out.Name, s); err != nil { + return err + } + out.Generation = in.Generation + out.ResourceSliceCount = in.ResourceSliceCount + return nil +} + +// Convert_api_ResourcePool_To_v1alpha3_ResourcePool is an autogenerated conversion function. +func Convert_api_ResourcePool_To_v1alpha3_ResourcePool(in *ResourcePool, out *v1alpha3.ResourcePool, s conversion.Scope) error { + return autoConvert_api_ResourcePool_To_v1alpha3_ResourcePool(in, out, s) +} + +func autoConvert_v1alpha3_ResourcePool_To_api_ResourcePool(in *v1alpha3.ResourcePool, out *ResourcePool, s conversion.Scope) error { + if err := Convert_string_To_api_UniqueString(&in.Name, &out.Name, s); err != nil { + return err + } + out.Generation = in.Generation + out.ResourceSliceCount = in.ResourceSliceCount + return nil +} + +// Convert_v1alpha3_ResourcePool_To_api_ResourcePool is an autogenerated conversion function. +func Convert_v1alpha3_ResourcePool_To_api_ResourcePool(in *v1alpha3.ResourcePool, out *ResourcePool, s conversion.Scope) error { + return autoConvert_v1alpha3_ResourcePool_To_api_ResourcePool(in, out, s) +} + +func autoConvert_api_ResourceSlice_To_v1alpha3_ResourceSlice(in *ResourceSlice, out *v1alpha3.ResourceSlice, s conversion.Scope) error { + out.ObjectMeta = in.ObjectMeta + if err := Convert_api_ResourceSliceSpec_To_v1alpha3_ResourceSliceSpec(&in.Spec, &out.Spec, s); err != nil { + return err + } + return nil +} + +// Convert_api_ResourceSlice_To_v1alpha3_ResourceSlice is an autogenerated conversion function. 
+func Convert_api_ResourceSlice_To_v1alpha3_ResourceSlice(in *ResourceSlice, out *v1alpha3.ResourceSlice, s conversion.Scope) error { + return autoConvert_api_ResourceSlice_To_v1alpha3_ResourceSlice(in, out, s) +} + +func autoConvert_v1alpha3_ResourceSlice_To_api_ResourceSlice(in *v1alpha3.ResourceSlice, out *ResourceSlice, s conversion.Scope) error { + out.ObjectMeta = in.ObjectMeta + if err := Convert_v1alpha3_ResourceSliceSpec_To_api_ResourceSliceSpec(&in.Spec, &out.Spec, s); err != nil { + return err + } + return nil +} + +// Convert_v1alpha3_ResourceSlice_To_api_ResourceSlice is an autogenerated conversion function. +func Convert_v1alpha3_ResourceSlice_To_api_ResourceSlice(in *v1alpha3.ResourceSlice, out *ResourceSlice, s conversion.Scope) error { + return autoConvert_v1alpha3_ResourceSlice_To_api_ResourceSlice(in, out, s) +} + +func autoConvert_api_ResourceSliceSpec_To_v1alpha3_ResourceSliceSpec(in *ResourceSliceSpec, out *v1alpha3.ResourceSliceSpec, s conversion.Scope) error { + if err := Convert_api_UniqueString_To_string(&in.Driver, &out.Driver, s); err != nil { + return err + } + if err := Convert_api_ResourcePool_To_v1alpha3_ResourcePool(&in.Pool, &out.Pool, s); err != nil { + return err + } + if err := Convert_api_UniqueString_To_string(&in.NodeName, &out.NodeName, s); err != nil { + return err + } + out.NodeSelector = (*v1.NodeSelector)(unsafe.Pointer(in.NodeSelector)) + out.AllNodes = in.AllNodes + if in.Devices != nil { + in, out := &in.Devices, &out.Devices + *out = make([]v1alpha3.Device, len(*in)) + for i := range *in { + if err := Convert_api_Device_To_v1alpha3_Device(&(*in)[i], &(*out)[i], s); err != nil { + return err + } + } + } else { + out.Devices = nil + } + return nil +} + +// Convert_api_ResourceSliceSpec_To_v1alpha3_ResourceSliceSpec is an autogenerated conversion function. +func Convert_api_ResourceSliceSpec_To_v1alpha3_ResourceSliceSpec(in *ResourceSliceSpec, out *v1alpha3.ResourceSliceSpec, s conversion.Scope) error { + return autoConvert_api_ResourceSliceSpec_To_v1alpha3_ResourceSliceSpec(in, out, s) +} + +func autoConvert_v1alpha3_ResourceSliceSpec_To_api_ResourceSliceSpec(in *v1alpha3.ResourceSliceSpec, out *ResourceSliceSpec, s conversion.Scope) error { + if err := Convert_string_To_api_UniqueString(&in.Driver, &out.Driver, s); err != nil { + return err + } + if err := Convert_v1alpha3_ResourcePool_To_api_ResourcePool(&in.Pool, &out.Pool, s); err != nil { + return err + } + if err := Convert_string_To_api_UniqueString(&in.NodeName, &out.NodeName, s); err != nil { + return err + } + out.NodeSelector = (*v1.NodeSelector)(unsafe.Pointer(in.NodeSelector)) + out.AllNodes = in.AllNodes + if in.Devices != nil { + in, out := &in.Devices, &out.Devices + *out = make([]Device, len(*in)) + for i := range *in { + if err := Convert_v1alpha3_Device_To_api_Device(&(*in)[i], &(*out)[i], s); err != nil { + return err + } + } + } else { + out.Devices = nil + } + return nil +} + +// Convert_v1alpha3_ResourceSliceSpec_To_api_ResourceSliceSpec is an autogenerated conversion function. 
+func Convert_v1alpha3_ResourceSliceSpec_To_api_ResourceSliceSpec(in *v1alpha3.ResourceSliceSpec, out *ResourceSliceSpec, s conversion.Scope) error { + return autoConvert_v1alpha3_ResourceSliceSpec_To_api_ResourceSliceSpec(in, out, s) +} diff --git a/staging/src/k8s.io/dynamic-resource-allocation/cel/cache.go b/staging/src/k8s.io/dynamic-resource-allocation/cel/cache.go new file mode 100644 index 00000000000..2868886c5bb --- /dev/null +++ b/staging/src/k8s.io/dynamic-resource-allocation/cel/cache.go @@ -0,0 +1,79 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cel + +import ( + "sync" + + "k8s.io/utils/keymutex" + "k8s.io/utils/lru" +) + +// Cache is a thread-safe LRU cache for a compiled CEL expression. +type Cache struct { + compileMutex keymutex.KeyMutex + cacheMutex sync.RWMutex + cache *lru.Cache +} + +// NewCache creates a cache. The maximum number of entries determines +// how many entries are cached at most before dropping the oldest +// entry. +func NewCache(maxCacheEntries int) *Cache { + return &Cache{ + compileMutex: keymutex.NewHashed(0), + cache: lru.New(maxCacheEntries), + } +} + +// GetOrCompile checks whether the cache already has a compilation result +// and returns that if available. Otherwise it compiles, stores successful +// results and returns the new result. +func (c *Cache) GetOrCompile(expression string) CompilationResult { + // Compiling a CEL expression is expensive enough that it is cheaper + // to lock a mutex than doing it several times in parallel. + c.compileMutex.LockKey(expression) + //nolint:errcheck // Only returns an error for unknown keys, which isn't the case here. + defer c.compileMutex.UnlockKey(expression) + + cached := c.get(expression) + if cached != nil { + return *cached + } + + expr := GetCompiler().CompileCELExpression(expression, Options{}) + if expr.Error == nil { + c.add(expression, &expr) + } + return expr +} + +func (c *Cache) add(expression string, expr *CompilationResult) { + c.cacheMutex.Lock() + defer c.cacheMutex.Unlock() + c.cache.Add(expression, expr) +} + +func (c *Cache) get(expression string) *CompilationResult { + c.cacheMutex.RLock() + defer c.cacheMutex.RUnlock() + expr, found := c.cache.Get(expression) + if !found { + return nil + } + return expr.(*CompilationResult) +} diff --git a/staging/src/k8s.io/dynamic-resource-allocation/cel/cache_test.go b/staging/src/k8s.io/dynamic-resource-allocation/cel/cache_test.go new file mode 100644 index 00000000000..3c68a94db82 --- /dev/null +++ b/staging/src/k8s.io/dynamic-resource-allocation/cel/cache_test.go @@ -0,0 +1,98 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cel
+
+import (
+	"fmt"
+	"sync"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestCacheSemantic(t *testing.T) {
+	// Cache two entries.
+	//
+	// Entries are comparable structs with pointers inside. Each
+	// compilation leads to different pointers, so the entries can be
+	// compared by value to figure out whether an entry was cached or
+	// compiled anew.
+	cache := NewCache(2)
+
+	// Successful compilations get cached.
+	resultTrue := cache.GetOrCompile("true")
+	require.Nil(t, resultTrue.Error)
+	resultTrueAgain := cache.GetOrCompile("true")
+	if resultTrue != resultTrueAgain {
+		t.Fatal("result of compiling `true` should have been cached")
+	}
+
+	// Unsuccessful ones don't.
+	resultFailed := cache.GetOrCompile("no-such-variable")
+	require.NotNil(t, resultFailed.Error)
+	resultFailedAgain := cache.GetOrCompile("no-such-variable")
+	if resultFailed == resultFailedAgain {
+		t.Fatal("result of compiling `no-such-variable` should not have been cached")
+	}
+
+	// The cache can hold a second result.
+	resultFalse := cache.GetOrCompile("false")
+	require.Nil(t, resultFalse.Error)
+	resultFalseAgain := cache.GetOrCompile("false")
+	if resultFalse != resultFalseAgain {
+		t.Fatal("result of compiling `false` should have been cached")
+	}
+	resultTrueAgain = cache.GetOrCompile("true")
+	if resultTrue != resultTrueAgain {
+		t.Fatal("result of compiling `true` should still have been cached")
+	}
+
+	// A third result pushes out the least recently used one.
+	resultOther := cache.GetOrCompile("false && true")
+	require.Nil(t, resultOther.Error)
+	resultOtherAgain := cache.GetOrCompile("false && true")
+	if resultOther != resultOtherAgain {
+		t.Fatal("result of compiling `false && true` should have been cached")
+	}
+	resultFalseAgain = cache.GetOrCompile("false")
+	if resultFalse == resultFalseAgain {
+		t.Fatal("result of compiling `false` should have been evicted from the cache")
+	}
+}
+
+func TestCacheConcurrency(t *testing.T) {
+	// There's no guarantee that concurrent use of the cache would really
+	// trigger the race detector in `go test -race`, but in practice
+	// it does when not using the cacheMutex.
+	//
+	// The compileMutex only affects performance and thus cannot be tested
+	// without benchmarking.
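+	//
+	// Each worker compiles a different expression, so with a cache size of 2
+	// most entries are evicted again; the point is only to exercise concurrent
+	// access, not cache hits.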
+ numWorkers := 10 + + cache := NewCache(2) + var wg sync.WaitGroup + wg.Add(numWorkers) + for i := 0; i < numWorkers; i++ { + go func(i int) { + defer wg.Done() + result := cache.GetOrCompile(fmt.Sprintf("%d == %d", i, i)) + assert.Nil(t, result.Error) + }(i) + } + wg.Wait() +} diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go index dc19c8aab18..f4c17487672 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go @@ -27,18 +27,12 @@ import ( resourceapi "k8s.io/api/resource/v1alpha3" "k8s.io/apimachinery/pkg/util/sets" resourcelisters "k8s.io/client-go/listers/resource/v1alpha3" + draapi "k8s.io/dynamic-resource-allocation/api" "k8s.io/dynamic-resource-allocation/cel" "k8s.io/klog/v2" "k8s.io/utils/ptr" ) -// ClaimLister returns a subset of the claims that a -// resourcelisters.ResourceClaimLister would return. -type ClaimLister interface { - // ListAllAllocated returns only claims which are allocated. - ListAllAllocated() ([]*resourceapi.ResourceClaim, error) -} - // Allocator calculates how to allocate a set of unallocated claims which use // structured parameters. // @@ -48,26 +42,31 @@ type ClaimLister interface { type Allocator struct { adminAccessEnabled bool claimsToAllocate []*resourceapi.ResourceClaim - claimLister ClaimLister + allocatedDevices sets.Set[DeviceID] classLister resourcelisters.DeviceClassLister - sliceLister resourcelisters.ResourceSliceLister + slices []*resourceapi.ResourceSlice + celCache *cel.Cache } // NewAllocator returns an allocator for a certain set of claims or an error if // some problem was detected which makes it impossible to allocate claims. +// +// The returned Allocator can be used multiple times and is thread-safe. func NewAllocator(ctx context.Context, adminAccessEnabled bool, claimsToAllocate []*resourceapi.ResourceClaim, - claimLister ClaimLister, + allocatedDevices sets.Set[DeviceID], classLister resourcelisters.DeviceClassLister, - sliceLister resourcelisters.ResourceSliceLister, + slices []*resourceapi.ResourceSlice, + celCache *cel.Cache, ) (*Allocator, error) { return &Allocator{ adminAccessEnabled: adminAccessEnabled, claimsToAllocate: claimsToAllocate, - claimLister: claimLister, + allocatedDevices: allocatedDevices, classLister: classLister, - sliceLister: sliceLister, + slices: slices, + celCache: celCache, }, nil } @@ -99,7 +98,7 @@ func (a *Allocator) ClaimsToAllocate() []*resourceapi.ResourceClaim { // additional value. A name can also be useful because log messages do not // have a common prefix. V(5) is used for one-time log entries, V(6) for important // progress reports, and V(7) for detailed debug output. 
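+//
+// A minimal usage sketch; claims, node and the other arguments are
+// placeholders that the caller must provide:
+//
+//	allocator, err := NewAllocator(ctx, adminAccessEnabled, claims, allocatedDevices, classLister, slices, celCache)
+//	...
+//	results, err := allocator.Allocate(ctx, node)
+//
+// On success, results[i] is the allocation for ClaimsToAllocate()[i].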
-func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []*resourceapi.AllocationResult, finalErr error) { +func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []resourceapi.AllocationResult, finalErr error) { alloc := &allocator{ Allocator: a, ctx: ctx, // all methods share the same a and thus ctx @@ -107,14 +106,13 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult [] deviceMatchesRequest: make(map[matchKey]bool), constraints: make([][]constraint, len(a.claimsToAllocate)), requestData: make(map[requestIndices]requestData), - allocated: make(map[DeviceID]bool), - result: make([]*resourceapi.AllocationResult, len(a.claimsToAllocate)), + result: make([]internalAllocationResult, len(a.claimsToAllocate)), } alloc.logger.V(5).Info("Starting allocation", "numClaims", len(alloc.claimsToAllocate)) defer alloc.logger.V(5).Info("Done with allocation", "success", len(finalResult) == len(alloc.claimsToAllocate), "err", finalErr) // First determine all eligible pools. - pools, err := GatherPools(ctx, alloc.sliceLister, node) + pools, err := GatherPools(ctx, alloc.slices, node) if err != nil { return nil, fmt.Errorf("gather pool information: %w", err) } @@ -144,8 +142,9 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult [] // and their requests. For each claim we determine how many devices // need to be allocated. If not all can be stored in the result, the // claim cannot be allocated. + numDevicesTotal := 0 for claimIndex, claim := range alloc.claimsToAllocate { - numDevices := 0 + numDevicesPerClaim := 0 // If we have any any request that wants "all" devices, we need to // figure out how much "all" is. If some pool is incomplete, we stop @@ -209,7 +208,12 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult [] return nil, err } if selectable { - requestData.allDevices = append(requestData.allDevices, deviceWithID{device: slice.Spec.Devices[deviceIndex].Basic, DeviceID: DeviceID{Driver: slice.Spec.Driver, Pool: slice.Spec.Pool.Name, Device: slice.Spec.Devices[deviceIndex].Name}}) + device := deviceWithID{ + id: DeviceID{Driver: slice.Spec.Driver, Pool: slice.Spec.Pool.Name, Device: slice.Spec.Devices[deviceIndex].Name}, + basic: slice.Spec.Devices[deviceIndex].Basic, + slice: slice, + } + requestData.allDevices = append(requestData.allDevices, device) } } } @@ -220,39 +224,36 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult [] return nil, fmt.Errorf("claim %s, request %s: unsupported count mode %s", klog.KObj(claim), request.Name, request.AllocationMode) } alloc.requestData[requestKey] = requestData - numDevices += requestData.numDevices + numDevicesPerClaim += requestData.numDevices } - alloc.logger.V(6).Info("Checked claim", "claim", klog.KObj(claim), "numDevices", numDevices) + alloc.logger.V(6).Info("Checked claim", "claim", klog.KObj(claim), "numDevices", numDevicesPerClaim) // Check that we don't end up with too many results. 
- if numDevices > resourceapi.AllocationResultsMaxSize { - return nil, fmt.Errorf("claim %s: number of requested devices %d exceeds the claim limit of %d", klog.KObj(claim), numDevices, resourceapi.AllocationResultsMaxSize) + if numDevicesPerClaim > resourceapi.AllocationResultsMaxSize { + return nil, fmt.Errorf("claim %s: number of requested devices %d exceeds the claim limit of %d", klog.KObj(claim), numDevicesPerClaim, resourceapi.AllocationResultsMaxSize) } // If we don't, then we can pre-allocate the result slices for // appending the actual results later. - alloc.result[claimIndex] = &resourceapi.AllocationResult{ - Devices: resourceapi.DeviceAllocationResult{ - Results: make([]resourceapi.DeviceRequestAllocationResult, 0, numDevices), - }, - } + alloc.result[claimIndex].devices = make([]internalDeviceResult, 0, numDevicesPerClaim) // Constraints are assumed to be monotonic: once a constraint returns // false, adding more devices will not cause it to return true. This // allows the search to stop early once a constraint returns false. - var constraints = make([]constraint, len(claim.Spec.Devices.Constraints)) + constraints := make([]constraint, len(claim.Spec.Devices.Constraints)) for i, constraint := range claim.Spec.Devices.Constraints { switch { case constraint.MatchAttribute != nil: + matchAttribute := draapi.FullyQualifiedName(*constraint.MatchAttribute) logger := alloc.logger if loggerV := alloc.logger.V(6); loggerV.Enabled() { logger = klog.LoggerWithName(logger, "matchAttributeConstraint") - logger = klog.LoggerWithValues(logger, "matchAttribute", *constraint.MatchAttribute) + logger = klog.LoggerWithValues(logger, "matchAttribute", matchAttribute) } m := &matchAttributeConstraint{ logger: logger, requestNames: sets.New(constraint.Requests...), - attributeName: *constraint.MatchAttribute, + attributeName: matchAttribute, } constraints[i] = m default: @@ -261,6 +262,7 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult [] } } alloc.constraints[claimIndex] = constraints + numDevicesTotal += numDevicesPerClaim } // Selecting a device for a request is independent of what has been @@ -270,34 +272,10 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult [] // the Allocate call and can be compared in Go. alloc.deviceMatchesRequest = make(map[matchKey]bool) - // Some of the existing devices are probably already allocated by - // claims... - claims, err := alloc.claimLister.ListAllAllocated() - numAllocated := 0 - if err != nil { - return nil, fmt.Errorf("list allocated claims: %w", err) - } - for _, claim := range claims { - // Sanity check.. - if claim.Status.Allocation == nil { - continue - } - for _, result := range claim.Status.Allocation.Devices.Results { - // Kubernetes 1.31 did not set this, 1.32 always does. - // Supporting 1.31 is not worth the additional code that - // would have to be written (= looking up in request) because - // it is extremely unlikely that there really is a result - // that still exists in a cluster from 1.31 where this matters. - if ptr.Deref(result.AdminAccess, false) { - // Ignore, it's not considered allocated. - continue - } - deviceID := DeviceID{Driver: result.Driver, Pool: result.Pool, Device: result.Device} - alloc.allocated[deviceID] = true - numAllocated++ - } - } - alloc.logger.V(6).Info("Gathered information about allocated devices", "numAllocated", numAllocated) + // We can estimate the size based on what we need to allocate. 
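+	// allocatingDevices tracks devices tentatively picked during this Allocate
+	// call. It is kept separate from allocatedDevices, which is shared with the
+	// caller and therefore must not be modified.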
+ alloc.allocatingDevices = make(map[DeviceID]bool, numDevicesTotal) + + alloc.logger.V(6).Info("Gathered information about devices", "numAllocated", len(alloc.allocatedDevices), "toBeAllocated", numDevicesTotal) // In practice, there aren't going to be many different CEL // expressions. Most likely, there is going to be handful of different @@ -323,8 +301,20 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult [] return nil, nil } - for claimIndex, allocationResult := range alloc.result { + result := make([]resourceapi.AllocationResult, len(alloc.result)) + for claimIndex, internalResult := range alloc.result { claim := alloc.claimsToAllocate[claimIndex] + allocationResult := &result[claimIndex] + allocationResult.Devices.Results = make([]resourceapi.DeviceRequestAllocationResult, len(internalResult.devices)) + for i, internal := range internalResult.devices { + allocationResult.Devices.Results[i] = resourceapi.DeviceRequestAllocationResult{ + Request: internal.request, + Driver: internal.id.Driver.String(), + Pool: internal.id.Pool.String(), + Device: internal.id.Device.String(), + AdminAccess: internal.adminAccess, + } + } // Populate configs. for requestIndex := range claim.Spec.Devices.Requests { @@ -348,14 +338,14 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult [] } // Determine node selector. - nodeSelector, err := alloc.createNodeSelector(allocationResult) + nodeSelector, err := alloc.createNodeSelector(internalResult.devices) if err != nil { return nil, fmt.Errorf("create NodeSelector for claim %s: %w", claim.Name, err) } allocationResult.NodeSelector = nodeSelector } - return alloc.result, nil + return result, nil } // errStop is a special error that gets returned by allocateOne if it detects @@ -372,8 +362,8 @@ type allocator struct { deviceMatchesRequest map[matchKey]bool constraints [][]constraint // one list of constraints per claim requestData map[requestIndices]requestData // one entry per request - allocated map[DeviceID]bool - result []*resourceapi.AllocationResult + allocatingDevices map[DeviceID]bool + result []internalAllocationResult } // matchKey identifies a device/request pair. @@ -403,19 +393,31 @@ type requestData struct { } type deviceWithID struct { - DeviceID - device *resourceapi.BasicDevice + id DeviceID + basic *draapi.BasicDevice + slice *draapi.ResourceSlice +} + +type internalAllocationResult struct { + devices []internalDeviceResult +} + +type internalDeviceResult struct { + request string + id DeviceID + slice *draapi.ResourceSlice + adminAccess *bool } type constraint interface { // add is called whenever a device is about to be allocated. It must // check whether the device matches the constraint and if yes, // track that it is allocated. - add(requestName string, device *resourceapi.BasicDevice, deviceID DeviceID) bool + add(requestName string, device *draapi.BasicDevice, deviceID DeviceID) bool // For every successful add there is exactly one matching removed call // with the exact same parameters. - remove(requestName string, device *resourceapi.BasicDevice, deviceID DeviceID) + remove(requestName string, device *draapi.BasicDevice, deviceID DeviceID) } // matchAttributeConstraint compares an attribute value across devices. @@ -428,13 +430,13 @@ type constraint interface { type matchAttributeConstraint struct { logger klog.Logger // Includes name and attribute name, so no need to repeat in log messages. 
requestNames sets.Set[string] - attributeName resourceapi.FullyQualifiedName + attributeName draapi.FullyQualifiedName - attribute *resourceapi.DeviceAttribute + attribute *draapi.DeviceAttribute numDevices int } -func (m *matchAttributeConstraint) add(requestName string, device *resourceapi.BasicDevice, deviceID DeviceID) bool { +func (m *matchAttributeConstraint) add(requestName string, device *draapi.BasicDevice, deviceID DeviceID) bool { if m.requestNames.Len() > 0 && !m.requestNames.Has(requestName) { // Device not affected by constraint. m.logger.V(7).Info("Constraint does not apply to request", "request", requestName) @@ -491,7 +493,7 @@ func (m *matchAttributeConstraint) add(requestName string, device *resourceapi.B return true } -func (m *matchAttributeConstraint) remove(requestName string, device *resourceapi.BasicDevice, deviceID DeviceID) { +func (m *matchAttributeConstraint) remove(requestName string, device *draapi.BasicDevice, deviceID DeviceID) { if m.requestNames.Len() > 0 && !m.requestNames.Has(requestName) { // Device not affected by constraint. return @@ -501,9 +503,9 @@ func (m *matchAttributeConstraint) remove(requestName string, device *resourceap m.logger.V(7).Info("Device removed from constraint set", "device", deviceID, "numDevices", m.numDevices) } -func lookupAttribute(device *resourceapi.BasicDevice, deviceID DeviceID, attributeName resourceapi.FullyQualifiedName) *resourceapi.DeviceAttribute { +func lookupAttribute(device *draapi.BasicDevice, deviceID DeviceID, attributeName draapi.FullyQualifiedName) *draapi.DeviceAttribute { // Fully-qualified match? - if attr, ok := device.Attributes[resourceapi.QualifiedName(attributeName)]; ok { + if attr, ok := device.Attributes[draapi.QualifiedName(attributeName)]; ok { return &attr } index := strings.Index(string(attributeName), "/") @@ -512,14 +514,14 @@ func lookupAttribute(device *resourceapi.BasicDevice, deviceID DeviceID, attribu return nil } - if string(attributeName[0:index]) != deviceID.Driver { + if string(attributeName[0:index]) != deviceID.Driver.String() { // Not an attribute of the driver and not found above, // so it is not available. return nil } // Domain matches the driver, so let's check just the ID. - if attr, ok := device.Attributes[resourceapi.QualifiedName(attributeName[index+1:])]; ok { + if attr, ok := device.Attributes[draapi.QualifiedName(attributeName[index+1:])]; ok { return &attr } @@ -559,7 +561,7 @@ func (alloc *allocator) allocateOne(r deviceIndices) (bool, error) { // For "all" devices we already know which ones we need. We // just need to check whether we can use them. deviceWithID := requestData.allDevices[r.deviceIndex] - success, _, err := alloc.allocateDevice(r, deviceWithID.device, deviceWithID.DeviceID, true) + success, _, err := alloc.allocateDevice(r, deviceWithID, true) if err != nil { return false, err } @@ -587,7 +589,7 @@ func (alloc *allocator) allocateOne(r deviceIndices) (bool, error) { deviceID := DeviceID{Driver: pool.Driver, Pool: pool.Pool, Device: slice.Spec.Devices[deviceIndex].Name} // Checking for "in use" is cheap and thus gets done first. - if !ptr.Deref(request.AdminAccess, false) && alloc.allocated[deviceID] { + if !ptr.Deref(request.AdminAccess, false) && (alloc.allocatedDevices.Has(deviceID) || alloc.allocatingDevices[deviceID]) { alloc.logger.V(7).Info("Device in use", "device", deviceID) continue } @@ -610,7 +612,12 @@ func (alloc *allocator) allocateOne(r deviceIndices) (bool, error) { } // Finally treat as allocated and move on to the next device. 
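+			// The slice gets recorded together with the device so that
+			// createNodeSelector can use it directly instead of searching
+			// the pools again later.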
- allocated, deallocate, err := alloc.allocateDevice(r, slice.Spec.Devices[deviceIndex].Basic, deviceID, false) + device := deviceWithID{ + id: deviceID, + basic: slice.Spec.Devices[deviceIndex].Basic, + slice: slice, + } + allocated, deallocate, err := alloc.allocateDevice(r, device, false) if err != nil { return false, err } @@ -640,7 +647,7 @@ func (alloc *allocator) allocateOne(r deviceIndices) (bool, error) { } // isSelectable checks whether a device satisfies the request and class selectors. -func (alloc *allocator) isSelectable(r requestIndices, slice *resourceapi.ResourceSlice, deviceIndex int) (bool, error) { +func (alloc *allocator) isSelectable(r requestIndices, slice *draapi.ResourceSlice, deviceIndex int) (bool, error) { // This is the only supported device type at the moment. device := slice.Spec.Devices[deviceIndex].Basic if device == nil { @@ -682,9 +689,9 @@ func (alloc *allocator) isSelectable(r requestIndices, slice *resourceapi.Resour } -func (alloc *allocator) selectorsMatch(r requestIndices, device *resourceapi.BasicDevice, deviceID DeviceID, class *resourceapi.DeviceClass, selectors []resourceapi.DeviceSelector) (bool, error) { +func (alloc *allocator) selectorsMatch(r requestIndices, device *draapi.BasicDevice, deviceID DeviceID, class *resourceapi.DeviceClass, selectors []resourceapi.DeviceSelector) (bool, error) { for i, selector := range selectors { - expr := cel.GetCompiler().CompileCELExpression(selector.CEL.Expression, cel.Options{}) + expr := alloc.celCache.GetOrCompile(selector.CEL.Expression) if expr.Error != nil { // Could happen if some future apiserver accepted some // future expression and then got downgraded. Normally @@ -697,7 +704,13 @@ func (alloc *allocator) selectorsMatch(r requestIndices, device *resourceapi.Bas return false, fmt.Errorf("claim %s: selector #%d: CEL compile error: %w", klog.KObj(alloc.claimsToAllocate[r.claimIndex]), i, expr.Error) } - matches, details, err := expr.DeviceMatches(alloc.ctx, cel.Device{Driver: deviceID.Driver, Attributes: device.Attributes, Capacity: device.Capacity}) + // If this conversion turns out to be expensive, the CEL package could be converted + // to use unique strings. + var d resourceapi.BasicDevice + if err := draapi.Convert_api_BasicDevice_To_v1alpha3_BasicDevice(device, &d, nil); err != nil { + return false, fmt.Errorf("convert BasicDevice: %w", err) + } + matches, details, err := expr.DeviceMatches(alloc.ctx, cel.Device{Driver: deviceID.Driver.String(), Attributes: d.Attributes, Capacity: d.Capacity}) if class != nil { alloc.logger.V(7).Info("CEL result", "device", deviceID, "class", klog.KObj(class), "selector", i, "expression", selector.CEL.Expression, "matches", matches, "actualCost", ptr.Deref(details.ActualCost(), 0), "err", err) } else { @@ -727,28 +740,28 @@ func (alloc *allocator) selectorsMatch(r requestIndices, device *resourceapi.Bas // as if that candidate had been allocated. If allocation cannot continue later // and must try something else, then the rollback function can be invoked to // restore the previous state. 
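+// It returns whether the device was picked, a rollback function (only when the
+// device was picked), and an error only for conditions that abort the entire
+// allocation attempt.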
-func (alloc *allocator) allocateDevice(r deviceIndices, device *resourceapi.BasicDevice, deviceID DeviceID, must bool) (bool, func(), error) { +func (alloc *allocator) allocateDevice(r deviceIndices, device deviceWithID, must bool) (bool, func(), error) { claim := alloc.claimsToAllocate[r.claimIndex] request := &claim.Spec.Devices.Requests[r.requestIndex] adminAccess := ptr.Deref(request.AdminAccess, false) - if !adminAccess && alloc.allocated[deviceID] { - alloc.logger.V(7).Info("Device in use", "device", deviceID) + if !adminAccess && (alloc.allocatedDevices.Has(device.id) || alloc.allocatingDevices[device.id]) { + alloc.logger.V(7).Info("Device in use", "device", device.id) return false, nil, nil } // It's available. Now check constraints. for i, constraint := range alloc.constraints[r.claimIndex] { - added := constraint.add(request.Name, device, deviceID) + added := constraint.add(request.Name, device.basic, device.id) if !added { if must { // It does not make sense to declare a claim where a constraint prevents getting // all devices. Treat this as an error. - return false, nil, fmt.Errorf("claim %s, request %s: cannot add device %s because a claim constraint would not be satisfied", klog.KObj(claim), request.Name, deviceID) + return false, nil, fmt.Errorf("claim %s, request %s: cannot add device %s because a claim constraint would not be satisfied", klog.KObj(claim), request.Name, device.id) } // Roll back for all previous constraints before we return. for e := 0; e < i; e++ { - alloc.constraints[r.claimIndex][e].remove(request.Name, device, deviceID) + alloc.constraints[r.claimIndex][e].remove(request.Name, device.basic, device.id) } return false, nil, nil } @@ -756,50 +769,46 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device *resourceapi.Basi // All constraints satisfied. Mark as in use (unless we do admin access) // and record the result. - alloc.logger.V(7).Info("Device allocated", "device", deviceID) + alloc.logger.V(7).Info("Device allocated", "device", device.id) if !adminAccess { - alloc.allocated[deviceID] = true + alloc.allocatingDevices[device.id] = true } - result := resourceapi.DeviceRequestAllocationResult{ - Request: request.Name, - Driver: deviceID.Driver, - Pool: deviceID.Pool, - Device: deviceID.Device, + result := internalDeviceResult{ + request: request.Name, + id: device.id, + slice: device.slice, } if adminAccess { - result.AdminAccess = &adminAccess + result.adminAccess = &adminAccess } - previousNumResults := len(alloc.result[r.claimIndex].Devices.Results) - alloc.result[r.claimIndex].Devices.Results = append(alloc.result[r.claimIndex].Devices.Results, result) + previousNumResults := len(alloc.result[r.claimIndex].devices) + alloc.result[r.claimIndex].devices = append(alloc.result[r.claimIndex].devices, result) return true, func() { for _, constraint := range alloc.constraints[r.claimIndex] { - constraint.remove(request.Name, device, deviceID) + constraint.remove(request.Name, device.basic, device.id) } if !adminAccess { - alloc.allocated[deviceID] = false + alloc.allocatingDevices[device.id] = false } // Truncate, but keep the underlying slice. 
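allocateDevice hands its caller a rollback closure rather than leaving shared state mutated; the backtracking loop invokes it when a later request cannot be satisfied with the current choice. A simplified standalone sketch of that pattern with stand-in state (not the real allocator types):

package main

import "fmt"

type state struct {
	allocating map[string]bool
	results    []string
}

// tentativelyAllocate marks the device as taken in this run and records a
// result, returning an undo func that reverts exactly those two steps.
func (s *state) tentativelyAllocate(id string) (undo func()) {
	s.allocating[id] = true
	prevLen := len(s.results)
	s.results = append(s.results, id)
	return func() {
		s.allocating[id] = false
		s.results = s.results[:prevLen] // truncate, but keep the underlying array
	}
}

func main() {
	s := &state{allocating: map[string]bool{}}
	undo := s.tentativelyAllocate("driver-a/pool-1/device-1")
	fmt.Println(len(s.results), s.allocating["driver-a/pool-1/device-1"]) // 1 true
	undo() // backtrack: try a different device for this request
	fmt.Println(len(s.results), s.allocating["driver-a/pool-1/device-1"]) // 0 false
}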
- alloc.result[r.claimIndex].Devices.Results = alloc.result[r.claimIndex].Devices.Results[:previousNumResults] - alloc.logger.V(7).Info("Device deallocated", "device", deviceID) + alloc.result[r.claimIndex].devices = alloc.result[r.claimIndex].devices[:previousNumResults] + alloc.logger.V(7).Info("Device deallocated", "device", device.id) }, nil } // createNodeSelector constructs a node selector for the allocation, if needed, // otherwise it returns nil. -func (alloc *allocator) createNodeSelector(allocation *resourceapi.AllocationResult) (*v1.NodeSelector, error) { +func (alloc *allocator) createNodeSelector(result []internalDeviceResult) (*v1.NodeSelector, error) { // Selector with one term. That term gets extended with additional // requirements from the different devices. nodeSelector := &v1.NodeSelector{ NodeSelectorTerms: []v1.NodeSelectorTerm{{}}, } - for _, deviceAllocation := range allocation.Devices.Results { - slice := alloc.findSlice(deviceAllocation) - if slice == nil { - return nil, fmt.Errorf("internal error: device %+v not found in pools", deviceAllocation) - } - if slice.Spec.NodeName != "" { + for i := range result { + slice := result[i].slice + if slice.Spec.NodeName != draapi.NullUniqueString { // At least one device is local to one node. This // restricts the allocation to that node. return &v1.NodeSelector{ @@ -807,7 +816,7 @@ func (alloc *allocator) createNodeSelector(allocation *resourceapi.AllocationRes MatchFields: []v1.NodeSelectorRequirement{{ Key: "metadata.name", Operator: v1.NodeSelectorOpIn, - Values: []string{slice.Spec.NodeName}, + Values: []string{slice.Spec.NodeName.String()}, }}, }}, }, nil @@ -836,23 +845,6 @@ func (alloc *allocator) createNodeSelector(allocation *resourceapi.AllocationRes return nil, nil } -func (alloc *allocator) findSlice(deviceAllocation resourceapi.DeviceRequestAllocationResult) *resourceapi.ResourceSlice { - for _, pool := range alloc.pools { - if pool.Driver != deviceAllocation.Driver || - pool.Pool != deviceAllocation.Pool { - continue - } - for _, slice := range pool.Slices { - for _, device := range slice.Spec.Devices { - if device.Name == deviceAllocation.Device { - return slice - } - } - } - } - return nil -} - func addNewNodeSelectorRequirements(from []v1.NodeSelectorRequirement, to *[]v1.NodeSelectorRequirement) { for _, requirement := range from { if !containsNodeSelectorRequirement(*to, requirement) { diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go index f16c38c9cee..8684108cdb2 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go @@ -20,6 +20,7 @@ import ( "errors" "flag" "fmt" + "slices" "testing" "github.com/onsi/gomega" @@ -35,6 +36,8 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/dynamic-resource-allocation/cel" "k8s.io/klog/v2/ktesting" "k8s.io/utils/ptr" ) @@ -182,17 +185,6 @@ func claimWithDeviceConfig(name, request, class, driver, attribute string) *reso return claim } -// generate allocated ResourceClaim object -func allocatedClaim(name, request, class string, results ...resourceapi.DeviceRequestAllocationResult) *resourceapi.ResourceClaim { - claim := claim(name, request, class) - claim.Status.Allocation = &resourceapi.AllocationResult{ - Devices: 
resourceapi.DeviceAllocationResult{ - Results: results, - }, - } - return claim -} - // generate a Device object with the given name, capacity and attributes. func device(name string, capacity map[resourceapi.QualifiedName]resource.Quantity, attributes map[resourceapi.QualifiedName]resourceapi.DeviceAttribute) resourceapi.Device { return resourceapi.Device{ @@ -276,13 +268,13 @@ func localNodeSelector(nodeName string) *v1.NodeSelector { // allocationResult returns a matcher for one AllocationResult pointer with a list of // embedded device allocation results. The order of those results doesn't matter. func allocationResult(selector *v1.NodeSelector, results ...resourceapi.DeviceRequestAllocationResult) types.GomegaMatcher { - return gstruct.PointTo(gstruct.MatchFields(0, gstruct.Fields{ + return gstruct.MatchFields(0, gstruct.Fields{ "Devices": gstruct.MatchFields(0, gstruct.Fields{ "Results": gomega.ConsistOf(results), // Order is irrelevant. "Config": gomega.BeNil(), }), "NodeSelector": matchNodeSelector(selector), - })) + }) } // matchNodeSelector returns a matcher for a node selector. The order @@ -325,8 +317,8 @@ func matchNodeSelectorRequirement(requirement v1.NodeSelectorRequirement) types. }) } -func allocationResultWithConfig(selector *v1.NodeSelector, driver string, source resourceapi.AllocationConfigSource, attribute string, results ...resourceapi.DeviceRequestAllocationResult) *resourceapi.AllocationResult { - return &resourceapi.AllocationResult{ +func allocationResultWithConfig(selector *v1.NodeSelector, driver string, source resourceapi.AllocationConfigSource, attribute string, results ...resourceapi.DeviceRequestAllocationResult) resourceapi.AllocationResult { + return resourceapi.AllocationResult{ Devices: resourceapi.DeviceAllocationResult{ Results: results, Config: []resourceapi.DeviceAllocationConfiguration{ @@ -353,16 +345,16 @@ func sliceWithOneDevice(name string, nodeSelection any, pool, driver string) *re } func TestAllocator(t *testing.T) { - nonExistentAttribute := resourceapi.FullyQualifiedName("NonExistentAttribute") - boolAttribute := resourceapi.FullyQualifiedName("boolAttribute") - stringAttribute := resourceapi.FullyQualifiedName("stringAttribute") - versionAttribute := resourceapi.FullyQualifiedName("driverVersion") - intAttribute := resourceapi.FullyQualifiedName("numa") + nonExistentAttribute := resourceapi.FullyQualifiedName(driverA + "/" + "NonExistentAttribute") + boolAttribute := resourceapi.FullyQualifiedName(driverA + "/" + "boolAttribute") + stringAttribute := resourceapi.FullyQualifiedName(driverA + "/" + "stringAttribute") + versionAttribute := resourceapi.FullyQualifiedName(driverA + "/" + "driverVersion") + intAttribute := resourceapi.FullyQualifiedName(driverA + "/" + "numa") testcases := map[string]struct { adminAccess bool claimsToAllocate []*resourceapi.ResourceClaim - allocatedClaims []*resourceapi.ResourceClaim + allocatedDevices []DeviceID classes []*resourceapi.DeviceClass slices []*resourceapi.ResourceSlice node *v1.Node @@ -943,12 +935,10 @@ func TestAllocator(t *testing.T) { }, "already-allocated-devices": { claimsToAllocate: objects(claim(claim0, req0, classA)), - allocatedClaims: objects( - allocatedClaim(claim0, req0, classA, - deviceAllocationResult(req0, driverA, pool1, device1, false), - deviceAllocationResult(req1, driverA, pool1, device2, false), - ), - ), + allocatedDevices: []DeviceID{ + MakeDeviceID(driverA, pool1, device1), + MakeDeviceID(driverA, pool1, device2), + }, classes: objects(class(classA, driverA)), slices: 
objects(sliceWithOneDevice(slice1, node1, pool1, driverA)), node: node(node1, region1), @@ -976,12 +966,10 @@ func TestAllocator(t *testing.T) { c.Spec.Devices.Requests[0].AdminAccess = ptr.To(true) return []*resourceapi.ResourceClaim{c} }(), - allocatedClaims: objects( - allocatedClaim(claim0, req0, classA, - deviceAllocationResult(req0, driverA, pool1, device1, false), - deviceAllocationResult(req1, driverA, pool1, device2, false), - ), - ), + allocatedDevices: []DeviceID{ + MakeDeviceID(driverA, pool1, device1), + MakeDeviceID(driverA, pool1, device2), + }, classes: objects(class(classA, driverA)), slices: objects(sliceWithOneDevice(slice1, node1, pool1, driverA)), node: node(node1, region1), @@ -990,22 +978,6 @@ func TestAllocator(t *testing.T) { allocationResult(localNodeSelector(node1), deviceAllocationResult(req0, driverA, pool1, device1, true)), }, }, - "already-allocated-for-admin-access": { - claimsToAllocate: objects(claim(claim0, req0, classA)), - allocatedClaims: objects( - allocatedClaim(claim0, req0, classA, - deviceAllocationResult(req0, driverA, pool1, device1, true), - deviceAllocationResult(req1, driverA, pool1, device2, true), - ), - ), - classes: objects(class(classA, driverA)), - slices: objects(sliceWithOneDevice(slice1, node1, pool1, driverA)), - node: node(node1, region1), - - expectResults: []any{ - allocationResult(localNodeSelector(node1), deviceAllocationResult(req0, driverA, pool1, device1, false)), - }, - }, "with-constraint": { claimsToAllocate: objects(claimWithRequests( claim0, @@ -1397,23 +1369,15 @@ func TestAllocator(t *testing.T) { // Listing objects is deterministic and returns them in the same // order as in the test case. That makes the allocation result // also deterministic. - var allocated, toAllocate claimLister var classLister informerLister[resourceapi.DeviceClass] - var sliceLister informerLister[resourceapi.ResourceSlice] - for _, claim := range tc.claimsToAllocate { - toAllocate.claims = append(toAllocate.claims, claim.DeepCopy()) - } - for _, claim := range tc.allocatedClaims { - allocated.claims = append(allocated.claims, claim.DeepCopy()) - } - for _, slice := range tc.slices { - sliceLister.objs = append(sliceLister.objs, slice.DeepCopy()) - } for _, class := range tc.classes { classLister.objs = append(classLister.objs, class.DeepCopy()) } + claimsToAllocate := slices.Clone(tc.claimsToAllocate) + allocatedDevices := slices.Clone(tc.allocatedDevices) + slices := slices.Clone(tc.slices) - allocator, err := NewAllocator(ctx, tc.adminAccess, toAllocate.claims, allocated, classLister, sliceLister) + allocator, err := NewAllocator(ctx, tc.adminAccess, claimsToAllocate, sets.New(allocatedDevices...), classLister, slices, cel.NewCache(1)) g.Expect(err).ToNot(gomega.HaveOccurred()) results, err := allocator.Allocate(ctx, tc.node) @@ -1425,23 +1389,14 @@ func TestAllocator(t *testing.T) { g.Expect(results).To(gomega.ConsistOf(tc.expectResults...)) // Objects that the allocator had access to should not have been modified. 
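The test case above that sets AdminAccess to true while device1 and device2 already appear in allocatedDevices encodes the rule from allocateDevice: a request with admin access may select a device owned by another claim, and its own result does not reserve the device. A small standalone model of that check (simplified types, not the real allocator):

package main

import "fmt"

type allocatorState struct {
	allocatedDevices  map[string]bool // devices owned by existing claims
	allocatingDevices map[string]bool // devices tentatively taken in this run
}

// canUse reports whether a request may take the device. Only non-admin
// requests are blocked by existing or in-flight allocations.
func (s *allocatorState) canUse(deviceID string, adminAccess bool) bool {
	if adminAccess {
		return true
	}
	return !s.allocatedDevices[deviceID] && !s.allocatingDevices[deviceID]
}

func main() {
	s := &allocatorState{
		allocatedDevices:  map[string]bool{"driver-a/pool-1/device-1": true},
		allocatingDevices: map[string]bool{},
	}
	fmt.Println(s.canUse("driver-a/pool-1/device-1", false)) // false: already allocated elsewhere
	fmt.Println(s.canUse("driver-a/pool-1/device-1", true))  // true: admin access shares the device
}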
- g.Expect(toAllocate.claims).To(gomega.HaveExactElements(tc.claimsToAllocate)) - g.Expect(allocated.claims).To(gomega.HaveExactElements(tc.allocatedClaims)) - g.Expect(sliceLister.objs).To(gomega.ConsistOf(tc.slices)) + g.Expect(claimsToAllocate).To(gomega.HaveExactElements(tc.claimsToAllocate)) + g.Expect(allocatedDevices).To(gomega.HaveExactElements(tc.allocatedDevices)) + g.Expect(slices).To(gomega.ConsistOf(tc.slices)) g.Expect(classLister.objs).To(gomega.ConsistOf(tc.classes)) }) } } -type claimLister struct { - claims []*resourceapi.ResourceClaim - err error -} - -func (l claimLister) ListAllAllocated() ([]*resourceapi.ResourceClaim, error) { - return l.claims, l.err -} - type informerLister[T any] struct { objs []*T err error diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/pools.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/pools.go index 64bb46c7b8e..37cabaddc65 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/pools.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/pools.go @@ -22,10 +22,9 @@ import ( v1 "k8s.io/api/core/v1" resourceapi "k8s.io/api/resource/v1alpha3" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" - resourcelisters "k8s.io/client-go/listers/resource/v1alpha3" "k8s.io/component-helpers/scheduling/corev1/nodeaffinity" + draapi "k8s.io/dynamic-resource-allocation/api" ) // GatherPools collects information about all resource pools which provide @@ -34,31 +33,35 @@ import ( // Out-dated slices are silently ignored. Pools may be incomplete (not all // required slices available) or invalid (for example, device names not unique). // Both is recorded in the result. -func GatherPools(ctx context.Context, sliceLister resourcelisters.ResourceSliceLister, node *v1.Node) ([]*Pool, error) { +func GatherPools(ctx context.Context, slices []*resourceapi.ResourceSlice, node *v1.Node) ([]*Pool, error) { pools := make(map[PoolID]*Pool) - - // TODO (future): use a custom lister interface and implement it with - // and indexer on the node name field. Then here we can ask for only - // slices with the right node name and those with no node name. - slices, err := sliceLister.List(labels.Everything()) - if err != nil { - return nil, fmt.Errorf("list resource slices: %w", err) + nodeName := "" + if node != nil { + nodeName = node.Name } + for _, slice := range slices { switch { case slice.Spec.NodeName != "": - if slice.Spec.NodeName == node.Name { - addSlice(pools, slice) + if slice.Spec.NodeName == nodeName { + if err := addSlice(pools, slice); err != nil { + return nil, fmt.Errorf("add node slice %s: %w", slice.Name, err) + } } case slice.Spec.AllNodes: - addSlice(pools, slice) + if err := addSlice(pools, slice); err != nil { + return nil, fmt.Errorf("add cluster slice %s: %w", slice.Name, err) + } case slice.Spec.NodeSelector != nil: + // TODO: move conversion into api. selector, err := nodeaffinity.NewNodeSelector(slice.Spec.NodeSelector) if err != nil { return nil, fmt.Errorf("node selector in resource slice %s: %w", slice.Name, err) } if selector.Match(node) { - addSlice(pools, slice) + if err := addSlice(pools, slice); err != nil { + return nil, fmt.Errorf("add matching slice %s: %w", slice.Name, err) + } } default: // Nothing known was set. 
This must be some future, unknown extension, @@ -84,22 +87,27 @@ func GatherPools(ctx context.Context, sliceLister resourcelisters.ResourceSliceL return result, nil } -func addSlice(pools map[PoolID]*Pool, slice *resourceapi.ResourceSlice) { +func addSlice(pools map[PoolID]*Pool, s *resourceapi.ResourceSlice) error { + var slice draapi.ResourceSlice + if err := draapi.Convert_v1alpha3_ResourceSlice_To_api_ResourceSlice(s, &slice, nil); err != nil { + return fmt.Errorf("convert ResourceSlice: %w", err) + } + id := PoolID{Driver: slice.Spec.Driver, Pool: slice.Spec.Pool.Name} pool := pools[id] if pool == nil { // New pool. pool = &Pool{ PoolID: id, - Slices: []*resourceapi.ResourceSlice{slice}, + Slices: []*draapi.ResourceSlice{&slice}, } pools[id] = pool - return + return nil } if slice.Spec.Pool.Generation < pool.Slices[0].Spec.Pool.Generation { // Out-dated. - return + return nil } if slice.Spec.Pool.Generation > pool.Slices[0].Spec.Pool.Generation { @@ -108,11 +116,12 @@ func addSlice(pools map[PoolID]*Pool, slice *resourceapi.ResourceSlice) { } // Add to pool. - pool.Slices = append(pool.Slices, slice) + pool.Slices = append(pool.Slices, &slice) + return nil } func poolIsInvalid(pool *Pool) (bool, string) { - devices := sets.New[string]() + devices := sets.New[draapi.UniqueString]() for _, slice := range pool.Slices { for _, device := range slice.Spec.Devices { if devices.Has(device.Name) { @@ -129,21 +138,29 @@ type Pool struct { IsIncomplete bool IsInvalid bool InvalidReason string - Slices []*resourceapi.ResourceSlice + Slices []*draapi.ResourceSlice } type PoolID struct { - Driver, Pool string + Driver, Pool draapi.UniqueString } func (p PoolID) String() string { - return p.Driver + "/" + p.Pool + return p.Driver.String() + "/" + p.Pool.String() } type DeviceID struct { - Driver, Pool, Device string + Driver, Pool, Device draapi.UniqueString } func (d DeviceID) String() string { - return d.Driver + "/" + d.Pool + "/" + d.Device + return d.Driver.String() + "/" + d.Pool.String() + "/" + d.Device.String() +} + +func MakeDeviceID(driver, pool, device string) DeviceID { + return DeviceID{ + Driver: draapi.MakeUniqueString(driver), + Pool: draapi.MakeUniqueString(pool), + Device: draapi.MakeUniqueString(device), + } } diff --git a/test/integration/scheduler_perf/dra.go b/test/integration/scheduler_perf/dra.go index d85e624309b..a4cb1cfe905 100644 --- a/test/integration/scheduler_perf/dra.go +++ b/test/integration/scheduler_perf/dra.go @@ -30,9 +30,11 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" "k8s.io/client-go/util/workqueue" + "k8s.io/dynamic-resource-allocation/cel" "k8s.io/dynamic-resource-allocation/structured" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/util/assumecache" @@ -277,7 +279,6 @@ func (op *allocResourceClaimsOp) run(tCtx ktesting.TContext) { sliceLister := informerFactory.Resource().V1alpha3().ResourceSlices().Lister() nodeLister := informerFactory.Core().V1().Nodes().Lister() claimCache := assumecache.NewAssumeCache(tCtx.Logger(), claimInformer, "ResourceClaim", "", nil) - claimLister := claimLister{cache: claimCache} informerFactory.Start(tCtx.Done()) defer func() { tCtx.Cancel("allocResourceClaimsOp.run is shutting down") @@ -291,10 +292,13 @@ func (op *allocResourceClaimsOp) run(tCtx ktesting.TContext) { reflect.TypeOf(&v1.Node{}): true, } 
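PoolID and DeviceID switching from plain strings to draapi.UniqueString suggests interned, comparable handles, so building device IDs and checking set membership avoids repeated string concatenation and hashing. The real UniqueString type is defined elsewhere; the sketch below is only a guess at the general idea, with illustrative names and no concurrency safety:

package main

import "fmt"

type uniqueString struct{ s *string } // comparable handle; equal handles share the same pointer

var interned = map[string]uniqueString{}

// makeUniqueString returns the same handle for the same string content,
// so comparing handles is a pointer comparison instead of a string compare.
func makeUniqueString(s string) uniqueString {
	if u, ok := interned[s]; ok {
		return u
	}
	u := uniqueString{s: &s}
	interned[s] = u
	return u
}

func (u uniqueString) String() string {
	if u.s == nil {
		return ""
	}
	return *u.s
}

type deviceID struct{ driver, pool, device uniqueString }

func makeDeviceID(driver, pool, device string) deviceID {
	return deviceID{
		driver: makeUniqueString(driver),
		pool:   makeUniqueString(pool),
		device: makeUniqueString(device),
	}
}

func main() {
	seen := map[deviceID]bool{}
	seen[makeDeviceID("driver-a", "pool-1", "device-1")] = true
	// Rebuilding the same ID yields the same interned handles, so the set lookup hits.
	fmt.Println(seen[makeDeviceID("driver-a", "pool-1", "device-1")]) // true
	fmt.Println(seen[makeDeviceID("driver-a", "pool-1", "device-2")]) // false
}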
require.Equal(tCtx, expectSyncedInformers, syncedInformers, "synced informers") + celCache := cel.NewCache(10) // The set of nodes is assumed to be fixed at this point. nodes, err := nodeLister.List(labels.Everything()) tCtx.ExpectNoError(err, "list nodes") + slices, err := sliceLister.List(labels.Everything()) + tCtx.ExpectNoError(err, "list slices") // Allocate one claim at a time, picking nodes randomly. Each // allocation is stored immediately, using the claim cache to avoid @@ -306,7 +310,19 @@ claims: continue } - allocator, err := structured.NewAllocator(tCtx, utilfeature.DefaultFeatureGate.Enabled(features.DRAAdminAccess), []*resourceapi.ResourceClaim{claim}, claimLister, classLister, sliceLister) + objs := claimCache.List(nil) + allocatedDevices := sets.New[structured.DeviceID]() + for _, obj := range objs { + claim := obj.(*resourceapi.ResourceClaim) + if claim.Status.Allocation == nil { + continue + } + for _, result := range claim.Status.Allocation.Devices.Results { + allocatedDevices.Insert(structured.MakeDeviceID(result.Driver, result.Pool, result.Device)) + } + } + + allocator, err := structured.NewAllocator(tCtx, utilfeature.DefaultFeatureGate.Enabled(features.DRAAdminAccess), []*resourceapi.ResourceClaim{claim}, allocatedDevices, classLister, slices, celCache) tCtx.ExpectNoError(err, "create allocator") rand.Shuffle(len(nodes), func(i, j int) { @@ -317,7 +333,7 @@ claims: tCtx.ExpectNoError(err, "allocate claim") if result != nil { claim = claim.DeepCopy() - claim.Status.Allocation = result[0] + claim.Status.Allocation = &result[0] claim, err := tCtx.Client().ResourceV1alpha3().ResourceClaims(claim.Namespace).UpdateStatus(tCtx, claim, metav1.UpdateOptions{}) tCtx.ExpectNoError(err, "update claim status with allocation") tCtx.ExpectNoError(claimCache.Assume(claim), "assume claim") @@ -327,19 +343,3 @@ claims: tCtx.Fatalf("Could not allocate claim %d out of %d", i, len(claims.Items)) } } - -type claimLister struct { - cache *assumecache.AssumeCache -} - -func (c claimLister) ListAllAllocated() ([]*resourceapi.ResourceClaim, error) { - objs := c.cache.List(nil) - allocatedClaims := make([]*resourceapi.ResourceClaim, 0, len(objs)) - for _, obj := range objs { - claim := obj.(*resourceapi.ResourceClaim) - if claim.Status.Allocation != nil { - allocatedClaims = append(allocatedClaims, claim) - } - } - return allocatedClaims, nil -}
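For reference, the loop that replaces the removed claimLister in the scheduler_perf helper reduces to this standalone sketch: walk the cached claims once, skip unallocated ones, and collect one ID per device result. Types and names below are simplified stand-ins for the ResourceClaim API:

package main

import "fmt"

type deviceResult struct{ driver, pool, device string }

type claim struct {
	name    string
	results []deviceResult // empty means the claim is not allocated
}

// collectAllocatedDevices mirrors the loop over claimCache.List(nil): skip
// claims without an allocation, add one ID per device result of the rest.
func collectAllocatedDevices(claims []claim) map[string]bool {
	allocated := map[string]bool{}
	for _, c := range claims {
		if len(c.results) == 0 {
			continue // not allocated
		}
		for _, r := range c.results {
			allocated[r.driver+"/"+r.pool+"/"+r.device] = true
		}
	}
	return allocated
}

func main() {
	claims := []claim{
		{name: "claim-0", results: []deviceResult{{"driver-a", "pool-1", "device-1"}}},
		{name: "claim-1"}, // still pending, contributes nothing
	}
	fmt.Println(collectAllocatedDevices(claims)) // map[driver-a/pool-1/device-1:true]
}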