Merge pull request #127277 from pohly/dra-structured-performance

kube-scheduler: enhance performance for DRA structured parameters
Kubernetes Prow Robot 2024-11-05 10:05:29 +00:00 committed by GitHub
commit c69f150008
GPG Key ID: B5690EEEBB952194
13 changed files with 1084 additions and 274 deletions

View File

@ -0,0 +1,175 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamicresources
import (
"sync"
resourceapi "k8s.io/api/resource/v1alpha3"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
"k8s.io/dynamic-resource-allocation/structured"
"k8s.io/klog/v2"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/utils/ptr"
)
// foreachAllocatedDevice invokes the provided callback for each
// device in the claim's allocation result which was allocated
// exclusively for the claim.
//
// Devices allocated with admin access can be shared with other
// claims and are skipped without invoking the callback.
//
// foreachAllocatedDevice does nothing if the claim is not allocated.
func foreachAllocatedDevice(claim *resourceapi.ResourceClaim, cb func(deviceID structured.DeviceID)) {
if claim.Status.Allocation == nil {
return
}
for _, result := range claim.Status.Allocation.Devices.Results {
// Kubernetes 1.31 did not set this, 1.32 always does.
// Supporting 1.31 is not worth the additional code that
// would have to be written (looking it up in the request), because
// it is extremely unlikely that a result from 1.31 where this
// matters still exists in a cluster.
if ptr.Deref(result.AdminAccess, false) {
// Shared via admin access, so not considered allocated.
continue
}
deviceID := structured.MakeDeviceID(result.Driver, result.Pool, result.Device)
// None of the users of this helper need to abort iterating,
// so aborting is not supported; it would only add overhead.
cb(deviceID)
}
}
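(Illustrative sketch, not part of this diff: a hypothetical caller that uses foreachAllocatedDevice to collect a claim's exclusively allocated device IDs into a set, which is essentially what addDevices/removeDevices below do inline.)
// collectDeviceIDs is a hypothetical helper, not part of this change.
func collectDeviceIDs(claim *resourceapi.ResourceClaim) sets.Set[structured.DeviceID] {
	ids := sets.New[structured.DeviceID]()
	foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) {
		ids.Insert(deviceID)
	})
	return ids
}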
// allocatedDevices reacts to events in a cache and maintains a set of all allocated devices.
// This is cheaper than repeatedly calling List, making strings unique, and building the set
// each time PreFilter is called.
//
// All methods are thread-safe. Get returns a cloned set.
type allocatedDevices struct {
logger klog.Logger
mutex sync.RWMutex
ids sets.Set[structured.DeviceID]
}
func newAllocatedDevices(logger klog.Logger) *allocatedDevices {
return &allocatedDevices{
logger: logger,
ids: sets.New[structured.DeviceID](),
}
}
func (a *allocatedDevices) Get() sets.Set[structured.DeviceID] {
a.mutex.RLock()
defer a.mutex.RUnlock()
return a.ids.Clone()
}
func (a *allocatedDevices) handlers() cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: a.onAdd,
UpdateFunc: a.onUpdate,
DeleteFunc: a.onDelete,
}
}
func (a *allocatedDevices) onAdd(obj any) {
claim, _, err := schedutil.As[*resourceapi.ResourceClaim](obj, nil)
if err != nil {
// Shouldn't happen.
a.logger.Error(err, "unexpected object in allocatedDevices.onAdd")
return
}
if claim.Status.Allocation != nil {
a.addDevices(claim)
}
}
func (a *allocatedDevices) onUpdate(oldObj, newObj any) {
originalClaim, modifiedClaim, err := schedutil.As[*resourceapi.ResourceClaim](oldObj, newObj)
if err != nil {
// Shouldn't happen.
a.logger.Error(err, "unexpected object in allocatedDevices.onUpdate")
return
}
switch {
case originalClaim.Status.Allocation == nil && modifiedClaim.Status.Allocation != nil:
a.addDevices(modifiedClaim)
case originalClaim.Status.Allocation != nil && modifiedClaim.Status.Allocation == nil:
a.removeDevices(originalClaim)
default:
// Nothing to do. Either both nil or both non-nil, in which case the content
// also must be the same (immutable!).
}
}
func (a *allocatedDevices) onDelete(obj any) {
claim, _, err := schedutil.As[*resourceapi.ResourceClaim](obj, nil)
if err != nil {
// Shouldn't happen.
a.logger.Error(err, "unexpected object in allocatedDevices.onDelete")
return
}
a.removeDevices(claim)
}
func (a *allocatedDevices) addDevices(claim *resourceapi.ResourceClaim) {
if claim.Status.Allocation == nil {
return
}
// Time spent holding the mutex is minimized by pre-computing the device IDs
// without holding the lock.
deviceIDs := make([]structured.DeviceID, 0, 20)
foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) {
a.logger.V(6).Info("Observed device allocation", "device", deviceID, "claim", klog.KObj(claim))
deviceIDs = append(deviceIDs, deviceID)
})
a.mutex.Lock()
defer a.mutex.Unlock()
for _, deviceID := range deviceIDs {
a.ids.Insert(deviceID)
}
}
func (a *allocatedDevices) removeDevices(claim *resourceapi.ResourceClaim) {
if claim.Status.Allocation == nil {
return
}
// Time spent holding the mutex is minimized by pre-computing the device IDs
// without holding the lock.
deviceIDs := make([]structured.DeviceID, 0, 20)
foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) {
a.logger.V(6).Info("Observed device deallocation", "device", deviceID, "claim", klog.KObj(claim))
deviceIDs = append(deviceIDs, deviceID)
})
a.mutex.Lock()
defer a.mutex.Unlock()
for _, deviceID := range deviceIDs {
a.ids.Delete(deviceID)
}
}
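(Illustrative sketch, not part of this diff: the intended wiring, mirroring the plugin changes later in this PR — claimCache is a stand-in for the scheduler's resource claim assume cache or any informer with AddEventHandler.)
// hypotheticalWiring sketches the intended use; claimCache is a stand-in.
func hypotheticalWiring(logger klog.Logger, claimCache cache.SharedIndexInformer) sets.Set[structured.DeviceID] {
	allocated := newAllocatedDevices(logger)
	// Keep the set up to date from cache events ...
	claimCache.AddEventHandler(allocated.handlers())
	// ... and take a cloned snapshot whenever needed, e.g. once per scheduling cycle.
	return allocated.Get()
}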

View File

@ -30,6 +30,7 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
@ -37,6 +38,7 @@ import (
resourcelisters "k8s.io/client-go/listers/resource/v1alpha3"
"k8s.io/client-go/util/retry"
"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
"k8s.io/dynamic-resource-allocation/cel"
"k8s.io/dynamic-resource-allocation/resourceclaim"
"k8s.io/dynamic-resource-allocation/structured"
"k8s.io/klog/v2"
@ -85,7 +87,7 @@ type stateData struct {
informationsForClaim []informationForClaim
// nodeAllocations caches the result of Filter for the nodes.
nodeAllocations map[string][]*resourceapi.AllocationResult
nodeAllocations map[string][]resourceapi.AllocationResult
}
func (d *stateData) Clone() framework.StateData {
@ -106,10 +108,12 @@ type DynamicResources struct {
enableAdminAccess bool
enableSchedulingQueueHint bool
fh framework.Handle
clientset kubernetes.Interface
classLister resourcelisters.DeviceClassLister
sliceLister resourcelisters.ResourceSliceLister
fh framework.Handle
clientset kubernetes.Interface
classLister resourcelisters.DeviceClassLister
sliceLister resourcelisters.ResourceSliceLister
celCache *cel.Cache
allocatedDevices *allocatedDevices
// claimAssumeCache enables temporarily storing a newer claim object
// while the scheduler has allocated it and the corresponding object
@ -174,6 +178,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
return &DynamicResources{}, nil
}
logger := klog.FromContext(ctx)
pl := &DynamicResources{
enabled: true,
enableAdminAccess: fts.EnableDRAAdminAccess,
@ -184,8 +189,19 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
classLister: fh.SharedInformerFactory().Resource().V1alpha3().DeviceClasses().Lister(),
sliceLister: fh.SharedInformerFactory().Resource().V1alpha3().ResourceSlices().Lister(),
claimAssumeCache: fh.ResourceClaimCache(),
// This is an LRU cache for compiled CEL expressions. The most
// recent 10 of them get reused across different scheduling
// cycles.
celCache: cel.NewCache(10),
allocatedDevices: newAllocatedDevices(logger),
}
// Reacting to events is more efficient than iterating over the list
// repeatedly in PreFilter.
pl.claimAssumeCache.AddEventHandler(pl.allocatedDevices.handlers())
return pl, nil
}
@ -527,39 +543,41 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
// expensive, we may have to maintain and update state more
// persistently.
//
// Claims are treated as "allocated" if they are in the assume cache
// or currently their allocation is in-flight.
allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, allocateClaims, &claimListerForAssumeCache{assumeCache: pl.claimAssumeCache, inFlightAllocations: &pl.inFlightAllocations}, pl.classLister, pl.sliceLister)
// Claims (and thus their devices) are treated as "allocated" if they are in the assume cache
// or currently their allocation is in-flight. This does not change
// during filtering, so we can determine that once.
allAllocatedDevices := pl.listAllAllocatedDevices(logger)
slices, err := pl.sliceLister.List(labels.Everything())
if err != nil {
return nil, statusError(logger, err)
}
allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, allocateClaims, allAllocatedDevices, pl.classLister, slices, pl.celCache)
if err != nil {
return nil, statusError(logger, err)
}
s.allocator = allocator
s.nodeAllocations = make(map[string][]*resourceapi.AllocationResult)
s.nodeAllocations = make(map[string][]resourceapi.AllocationResult)
}
s.claims = claims
return nil, nil
}
type claimListerForAssumeCache struct {
assumeCache *assumecache.AssumeCache
inFlightAllocations *sync.Map
}
func (pl *DynamicResources) listAllAllocatedDevices(logger klog.Logger) sets.Set[structured.DeviceID] {
// Start with a fresh set that matches the current known state of the
// world according to the informers.
allocated := pl.allocatedDevices.Get()
func (cl *claimListerForAssumeCache) ListAllAllocated() ([]*resourceapi.ResourceClaim, error) {
// Probably not worth adding an index for?
objs := cl.assumeCache.List(nil)
allocated := make([]*resourceapi.ResourceClaim, 0, len(objs))
for _, obj := range objs {
claim := obj.(*resourceapi.ResourceClaim)
if obj, ok := cl.inFlightAllocations.Load(claim.UID); ok {
claim = obj.(*resourceapi.ResourceClaim)
}
if claim.Status.Allocation != nil {
allocated = append(allocated, claim)
}
}
return allocated, nil
// Whatever is in flight also has to be checked.
pl.inFlightAllocations.Range(func(key, value any) bool {
claim := value.(*resourceapi.ResourceClaim)
foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) {
logger.V(6).Info("Device is in flight for allocation", "device", deviceID, "claim", klog.KObj(claim))
allocated.Insert(deviceID)
})
return true
})
return allocated
}
// PreFilterExtensions returns prefilter extensions, pod add and remove.
@ -615,7 +633,7 @@ func (pl *DynamicResources) Filter(ctx context.Context, cs *framework.CycleState
}
// Use the allocator to check the node and cache the result in case the node is picked.
var allocations []*resourceapi.AllocationResult
var allocations []resourceapi.AllocationResult
if state.allocator != nil {
allocCtx := ctx
if loggerV := logger.V(5); loggerV.Enabled() {
@ -763,7 +781,7 @@ func (pl *DynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
if index < 0 {
return statusError(logger, fmt.Errorf("internal error, claim %s with allocation not found", claim.Name))
}
allocation := allocations[i]
allocation := &allocations[i]
state.informationsForClaim[index].allocation = allocation
// Strictly speaking, we don't need to store the full modified object.

View File

@ -0,0 +1,47 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package api
import (
"unique"
conversion "k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime"
)
var (
localSchemeBuilder runtime.SchemeBuilder
AddToScheme = localSchemeBuilder.AddToScheme
)
func Convert_api_UniqueString_To_string(in *UniqueString, out *string, s conversion.Scope) error {
if *in == NullUniqueString {
*out = ""
return nil
}
*out = in.String()
return nil
}
func Convert_string_To_api_UniqueString(in *string, out *UniqueString, s conversion.Scope) error {
if *in == "" {
*out = NullUniqueString
return nil
}
*out = UniqueString(unique.Make(*in))
return nil
}

View File

@ -0,0 +1,22 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package api contains a variant of the API where strings are unique. These
// unique strings are faster to compare and more efficient when used as keys in
// a map.
//
// +k8s:conversion-gen=k8s.io/api/resource/v1alpha3
package api

View File

@ -0,0 +1,64 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package api
import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
type ResourceSlice struct {
metav1.TypeMeta
metav1.ObjectMeta
Spec ResourceSliceSpec
}
type ResourceSliceSpec struct {
Driver UniqueString
Pool ResourcePool
NodeName UniqueString
NodeSelector *v1.NodeSelector
AllNodes bool
Devices []Device
}
type ResourcePool struct {
Name UniqueString
Generation int64
ResourceSliceCount int64
}
type Device struct {
Name UniqueString
Basic *BasicDevice
}
type BasicDevice struct {
Attributes map[QualifiedName]DeviceAttribute
Capacity map[QualifiedName]resource.Quantity
}
type QualifiedName string
type FullyQualifiedName string
type DeviceAttribute struct {
IntValue *int64
BoolValue *bool
StringValue *string
VersionValue *string
}

View File

@ -0,0 +1,41 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package api
import (
"unique"
)
// NullUniqueString is a UniqueString which contains no string.
var NullUniqueString UniqueString
// UniqueString is a wrapper around [unique.Handle[string]].
type UniqueString unique.Handle[string]
// String returns the string that is stored in the UniqueString.
// If the UniqueString is null, the empty string is returned.
func (us UniqueString) String() string {
if us == NullUniqueString {
return ""
}
return unique.Handle[string](us).Value()
}
// MakeUniqueString constructs a new unique string.
func MakeUniqueString(str string) UniqueString {
return UniqueString(unique.Make(str))
}
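(Illustrative sketch, not part of this diff: interning means equality of two UniqueString values is a cheap handle comparison instead of a byte-wise string comparison; the driver name below is made up.)
// exampleUniqueString is a hypothetical demonstration, not part of this change.
func exampleUniqueString() {
	a := MakeUniqueString("driver.example.com")
	b := MakeUniqueString("driver.example.com")
	_ = a == b                    // true: both refer to the same interned handle
	_ = a.String()                // "driver.example.com"
	_ = NullUniqueString.String() // "": the null value maps back to the empty string
}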

View File

@ -0,0 +1,302 @@
//go:build !ignore_autogenerated
// +build !ignore_autogenerated
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by conversion-gen. DO NOT EDIT.
package api
import (
unsafe "unsafe"
v1 "k8s.io/api/core/v1"
v1alpha3 "k8s.io/api/resource/v1alpha3"
resource "k8s.io/apimachinery/pkg/api/resource"
conversion "k8s.io/apimachinery/pkg/conversion"
runtime "k8s.io/apimachinery/pkg/runtime"
)
func init() {
localSchemeBuilder.Register(RegisterConversions)
}
// RegisterConversions adds conversion functions to the given scheme.
// Public to allow building arbitrary schemes.
func RegisterConversions(s *runtime.Scheme) error {
if err := s.AddGeneratedConversionFunc((*BasicDevice)(nil), (*v1alpha3.BasicDevice)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_api_BasicDevice_To_v1alpha3_BasicDevice(a.(*BasicDevice), b.(*v1alpha3.BasicDevice), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*v1alpha3.BasicDevice)(nil), (*BasicDevice)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1alpha3_BasicDevice_To_api_BasicDevice(a.(*v1alpha3.BasicDevice), b.(*BasicDevice), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*Device)(nil), (*v1alpha3.Device)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_api_Device_To_v1alpha3_Device(a.(*Device), b.(*v1alpha3.Device), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*v1alpha3.Device)(nil), (*Device)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1alpha3_Device_To_api_Device(a.(*v1alpha3.Device), b.(*Device), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*DeviceAttribute)(nil), (*v1alpha3.DeviceAttribute)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_api_DeviceAttribute_To_v1alpha3_DeviceAttribute(a.(*DeviceAttribute), b.(*v1alpha3.DeviceAttribute), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*v1alpha3.DeviceAttribute)(nil), (*DeviceAttribute)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1alpha3_DeviceAttribute_To_api_DeviceAttribute(a.(*v1alpha3.DeviceAttribute), b.(*DeviceAttribute), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*ResourcePool)(nil), (*v1alpha3.ResourcePool)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_api_ResourcePool_To_v1alpha3_ResourcePool(a.(*ResourcePool), b.(*v1alpha3.ResourcePool), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*v1alpha3.ResourcePool)(nil), (*ResourcePool)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1alpha3_ResourcePool_To_api_ResourcePool(a.(*v1alpha3.ResourcePool), b.(*ResourcePool), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*ResourceSlice)(nil), (*v1alpha3.ResourceSlice)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_api_ResourceSlice_To_v1alpha3_ResourceSlice(a.(*ResourceSlice), b.(*v1alpha3.ResourceSlice), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*v1alpha3.ResourceSlice)(nil), (*ResourceSlice)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1alpha3_ResourceSlice_To_api_ResourceSlice(a.(*v1alpha3.ResourceSlice), b.(*ResourceSlice), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*ResourceSliceSpec)(nil), (*v1alpha3.ResourceSliceSpec)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_api_ResourceSliceSpec_To_v1alpha3_ResourceSliceSpec(a.(*ResourceSliceSpec), b.(*v1alpha3.ResourceSliceSpec), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*v1alpha3.ResourceSliceSpec)(nil), (*ResourceSliceSpec)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1alpha3_ResourceSliceSpec_To_api_ResourceSliceSpec(a.(*v1alpha3.ResourceSliceSpec), b.(*ResourceSliceSpec), scope)
}); err != nil {
return err
}
if err := s.AddConversionFunc((*UniqueString)(nil), (*string)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_api_UniqueString_To_string(a.(*UniqueString), b.(*string), scope)
}); err != nil {
return err
}
if err := s.AddConversionFunc((*string)(nil), (*UniqueString)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_string_To_api_UniqueString(a.(*string), b.(*UniqueString), scope)
}); err != nil {
return err
}
return nil
}
func autoConvert_api_BasicDevice_To_v1alpha3_BasicDevice(in *BasicDevice, out *v1alpha3.BasicDevice, s conversion.Scope) error {
out.Attributes = *(*map[v1alpha3.QualifiedName]v1alpha3.DeviceAttribute)(unsafe.Pointer(&in.Attributes))
out.Capacity = *(*map[v1alpha3.QualifiedName]resource.Quantity)(unsafe.Pointer(&in.Capacity))
return nil
}
// Convert_api_BasicDevice_To_v1alpha3_BasicDevice is an autogenerated conversion function.
func Convert_api_BasicDevice_To_v1alpha3_BasicDevice(in *BasicDevice, out *v1alpha3.BasicDevice, s conversion.Scope) error {
return autoConvert_api_BasicDevice_To_v1alpha3_BasicDevice(in, out, s)
}
func autoConvert_v1alpha3_BasicDevice_To_api_BasicDevice(in *v1alpha3.BasicDevice, out *BasicDevice, s conversion.Scope) error {
out.Attributes = *(*map[QualifiedName]DeviceAttribute)(unsafe.Pointer(&in.Attributes))
out.Capacity = *(*map[QualifiedName]resource.Quantity)(unsafe.Pointer(&in.Capacity))
return nil
}
// Convert_v1alpha3_BasicDevice_To_api_BasicDevice is an autogenerated conversion function.
func Convert_v1alpha3_BasicDevice_To_api_BasicDevice(in *v1alpha3.BasicDevice, out *BasicDevice, s conversion.Scope) error {
return autoConvert_v1alpha3_BasicDevice_To_api_BasicDevice(in, out, s)
}
func autoConvert_api_Device_To_v1alpha3_Device(in *Device, out *v1alpha3.Device, s conversion.Scope) error {
if err := Convert_api_UniqueString_To_string(&in.Name, &out.Name, s); err != nil {
return err
}
out.Basic = (*v1alpha3.BasicDevice)(unsafe.Pointer(in.Basic))
return nil
}
// Convert_api_Device_To_v1alpha3_Device is an autogenerated conversion function.
func Convert_api_Device_To_v1alpha3_Device(in *Device, out *v1alpha3.Device, s conversion.Scope) error {
return autoConvert_api_Device_To_v1alpha3_Device(in, out, s)
}
func autoConvert_v1alpha3_Device_To_api_Device(in *v1alpha3.Device, out *Device, s conversion.Scope) error {
if err := Convert_string_To_api_UniqueString(&in.Name, &out.Name, s); err != nil {
return err
}
out.Basic = (*BasicDevice)(unsafe.Pointer(in.Basic))
return nil
}
// Convert_v1alpha3_Device_To_api_Device is an autogenerated conversion function.
func Convert_v1alpha3_Device_To_api_Device(in *v1alpha3.Device, out *Device, s conversion.Scope) error {
return autoConvert_v1alpha3_Device_To_api_Device(in, out, s)
}
func autoConvert_api_DeviceAttribute_To_v1alpha3_DeviceAttribute(in *DeviceAttribute, out *v1alpha3.DeviceAttribute, s conversion.Scope) error {
out.IntValue = (*int64)(unsafe.Pointer(in.IntValue))
out.BoolValue = (*bool)(unsafe.Pointer(in.BoolValue))
out.StringValue = (*string)(unsafe.Pointer(in.StringValue))
out.VersionValue = (*string)(unsafe.Pointer(in.VersionValue))
return nil
}
// Convert_api_DeviceAttribute_To_v1alpha3_DeviceAttribute is an autogenerated conversion function.
func Convert_api_DeviceAttribute_To_v1alpha3_DeviceAttribute(in *DeviceAttribute, out *v1alpha3.DeviceAttribute, s conversion.Scope) error {
return autoConvert_api_DeviceAttribute_To_v1alpha3_DeviceAttribute(in, out, s)
}
func autoConvert_v1alpha3_DeviceAttribute_To_api_DeviceAttribute(in *v1alpha3.DeviceAttribute, out *DeviceAttribute, s conversion.Scope) error {
out.IntValue = (*int64)(unsafe.Pointer(in.IntValue))
out.BoolValue = (*bool)(unsafe.Pointer(in.BoolValue))
out.StringValue = (*string)(unsafe.Pointer(in.StringValue))
out.VersionValue = (*string)(unsafe.Pointer(in.VersionValue))
return nil
}
// Convert_v1alpha3_DeviceAttribute_To_api_DeviceAttribute is an autogenerated conversion function.
func Convert_v1alpha3_DeviceAttribute_To_api_DeviceAttribute(in *v1alpha3.DeviceAttribute, out *DeviceAttribute, s conversion.Scope) error {
return autoConvert_v1alpha3_DeviceAttribute_To_api_DeviceAttribute(in, out, s)
}
func autoConvert_api_ResourcePool_To_v1alpha3_ResourcePool(in *ResourcePool, out *v1alpha3.ResourcePool, s conversion.Scope) error {
if err := Convert_api_UniqueString_To_string(&in.Name, &out.Name, s); err != nil {
return err
}
out.Generation = in.Generation
out.ResourceSliceCount = in.ResourceSliceCount
return nil
}
// Convert_api_ResourcePool_To_v1alpha3_ResourcePool is an autogenerated conversion function.
func Convert_api_ResourcePool_To_v1alpha3_ResourcePool(in *ResourcePool, out *v1alpha3.ResourcePool, s conversion.Scope) error {
return autoConvert_api_ResourcePool_To_v1alpha3_ResourcePool(in, out, s)
}
func autoConvert_v1alpha3_ResourcePool_To_api_ResourcePool(in *v1alpha3.ResourcePool, out *ResourcePool, s conversion.Scope) error {
if err := Convert_string_To_api_UniqueString(&in.Name, &out.Name, s); err != nil {
return err
}
out.Generation = in.Generation
out.ResourceSliceCount = in.ResourceSliceCount
return nil
}
// Convert_v1alpha3_ResourcePool_To_api_ResourcePool is an autogenerated conversion function.
func Convert_v1alpha3_ResourcePool_To_api_ResourcePool(in *v1alpha3.ResourcePool, out *ResourcePool, s conversion.Scope) error {
return autoConvert_v1alpha3_ResourcePool_To_api_ResourcePool(in, out, s)
}
func autoConvert_api_ResourceSlice_To_v1alpha3_ResourceSlice(in *ResourceSlice, out *v1alpha3.ResourceSlice, s conversion.Scope) error {
out.ObjectMeta = in.ObjectMeta
if err := Convert_api_ResourceSliceSpec_To_v1alpha3_ResourceSliceSpec(&in.Spec, &out.Spec, s); err != nil {
return err
}
return nil
}
// Convert_api_ResourceSlice_To_v1alpha3_ResourceSlice is an autogenerated conversion function.
func Convert_api_ResourceSlice_To_v1alpha3_ResourceSlice(in *ResourceSlice, out *v1alpha3.ResourceSlice, s conversion.Scope) error {
return autoConvert_api_ResourceSlice_To_v1alpha3_ResourceSlice(in, out, s)
}
func autoConvert_v1alpha3_ResourceSlice_To_api_ResourceSlice(in *v1alpha3.ResourceSlice, out *ResourceSlice, s conversion.Scope) error {
out.ObjectMeta = in.ObjectMeta
if err := Convert_v1alpha3_ResourceSliceSpec_To_api_ResourceSliceSpec(&in.Spec, &out.Spec, s); err != nil {
return err
}
return nil
}
// Convert_v1alpha3_ResourceSlice_To_api_ResourceSlice is an autogenerated conversion function.
func Convert_v1alpha3_ResourceSlice_To_api_ResourceSlice(in *v1alpha3.ResourceSlice, out *ResourceSlice, s conversion.Scope) error {
return autoConvert_v1alpha3_ResourceSlice_To_api_ResourceSlice(in, out, s)
}
func autoConvert_api_ResourceSliceSpec_To_v1alpha3_ResourceSliceSpec(in *ResourceSliceSpec, out *v1alpha3.ResourceSliceSpec, s conversion.Scope) error {
if err := Convert_api_UniqueString_To_string(&in.Driver, &out.Driver, s); err != nil {
return err
}
if err := Convert_api_ResourcePool_To_v1alpha3_ResourcePool(&in.Pool, &out.Pool, s); err != nil {
return err
}
if err := Convert_api_UniqueString_To_string(&in.NodeName, &out.NodeName, s); err != nil {
return err
}
out.NodeSelector = (*v1.NodeSelector)(unsafe.Pointer(in.NodeSelector))
out.AllNodes = in.AllNodes
if in.Devices != nil {
in, out := &in.Devices, &out.Devices
*out = make([]v1alpha3.Device, len(*in))
for i := range *in {
if err := Convert_api_Device_To_v1alpha3_Device(&(*in)[i], &(*out)[i], s); err != nil {
return err
}
}
} else {
out.Devices = nil
}
return nil
}
// Convert_api_ResourceSliceSpec_To_v1alpha3_ResourceSliceSpec is an autogenerated conversion function.
func Convert_api_ResourceSliceSpec_To_v1alpha3_ResourceSliceSpec(in *ResourceSliceSpec, out *v1alpha3.ResourceSliceSpec, s conversion.Scope) error {
return autoConvert_api_ResourceSliceSpec_To_v1alpha3_ResourceSliceSpec(in, out, s)
}
func autoConvert_v1alpha3_ResourceSliceSpec_To_api_ResourceSliceSpec(in *v1alpha3.ResourceSliceSpec, out *ResourceSliceSpec, s conversion.Scope) error {
if err := Convert_string_To_api_UniqueString(&in.Driver, &out.Driver, s); err != nil {
return err
}
if err := Convert_v1alpha3_ResourcePool_To_api_ResourcePool(&in.Pool, &out.Pool, s); err != nil {
return err
}
if err := Convert_string_To_api_UniqueString(&in.NodeName, &out.NodeName, s); err != nil {
return err
}
out.NodeSelector = (*v1.NodeSelector)(unsafe.Pointer(in.NodeSelector))
out.AllNodes = in.AllNodes
if in.Devices != nil {
in, out := &in.Devices, &out.Devices
*out = make([]Device, len(*in))
for i := range *in {
if err := Convert_v1alpha3_Device_To_api_Device(&(*in)[i], &(*out)[i], s); err != nil {
return err
}
}
} else {
out.Devices = nil
}
return nil
}
// Convert_v1alpha3_ResourceSliceSpec_To_api_ResourceSliceSpec is an autogenerated conversion function.
func Convert_v1alpha3_ResourceSliceSpec_To_api_ResourceSliceSpec(in *v1alpha3.ResourceSliceSpec, out *ResourceSliceSpec, s conversion.Scope) error {
return autoConvert_v1alpha3_ResourceSliceSpec_To_api_ResourceSliceSpec(in, out, s)
}

View File

@ -0,0 +1,79 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cel
import (
"sync"
"k8s.io/utils/keymutex"
"k8s.io/utils/lru"
)
// Cache is a thread-safe LRU cache for compiled CEL expressions.
type Cache struct {
compileMutex keymutex.KeyMutex
cacheMutex sync.RWMutex
cache *lru.Cache
}
// NewCache creates a cache. The maximum number of entries determines
// how many compilation results are kept before the least recently
// used entry gets dropped.
func NewCache(maxCacheEntries int) *Cache {
return &Cache{
compileMutex: keymutex.NewHashed(0),
cache: lru.New(maxCacheEntries),
}
}
// GetOrCompile checks whether the cache already has a compilation result
// and returns that if available. Otherwise it compiles the expression,
// stores the result if compilation succeeded, and returns it.
func (c *Cache) GetOrCompile(expression string) CompilationResult {
// Compiling a CEL expression is expensive enough that it is cheaper
// to lock a mutex than doing it several times in parallel.
c.compileMutex.LockKey(expression)
//nolint:errcheck // Only returns an error for unknown keys, which isn't the case here.
defer c.compileMutex.UnlockKey(expression)
cached := c.get(expression)
if cached != nil {
return *cached
}
expr := GetCompiler().CompileCELExpression(expression, Options{})
if expr.Error == nil {
c.add(expression, &expr)
}
return expr
}
func (c *Cache) add(expression string, expr *CompilationResult) {
c.cacheMutex.Lock()
defer c.cacheMutex.Unlock()
c.cache.Add(expression, expr)
}
func (c *Cache) get(expression string) *CompilationResult {
c.cacheMutex.RLock()
defer c.cacheMutex.RUnlock()
expr, found := c.cache.Get(expression)
if !found {
return nil
}
return expr.(*CompilationResult)
}
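(Illustrative sketch, not part of this diff: typical use of the cache; the CEL expression is made up. The test file below exercises the same behavior.)
// exampleCacheUsage is a hypothetical demonstration, not part of this change.
func exampleCacheUsage() error {
	c := NewCache(10)
	result := c.GetOrCompile(`device.driver == "driver.example.com"`)
	if result.Error != nil {
		return result.Error
	}
	// A second call with the same expression returns the cached result.
	_ = c.GetOrCompile(`device.driver == "driver.example.com"`)
	return nil
}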

View File

@ -0,0 +1,98 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cel
import (
"fmt"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestCacheSemantic(t *testing.T) {
// Cache two entries.
//
// Entries are comparable structs with pointers inside. Each
// compilation leads to different pointers, so the entries can be
// compared by value to figure out whether an entry was cached or
// compiled anew.
cache := NewCache(2)
// Successful compilations get cached.
resultTrue := cache.GetOrCompile("true")
require.Nil(t, resultTrue.Error)
resultTrueAgain := cache.GetOrCompile("true")
if resultTrue != resultTrueAgain {
t.Fatal("result of compiling `true` should have been cached")
}
// Unsuccessful ones don't.
resultFailed := cache.GetOrCompile("no-such-variable")
require.NotNil(t, resultFailed.Error)
resultFailedAgain := cache.GetOrCompile("no-such-variable")
if resultFailed == resultFailedAgain {
t.Fatal("result of compiling `no-such-variable` should not have been cached")
}
// The cache can hold a second result.
resultFalse := cache.GetOrCompile("false")
require.Nil(t, resultFalse.Error)
resultFalseAgain := cache.GetOrCompile("false")
if resultFalse != resultFalseAgain {
t.Fatal("result of compiling `false` should have been cached")
}
resultTrueAgain = cache.GetOrCompile("true")
if resultTrue != resultTrueAgain {
t.Fatal("result of compiling `true` should still have been cached")
}
// A third result pushes out the least recently used one.
resultOther := cache.GetOrCompile("false && true")
require.Nil(t, resultOther.Error)
resultOtherAgain := cache.GetOrCompile("false && true")
if resultOther != resultOtherAgain {
t.Fatal("result of compiling `false && true` should have been cached")
}
resultFalseAgain = cache.GetOrCompile("false")
if resultFalse == resultFalseAgain {
t.Fatal("result of compiling `false` should have been evicted from the cache")
}
}
func TestCacheConcurrency(t *testing.T) {
// There's no guarantee that concurrent use of the cache would really
// trigger the race detector in `go test -race`, but in practice
// it does when not using the cacheMutex.
//
// The compileMutex only affects performance and thus cannot be tested
// without benchmarking.
numWorkers := 10
cache := NewCache(2)
var wg sync.WaitGroup
wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
go func(i int) {
defer wg.Done()
result := cache.GetOrCompile(fmt.Sprintf("%d == %d", i, i))
assert.Nil(t, result.Error)
}(i)
}
wg.Wait()
}

View File

@ -27,18 +27,12 @@ import (
resourceapi "k8s.io/api/resource/v1alpha3"
"k8s.io/apimachinery/pkg/util/sets"
resourcelisters "k8s.io/client-go/listers/resource/v1alpha3"
draapi "k8s.io/dynamic-resource-allocation/api"
"k8s.io/dynamic-resource-allocation/cel"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
)
// ClaimLister returns a subset of the claims that a
// resourcelisters.ResourceClaimLister would return.
type ClaimLister interface {
// ListAllAllocated returns only claims which are allocated.
ListAllAllocated() ([]*resourceapi.ResourceClaim, error)
}
// Allocator calculates how to allocate a set of unallocated claims which use
// structured parameters.
//
@ -48,26 +42,31 @@ type ClaimLister interface {
type Allocator struct {
adminAccessEnabled bool
claimsToAllocate []*resourceapi.ResourceClaim
claimLister ClaimLister
allocatedDevices sets.Set[DeviceID]
classLister resourcelisters.DeviceClassLister
sliceLister resourcelisters.ResourceSliceLister
slices []*resourceapi.ResourceSlice
celCache *cel.Cache
}
// NewAllocator returns an allocator for a certain set of claims or an error if
// a problem was detected that makes it impossible to allocate the claims.
//
// The returned Allocator can be used multiple times and is thread-safe.
func NewAllocator(ctx context.Context,
adminAccessEnabled bool,
claimsToAllocate []*resourceapi.ResourceClaim,
claimLister ClaimLister,
allocatedDevices sets.Set[DeviceID],
classLister resourcelisters.DeviceClassLister,
sliceLister resourcelisters.ResourceSliceLister,
slices []*resourceapi.ResourceSlice,
celCache *cel.Cache,
) (*Allocator, error) {
return &Allocator{
adminAccessEnabled: adminAccessEnabled,
claimsToAllocate: claimsToAllocate,
claimLister: claimLister,
allocatedDevices: allocatedDevices,
classLister: classLister,
sliceLister: sliceLister,
slices: slices,
celCache: celCache,
}, nil
}
@ -99,7 +98,7 @@ func (a *Allocator) ClaimsToAllocate() []*resourceapi.ResourceClaim {
// additional value. A name can also be useful because log messages do not
// have a common prefix. V(5) is used for one-time log entries, V(6) for important
// progress reports, and V(7) for detailed debug output.
func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []*resourceapi.AllocationResult, finalErr error) {
func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []resourceapi.AllocationResult, finalErr error) {
alloc := &allocator{
Allocator: a,
ctx: ctx, // all methods share the same a and thus ctx
@ -107,14 +106,13 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []
deviceMatchesRequest: make(map[matchKey]bool),
constraints: make([][]constraint, len(a.claimsToAllocate)),
requestData: make(map[requestIndices]requestData),
allocated: make(map[DeviceID]bool),
result: make([]*resourceapi.AllocationResult, len(a.claimsToAllocate)),
result: make([]internalAllocationResult, len(a.claimsToAllocate)),
}
alloc.logger.V(5).Info("Starting allocation", "numClaims", len(alloc.claimsToAllocate))
defer alloc.logger.V(5).Info("Done with allocation", "success", len(finalResult) == len(alloc.claimsToAllocate), "err", finalErr)
// First determine all eligible pools.
pools, err := GatherPools(ctx, alloc.sliceLister, node)
pools, err := GatherPools(ctx, alloc.slices, node)
if err != nil {
return nil, fmt.Errorf("gather pool information: %w", err)
}
@ -144,8 +142,9 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []
// and their requests. For each claim we determine how many devices
// need to be allocated. If not all can be stored in the result, the
// claim cannot be allocated.
numDevicesTotal := 0
for claimIndex, claim := range alloc.claimsToAllocate {
numDevices := 0
numDevicesPerClaim := 0
// If we have any request that wants "all" devices, we need to
// figure out how much "all" is. If some pool is incomplete, we stop
@ -209,7 +208,12 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []
return nil, err
}
if selectable {
requestData.allDevices = append(requestData.allDevices, deviceWithID{device: slice.Spec.Devices[deviceIndex].Basic, DeviceID: DeviceID{Driver: slice.Spec.Driver, Pool: slice.Spec.Pool.Name, Device: slice.Spec.Devices[deviceIndex].Name}})
device := deviceWithID{
id: DeviceID{Driver: slice.Spec.Driver, Pool: slice.Spec.Pool.Name, Device: slice.Spec.Devices[deviceIndex].Name},
basic: slice.Spec.Devices[deviceIndex].Basic,
slice: slice,
}
requestData.allDevices = append(requestData.allDevices, device)
}
}
}
@ -220,39 +224,36 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []
return nil, fmt.Errorf("claim %s, request %s: unsupported count mode %s", klog.KObj(claim), request.Name, request.AllocationMode)
}
alloc.requestData[requestKey] = requestData
numDevices += requestData.numDevices
numDevicesPerClaim += requestData.numDevices
}
alloc.logger.V(6).Info("Checked claim", "claim", klog.KObj(claim), "numDevices", numDevices)
alloc.logger.V(6).Info("Checked claim", "claim", klog.KObj(claim), "numDevices", numDevicesPerClaim)
// Check that we don't end up with too many results.
if numDevices > resourceapi.AllocationResultsMaxSize {
return nil, fmt.Errorf("claim %s: number of requested devices %d exceeds the claim limit of %d", klog.KObj(claim), numDevices, resourceapi.AllocationResultsMaxSize)
if numDevicesPerClaim > resourceapi.AllocationResultsMaxSize {
return nil, fmt.Errorf("claim %s: number of requested devices %d exceeds the claim limit of %d", klog.KObj(claim), numDevicesPerClaim, resourceapi.AllocationResultsMaxSize)
}
// If we don't, then we can pre-allocate the result slices for
// appending the actual results later.
alloc.result[claimIndex] = &resourceapi.AllocationResult{
Devices: resourceapi.DeviceAllocationResult{
Results: make([]resourceapi.DeviceRequestAllocationResult, 0, numDevices),
},
}
alloc.result[claimIndex].devices = make([]internalDeviceResult, 0, numDevicesPerClaim)
// Constraints are assumed to be monotonic: once a constraint returns
// false, adding more devices will not cause it to return true. This
// allows the search to stop early once a constraint returns false.
var constraints = make([]constraint, len(claim.Spec.Devices.Constraints))
constraints := make([]constraint, len(claim.Spec.Devices.Constraints))
for i, constraint := range claim.Spec.Devices.Constraints {
switch {
case constraint.MatchAttribute != nil:
matchAttribute := draapi.FullyQualifiedName(*constraint.MatchAttribute)
logger := alloc.logger
if loggerV := alloc.logger.V(6); loggerV.Enabled() {
logger = klog.LoggerWithName(logger, "matchAttributeConstraint")
logger = klog.LoggerWithValues(logger, "matchAttribute", *constraint.MatchAttribute)
logger = klog.LoggerWithValues(logger, "matchAttribute", matchAttribute)
}
m := &matchAttributeConstraint{
logger: logger,
requestNames: sets.New(constraint.Requests...),
attributeName: *constraint.MatchAttribute,
attributeName: matchAttribute,
}
constraints[i] = m
default:
@ -261,6 +262,7 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []
}
}
alloc.constraints[claimIndex] = constraints
numDevicesTotal += numDevicesPerClaim
}
// Selecting a device for a request is independent of what has been
@ -270,34 +272,10 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []
// the Allocate call and can be compared in Go.
alloc.deviceMatchesRequest = make(map[matchKey]bool)
// Some of the existing devices are probably already allocated by
// claims...
claims, err := alloc.claimLister.ListAllAllocated()
numAllocated := 0
if err != nil {
return nil, fmt.Errorf("list allocated claims: %w", err)
}
for _, claim := range claims {
// Sanity check..
if claim.Status.Allocation == nil {
continue
}
for _, result := range claim.Status.Allocation.Devices.Results {
// Kubernetes 1.31 did not set this, 1.32 always does.
// Supporting 1.31 is not worth the additional code that
// would have to be written (= looking up in request) because
// it is extremely unlikely that there really is a result
// that still exists in a cluster from 1.31 where this matters.
if ptr.Deref(result.AdminAccess, false) {
// Ignore, it's not considered allocated.
continue
}
deviceID := DeviceID{Driver: result.Driver, Pool: result.Pool, Device: result.Device}
alloc.allocated[deviceID] = true
numAllocated++
}
}
alloc.logger.V(6).Info("Gathered information about allocated devices", "numAllocated", numAllocated)
// We can estimate the size based on what we need to allocate.
alloc.allocatingDevices = make(map[DeviceID]bool, numDevicesTotal)
alloc.logger.V(6).Info("Gathered information about devices", "numAllocated", len(alloc.allocatedDevices), "toBeAllocated", numDevicesTotal)
// In practice, there aren't going to be many different CEL
// expressions. Most likely, there is going to be a handful of different
@ -323,8 +301,20 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []
return nil, nil
}
for claimIndex, allocationResult := range alloc.result {
result := make([]resourceapi.AllocationResult, len(alloc.result))
for claimIndex, internalResult := range alloc.result {
claim := alloc.claimsToAllocate[claimIndex]
allocationResult := &result[claimIndex]
allocationResult.Devices.Results = make([]resourceapi.DeviceRequestAllocationResult, len(internalResult.devices))
for i, internal := range internalResult.devices {
allocationResult.Devices.Results[i] = resourceapi.DeviceRequestAllocationResult{
Request: internal.request,
Driver: internal.id.Driver.String(),
Pool: internal.id.Pool.String(),
Device: internal.id.Device.String(),
AdminAccess: internal.adminAccess,
}
}
// Populate configs.
for requestIndex := range claim.Spec.Devices.Requests {
@ -348,14 +338,14 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []
}
// Determine node selector.
nodeSelector, err := alloc.createNodeSelector(allocationResult)
nodeSelector, err := alloc.createNodeSelector(internalResult.devices)
if err != nil {
return nil, fmt.Errorf("create NodeSelector for claim %s: %w", claim.Name, err)
}
allocationResult.NodeSelector = nodeSelector
}
return alloc.result, nil
return result, nil
}
// errStop is a special error that gets returned by allocateOne if it detects
@ -372,8 +362,8 @@ type allocator struct {
deviceMatchesRequest map[matchKey]bool
constraints [][]constraint // one list of constraints per claim
requestData map[requestIndices]requestData // one entry per request
allocated map[DeviceID]bool
result []*resourceapi.AllocationResult
allocatingDevices map[DeviceID]bool
result []internalAllocationResult
}
// matchKey identifies a device/request pair.
@ -403,19 +393,31 @@ type requestData struct {
}
type deviceWithID struct {
DeviceID
device *resourceapi.BasicDevice
id DeviceID
basic *draapi.BasicDevice
slice *draapi.ResourceSlice
}
type internalAllocationResult struct {
devices []internalDeviceResult
}
type internalDeviceResult struct {
request string
id DeviceID
slice *draapi.ResourceSlice
adminAccess *bool
}
type constraint interface {
// add is called whenever a device is about to be allocated. It must
// check whether the device matches the constraint and if yes,
// track that it is allocated.
add(requestName string, device *resourceapi.BasicDevice, deviceID DeviceID) bool
add(requestName string, device *draapi.BasicDevice, deviceID DeviceID) bool
// For every successful add there is exactly one matching remove call
// with the exact same parameters.
remove(requestName string, device *resourceapi.BasicDevice, deviceID DeviceID)
remove(requestName string, device *draapi.BasicDevice, deviceID DeviceID)
}
// matchAttributeConstraint compares an attribute value across devices.
@ -428,13 +430,13 @@ type constraint interface {
type matchAttributeConstraint struct {
logger klog.Logger // Includes name and attribute name, so no need to repeat in log messages.
requestNames sets.Set[string]
attributeName resourceapi.FullyQualifiedName
attributeName draapi.FullyQualifiedName
attribute *resourceapi.DeviceAttribute
attribute *draapi.DeviceAttribute
numDevices int
}
func (m *matchAttributeConstraint) add(requestName string, device *resourceapi.BasicDevice, deviceID DeviceID) bool {
func (m *matchAttributeConstraint) add(requestName string, device *draapi.BasicDevice, deviceID DeviceID) bool {
if m.requestNames.Len() > 0 && !m.requestNames.Has(requestName) {
// Device not affected by constraint.
m.logger.V(7).Info("Constraint does not apply to request", "request", requestName)
@ -491,7 +493,7 @@ func (m *matchAttributeConstraint) add(requestName string, device *resourceapi.B
return true
}
func (m *matchAttributeConstraint) remove(requestName string, device *resourceapi.BasicDevice, deviceID DeviceID) {
func (m *matchAttributeConstraint) remove(requestName string, device *draapi.BasicDevice, deviceID DeviceID) {
if m.requestNames.Len() > 0 && !m.requestNames.Has(requestName) {
// Device not affected by constraint.
return
@ -501,9 +503,9 @@ func (m *matchAttributeConstraint) remove(requestName string, device *resourceap
m.logger.V(7).Info("Device removed from constraint set", "device", deviceID, "numDevices", m.numDevices)
}
func lookupAttribute(device *resourceapi.BasicDevice, deviceID DeviceID, attributeName resourceapi.FullyQualifiedName) *resourceapi.DeviceAttribute {
func lookupAttribute(device *draapi.BasicDevice, deviceID DeviceID, attributeName draapi.FullyQualifiedName) *draapi.DeviceAttribute {
// Fully-qualified match?
if attr, ok := device.Attributes[resourceapi.QualifiedName(attributeName)]; ok {
if attr, ok := device.Attributes[draapi.QualifiedName(attributeName)]; ok {
return &attr
}
index := strings.Index(string(attributeName), "/")
@ -512,14 +514,14 @@ func lookupAttribute(device *resourceapi.BasicDevice, deviceID DeviceID, attribu
return nil
}
if string(attributeName[0:index]) != deviceID.Driver {
if string(attributeName[0:index]) != deviceID.Driver.String() {
// Not an attribute of the driver and not found above,
// so it is not available.
return nil
}
// Domain matches the driver, so let's check just the ID.
if attr, ok := device.Attributes[resourceapi.QualifiedName(attributeName[index+1:])]; ok {
if attr, ok := device.Attributes[draapi.QualifiedName(attributeName[index+1:])]; ok {
return &attr
}
@ -559,7 +561,7 @@ func (alloc *allocator) allocateOne(r deviceIndices) (bool, error) {
// For "all" devices we already know which ones we need. We
// just need to check whether we can use them.
deviceWithID := requestData.allDevices[r.deviceIndex]
success, _, err := alloc.allocateDevice(r, deviceWithID.device, deviceWithID.DeviceID, true)
success, _, err := alloc.allocateDevice(r, deviceWithID, true)
if err != nil {
return false, err
}
@ -587,7 +589,7 @@ func (alloc *allocator) allocateOne(r deviceIndices) (bool, error) {
deviceID := DeviceID{Driver: pool.Driver, Pool: pool.Pool, Device: slice.Spec.Devices[deviceIndex].Name}
// Checking for "in use" is cheap and thus gets done first.
if !ptr.Deref(request.AdminAccess, false) && alloc.allocated[deviceID] {
if !ptr.Deref(request.AdminAccess, false) && (alloc.allocatedDevices.Has(deviceID) || alloc.allocatingDevices[deviceID]) {
alloc.logger.V(7).Info("Device in use", "device", deviceID)
continue
}
@ -610,7 +612,12 @@ func (alloc *allocator) allocateOne(r deviceIndices) (bool, error) {
}
// Finally treat as allocated and move on to the next device.
allocated, deallocate, err := alloc.allocateDevice(r, slice.Spec.Devices[deviceIndex].Basic, deviceID, false)
device := deviceWithID{
id: deviceID,
basic: slice.Spec.Devices[deviceIndex].Basic,
slice: slice,
}
allocated, deallocate, err := alloc.allocateDevice(r, device, false)
if err != nil {
return false, err
}
@ -640,7 +647,7 @@ func (alloc *allocator) allocateOne(r deviceIndices) (bool, error) {
}
// isSelectable checks whether a device satisfies the request and class selectors.
func (alloc *allocator) isSelectable(r requestIndices, slice *resourceapi.ResourceSlice, deviceIndex int) (bool, error) {
func (alloc *allocator) isSelectable(r requestIndices, slice *draapi.ResourceSlice, deviceIndex int) (bool, error) {
// This is the only supported device type at the moment.
device := slice.Spec.Devices[deviceIndex].Basic
if device == nil {
@ -682,9 +689,9 @@ func (alloc *allocator) isSelectable(r requestIndices, slice *resourceapi.Resour
}
func (alloc *allocator) selectorsMatch(r requestIndices, device *resourceapi.BasicDevice, deviceID DeviceID, class *resourceapi.DeviceClass, selectors []resourceapi.DeviceSelector) (bool, error) {
func (alloc *allocator) selectorsMatch(r requestIndices, device *draapi.BasicDevice, deviceID DeviceID, class *resourceapi.DeviceClass, selectors []resourceapi.DeviceSelector) (bool, error) {
for i, selector := range selectors {
expr := cel.GetCompiler().CompileCELExpression(selector.CEL.Expression, cel.Options{})
expr := alloc.celCache.GetOrCompile(selector.CEL.Expression)
if expr.Error != nil {
// Could happen if some future apiserver accepted some
// future expression and then got downgraded. Normally
@ -697,7 +704,13 @@ func (alloc *allocator) selectorsMatch(r requestIndices, device *resourceapi.Bas
return false, fmt.Errorf("claim %s: selector #%d: CEL compile error: %w", klog.KObj(alloc.claimsToAllocate[r.claimIndex]), i, expr.Error)
}
matches, details, err := expr.DeviceMatches(alloc.ctx, cel.Device{Driver: deviceID.Driver, Attributes: device.Attributes, Capacity: device.Capacity})
// If this conversion turns out to be expensive, the CEL package could be converted
// to use unique strings.
var d resourceapi.BasicDevice
if err := draapi.Convert_api_BasicDevice_To_v1alpha3_BasicDevice(device, &d, nil); err != nil {
return false, fmt.Errorf("convert BasicDevice: %w", err)
}
matches, details, err := expr.DeviceMatches(alloc.ctx, cel.Device{Driver: deviceID.Driver.String(), Attributes: d.Attributes, Capacity: d.Capacity})
if class != nil {
alloc.logger.V(7).Info("CEL result", "device", deviceID, "class", klog.KObj(class), "selector", i, "expression", selector.CEL.Expression, "matches", matches, "actualCost", ptr.Deref(details.ActualCost(), 0), "err", err)
} else {
@ -727,28 +740,28 @@ func (alloc *allocator) selectorsMatch(r requestIndices, device *resourceapi.Bas
// as if that candidate had been allocated. If allocation cannot continue later
// and must try something else, then the rollback function can be invoked to
// restore the previous state.
func (alloc *allocator) allocateDevice(r deviceIndices, device *resourceapi.BasicDevice, deviceID DeviceID, must bool) (bool, func(), error) {
func (alloc *allocator) allocateDevice(r deviceIndices, device deviceWithID, must bool) (bool, func(), error) {
claim := alloc.claimsToAllocate[r.claimIndex]
request := &claim.Spec.Devices.Requests[r.requestIndex]
adminAccess := ptr.Deref(request.AdminAccess, false)
if !adminAccess && alloc.allocated[deviceID] {
alloc.logger.V(7).Info("Device in use", "device", deviceID)
if !adminAccess && (alloc.allocatedDevices.Has(device.id) || alloc.allocatingDevices[device.id]) {
alloc.logger.V(7).Info("Device in use", "device", device.id)
return false, nil, nil
}
// It's available. Now check constraints.
for i, constraint := range alloc.constraints[r.claimIndex] {
added := constraint.add(request.Name, device, deviceID)
added := constraint.add(request.Name, device.basic, device.id)
if !added {
if must {
// It does not make sense to declare a claim where a constraint prevents getting
// all devices. Treat this as an error.
return false, nil, fmt.Errorf("claim %s, request %s: cannot add device %s because a claim constraint would not be satisfied", klog.KObj(claim), request.Name, deviceID)
return false, nil, fmt.Errorf("claim %s, request %s: cannot add device %s because a claim constraint would not be satisfied", klog.KObj(claim), request.Name, device.id)
}
// Roll back for all previous constraints before we return.
for e := 0; e < i; e++ {
alloc.constraints[r.claimIndex][e].remove(request.Name, device, deviceID)
alloc.constraints[r.claimIndex][e].remove(request.Name, device.basic, device.id)
}
return false, nil, nil
}
@ -756,50 +769,46 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device *resourceapi.Basi
// All constraints satisfied. Mark as in use (unless we do admin access)
// and record the result.
alloc.logger.V(7).Info("Device allocated", "device", deviceID)
alloc.logger.V(7).Info("Device allocated", "device", device.id)
if !adminAccess {
alloc.allocated[deviceID] = true
alloc.allocatingDevices[device.id] = true
}
result := resourceapi.DeviceRequestAllocationResult{
Request: request.Name,
Driver: deviceID.Driver,
Pool: deviceID.Pool,
Device: deviceID.Device,
result := internalDeviceResult{
request: request.Name,
id: device.id,
slice: device.slice,
}
if adminAccess {
result.AdminAccess = &adminAccess
result.adminAccess = &adminAccess
}
previousNumResults := len(alloc.result[r.claimIndex].Devices.Results)
alloc.result[r.claimIndex].Devices.Results = append(alloc.result[r.claimIndex].Devices.Results, result)
previousNumResults := len(alloc.result[r.claimIndex].devices)
alloc.result[r.claimIndex].devices = append(alloc.result[r.claimIndex].devices, result)
return true, func() {
for _, constraint := range alloc.constraints[r.claimIndex] {
constraint.remove(request.Name, device, deviceID)
constraint.remove(request.Name, device.basic, device.id)
}
if !adminAccess {
alloc.allocated[deviceID] = false
alloc.allocatingDevices[device.id] = false
}
// Truncate, but keep the underlying slice.
alloc.result[r.claimIndex].Devices.Results = alloc.result[r.claimIndex].Devices.Results[:previousNumResults]
alloc.logger.V(7).Info("Device deallocated", "device", deviceID)
alloc.result[r.claimIndex].devices = alloc.result[r.claimIndex].devices[:previousNumResults]
alloc.logger.V(7).Info("Device deallocated", "device", device.id)
}, nil
}
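// Illustrative sketch, not part of this change: how a caller is expected to use
// the (success, rollback, err) triple returned by allocateDevice while
// backtracking over candidate devices. allocateRemaining is a hypothetical
// placeholder for the recursive step that tries to satisfy the remaining requests.
//
//	success, rollback, err := alloc.allocateDevice(r, device, false)
//	if err != nil {
//		return false, err
//	}
//	if !success {
//		continue // candidate device not usable, try the next one
//	}
//	done, err := allocateRemaining()
//	if err != nil || done {
//		return done, err
//	}
//	rollback() // restore constraints, the in-flight device set, and the result slice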
// createNodeSelector constructs a node selector for the allocation, if needed,
// otherwise it returns nil.
func (alloc *allocator) createNodeSelector(allocation *resourceapi.AllocationResult) (*v1.NodeSelector, error) {
func (alloc *allocator) createNodeSelector(result []internalDeviceResult) (*v1.NodeSelector, error) {
// Selector with one term. That term gets extended with additional
// requirements from the different devices.
nodeSelector := &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{{}},
}
for _, deviceAllocation := range allocation.Devices.Results {
slice := alloc.findSlice(deviceAllocation)
if slice == nil {
return nil, fmt.Errorf("internal error: device %+v not found in pools", deviceAllocation)
}
if slice.Spec.NodeName != "" {
for i := range result {
slice := result[i].slice
if slice.Spec.NodeName != draapi.NullUniqueString {
// At least one device is local to one node. This
// restricts the allocation to that node.
return &v1.NodeSelector{
@@ -807,7 +816,7 @@ func (alloc *allocator) createNodeSelector(allocation *resourceapi.AllocationRes
MatchFields: []v1.NodeSelectorRequirement{{
Key: "metadata.name",
Operator: v1.NodeSelectorOpIn,
Values: []string{slice.Spec.NodeName},
Values: []string{slice.Spec.NodeName.String()},
}},
}},
}, nil
@@ -836,23 +845,6 @@ func (alloc *allocator) createNodeSelector(allocation *resourceapi.AllocationRes
return nil, nil
}
func (alloc *allocator) findSlice(deviceAllocation resourceapi.DeviceRequestAllocationResult) *resourceapi.ResourceSlice {
for _, pool := range alloc.pools {
if pool.Driver != deviceAllocation.Driver ||
pool.Pool != deviceAllocation.Pool {
continue
}
for _, slice := range pool.Slices {
for _, device := range slice.Spec.Devices {
if device.Name == deviceAllocation.Device {
return slice
}
}
}
}
return nil
}
func addNewNodeSelectorRequirements(from []v1.NodeSelectorRequirement, to *[]v1.NodeSelectorRequirement) {
for _, requirement := range from {
if !containsNodeSelectorRequirement(*to, requirement) {

View File

@@ -20,6 +20,7 @@ import (
"errors"
"flag"
"fmt"
"slices"
"testing"
"github.com/onsi/gomega"
@@ -35,6 +36,8 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/dynamic-resource-allocation/cel"
"k8s.io/klog/v2/ktesting"
"k8s.io/utils/ptr"
)
@@ -182,17 +185,6 @@ func claimWithDeviceConfig(name, request, class, driver, attribute string) *reso
return claim
}
// generate allocated ResourceClaim object
func allocatedClaim(name, request, class string, results ...resourceapi.DeviceRequestAllocationResult) *resourceapi.ResourceClaim {
claim := claim(name, request, class)
claim.Status.Allocation = &resourceapi.AllocationResult{
Devices: resourceapi.DeviceAllocationResult{
Results: results,
},
}
return claim
}
// generate a Device object with the given name, capacity and attributes.
func device(name string, capacity map[resourceapi.QualifiedName]resource.Quantity, attributes map[resourceapi.QualifiedName]resourceapi.DeviceAttribute) resourceapi.Device {
return resourceapi.Device{
@@ -276,13 +268,13 @@ func localNodeSelector(nodeName string) *v1.NodeSelector {
// allocationResult returns a matcher for one AllocationResult value with a list of
// embedded device allocation results. The order of those results doesn't matter.
func allocationResult(selector *v1.NodeSelector, results ...resourceapi.DeviceRequestAllocationResult) types.GomegaMatcher {
return gstruct.PointTo(gstruct.MatchFields(0, gstruct.Fields{
return gstruct.MatchFields(0, gstruct.Fields{
"Devices": gstruct.MatchFields(0, gstruct.Fields{
"Results": gomega.ConsistOf(results), // Order is irrelevant.
"Config": gomega.BeNil(),
}),
"NodeSelector": matchNodeSelector(selector),
}))
})
}
// matchNodeSelector returns a matcher for a node selector. The order
@@ -325,8 +317,8 @@ func matchNodeSelectorRequirement(requirement v1.NodeSelectorRequirement) types.
})
}
func allocationResultWithConfig(selector *v1.NodeSelector, driver string, source resourceapi.AllocationConfigSource, attribute string, results ...resourceapi.DeviceRequestAllocationResult) *resourceapi.AllocationResult {
return &resourceapi.AllocationResult{
func allocationResultWithConfig(selector *v1.NodeSelector, driver string, source resourceapi.AllocationConfigSource, attribute string, results ...resourceapi.DeviceRequestAllocationResult) resourceapi.AllocationResult {
return resourceapi.AllocationResult{
Devices: resourceapi.DeviceAllocationResult{
Results: results,
Config: []resourceapi.DeviceAllocationConfiguration{
@@ -353,16 +345,16 @@ func sliceWithOneDevice(name string, nodeSelection any, pool, driver string) *re
}
func TestAllocator(t *testing.T) {
nonExistentAttribute := resourceapi.FullyQualifiedName("NonExistentAttribute")
boolAttribute := resourceapi.FullyQualifiedName("boolAttribute")
stringAttribute := resourceapi.FullyQualifiedName("stringAttribute")
versionAttribute := resourceapi.FullyQualifiedName("driverVersion")
intAttribute := resourceapi.FullyQualifiedName("numa")
nonExistentAttribute := resourceapi.FullyQualifiedName(driverA + "/" + "NonExistentAttribute")
boolAttribute := resourceapi.FullyQualifiedName(driverA + "/" + "boolAttribute")
stringAttribute := resourceapi.FullyQualifiedName(driverA + "/" + "stringAttribute")
versionAttribute := resourceapi.FullyQualifiedName(driverA + "/" + "driverVersion")
intAttribute := resourceapi.FullyQualifiedName(driverA + "/" + "numa")
testcases := map[string]struct {
adminAccess bool
claimsToAllocate []*resourceapi.ResourceClaim
allocatedClaims []*resourceapi.ResourceClaim
allocatedDevices []DeviceID
classes []*resourceapi.DeviceClass
slices []*resourceapi.ResourceSlice
node *v1.Node
@@ -943,12 +935,10 @@ func TestAllocator(t *testing.T) {
},
"already-allocated-devices": {
claimsToAllocate: objects(claim(claim0, req0, classA)),
allocatedClaims: objects(
allocatedClaim(claim0, req0, classA,
deviceAllocationResult(req0, driverA, pool1, device1, false),
deviceAllocationResult(req1, driverA, pool1, device2, false),
),
),
allocatedDevices: []DeviceID{
MakeDeviceID(driverA, pool1, device1),
MakeDeviceID(driverA, pool1, device2),
},
classes: objects(class(classA, driverA)),
slices: objects(sliceWithOneDevice(slice1, node1, pool1, driverA)),
node: node(node1, region1),
@@ -976,12 +966,10 @@ func TestAllocator(t *testing.T) {
c.Spec.Devices.Requests[0].AdminAccess = ptr.To(true)
return []*resourceapi.ResourceClaim{c}
}(),
allocatedClaims: objects(
allocatedClaim(claim0, req0, classA,
deviceAllocationResult(req0, driverA, pool1, device1, false),
deviceAllocationResult(req1, driverA, pool1, device2, false),
),
),
allocatedDevices: []DeviceID{
MakeDeviceID(driverA, pool1, device1),
MakeDeviceID(driverA, pool1, device2),
},
classes: objects(class(classA, driverA)),
slices: objects(sliceWithOneDevice(slice1, node1, pool1, driverA)),
node: node(node1, region1),
@@ -990,22 +978,6 @@ func TestAllocator(t *testing.T) {
allocationResult(localNodeSelector(node1), deviceAllocationResult(req0, driverA, pool1, device1, true)),
},
},
"already-allocated-for-admin-access": {
claimsToAllocate: objects(claim(claim0, req0, classA)),
allocatedClaims: objects(
allocatedClaim(claim0, req0, classA,
deviceAllocationResult(req0, driverA, pool1, device1, true),
deviceAllocationResult(req1, driverA, pool1, device2, true),
),
),
classes: objects(class(classA, driverA)),
slices: objects(sliceWithOneDevice(slice1, node1, pool1, driverA)),
node: node(node1, region1),
expectResults: []any{
allocationResult(localNodeSelector(node1), deviceAllocationResult(req0, driverA, pool1, device1, false)),
},
},
"with-constraint": {
claimsToAllocate: objects(claimWithRequests(
claim0,
@@ -1397,23 +1369,15 @@ func TestAllocator(t *testing.T) {
// Listing objects is deterministic and returns them in the same
// order as in the test case. That makes the allocation result
// also deterministic.
var allocated, toAllocate claimLister
var classLister informerLister[resourceapi.DeviceClass]
var sliceLister informerLister[resourceapi.ResourceSlice]
for _, claim := range tc.claimsToAllocate {
toAllocate.claims = append(toAllocate.claims, claim.DeepCopy())
}
for _, claim := range tc.allocatedClaims {
allocated.claims = append(allocated.claims, claim.DeepCopy())
}
for _, slice := range tc.slices {
sliceLister.objs = append(sliceLister.objs, slice.DeepCopy())
}
for _, class := range tc.classes {
classLister.objs = append(classLister.objs, class.DeepCopy())
}
claimsToAllocate := slices.Clone(tc.claimsToAllocate)
allocatedDevices := slices.Clone(tc.allocatedDevices)
slices := slices.Clone(tc.slices)
allocator, err := NewAllocator(ctx, tc.adminAccess, toAllocate.claims, allocated, classLister, sliceLister)
allocator, err := NewAllocator(ctx, tc.adminAccess, claimsToAllocate, sets.New(allocatedDevices...), classLister, slices, cel.NewCache(1))
g.Expect(err).ToNot(gomega.HaveOccurred())
results, err := allocator.Allocate(ctx, tc.node)
@@ -1425,23 +1389,14 @@ func TestAllocator(t *testing.T) {
g.Expect(results).To(gomega.ConsistOf(tc.expectResults...))
// Objects that the allocator had access to should not have been modified.
g.Expect(toAllocate.claims).To(gomega.HaveExactElements(tc.claimsToAllocate))
g.Expect(allocated.claims).To(gomega.HaveExactElements(tc.allocatedClaims))
g.Expect(sliceLister.objs).To(gomega.ConsistOf(tc.slices))
g.Expect(claimsToAllocate).To(gomega.HaveExactElements(tc.claimsToAllocate))
g.Expect(allocatedDevices).To(gomega.HaveExactElements(tc.allocatedDevices))
g.Expect(slices).To(gomega.ConsistOf(tc.slices))
g.Expect(classLister.objs).To(gomega.ConsistOf(tc.classes))
})
}
}
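// Illustrative sketch, not part of this change: the allocator entry points as
// the updated test exercises them. The claims to allocate, the set of already
// allocated device IDs, the listed ResourceSlices, and a shared CEL cache are
// all passed up front; adminAccessEnabled and node are assumed to exist.
//
//	allocator, err := NewAllocator(ctx, adminAccessEnabled, claimsToAllocate,
//		sets.New(allocatedDeviceIDs...), classLister, slices, cel.NewCache(1))
//	if err != nil {
//		return err
//	}
//	results, err := allocator.Allocate(ctx, node)
//	// results holds one resourceapi.AllocationResult value per allocated claim.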
type claimLister struct {
claims []*resourceapi.ResourceClaim
err error
}
func (l claimLister) ListAllAllocated() ([]*resourceapi.ResourceClaim, error) {
return l.claims, l.err
}
type informerLister[T any] struct {
objs []*T
err error

View File

@@ -22,10 +22,9 @@ import (
v1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1alpha3"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
resourcelisters "k8s.io/client-go/listers/resource/v1alpha3"
"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
draapi "k8s.io/dynamic-resource-allocation/api"
)
// GatherPools collects information about all resource pools which provide
@@ -34,31 +33,35 @@ import (
// Out-dated slices are silently ignored. Pools may be incomplete (not all
// required slices available) or invalid (for example, device names not unique).
// Both are recorded in the result.
func GatherPools(ctx context.Context, sliceLister resourcelisters.ResourceSliceLister, node *v1.Node) ([]*Pool, error) {
func GatherPools(ctx context.Context, slices []*resourceapi.ResourceSlice, node *v1.Node) ([]*Pool, error) {
pools := make(map[PoolID]*Pool)
// TODO (future): use a custom lister interface and implement it with
// an indexer on the node name field. Then here we can ask for only
// slices with the right node name and those with no node name.
slices, err := sliceLister.List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("list resource slices: %w", err)
nodeName := ""
if node != nil {
nodeName = node.Name
}
for _, slice := range slices {
switch {
case slice.Spec.NodeName != "":
if slice.Spec.NodeName == node.Name {
addSlice(pools, slice)
if slice.Spec.NodeName == nodeName {
if err := addSlice(pools, slice); err != nil {
return nil, fmt.Errorf("add node slice %s: %w", slice.Name, err)
}
}
case slice.Spec.AllNodes:
addSlice(pools, slice)
if err := addSlice(pools, slice); err != nil {
return nil, fmt.Errorf("add cluster slice %s: %w", slice.Name, err)
}
case slice.Spec.NodeSelector != nil:
// TODO: move conversion into api.
selector, err := nodeaffinity.NewNodeSelector(slice.Spec.NodeSelector)
if err != nil {
return nil, fmt.Errorf("node selector in resource slice %s: %w", slice.Name, err)
}
if selector.Match(node) {
addSlice(pools, slice)
if err := addSlice(pools, slice); err != nil {
return nil, fmt.Errorf("add matching slice %s: %w", slice.Name, err)
}
}
default:
// Nothing known was set. This must be some future, unknown extension,
@@ -84,22 +87,27 @@ func GatherPools(ctx context.Context, sliceLister resourcelisters.ResourceSliceL
return result, nil
}
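// Illustrative sketch, not part of this change: with the new signature the
// caller lists the ResourceSlices once (for example via an informer lister)
// and hands the result to GatherPools instead of passing the lister itself.
//
//	slices, err := sliceLister.List(labels.Everything())
//	if err != nil {
//		return nil, fmt.Errorf("list resource slices: %w", err)
//	}
//	pools, err := GatherPools(ctx, slices, node)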
func addSlice(pools map[PoolID]*Pool, slice *resourceapi.ResourceSlice) {
func addSlice(pools map[PoolID]*Pool, s *resourceapi.ResourceSlice) error {
var slice draapi.ResourceSlice
if err := draapi.Convert_v1alpha3_ResourceSlice_To_api_ResourceSlice(s, &slice, nil); err != nil {
return fmt.Errorf("convert ResourceSlice: %w", err)
}
id := PoolID{Driver: slice.Spec.Driver, Pool: slice.Spec.Pool.Name}
pool := pools[id]
if pool == nil {
// New pool.
pool = &Pool{
PoolID: id,
Slices: []*resourceapi.ResourceSlice{slice},
Slices: []*draapi.ResourceSlice{&slice},
}
pools[id] = pool
return
return nil
}
if slice.Spec.Pool.Generation < pool.Slices[0].Spec.Pool.Generation {
// Out-dated.
return
return nil
}
if slice.Spec.Pool.Generation > pool.Slices[0].Spec.Pool.Generation {
@@ -108,11 +116,12 @@ func addSlice(pools map[PoolID]*Pool, slice *resourceapi.ResourceSlice) {
}
// Add to pool.
pool.Slices = append(pool.Slices, slice)
pool.Slices = append(pool.Slices, &slice)
return nil
}
func poolIsInvalid(pool *Pool) (bool, string) {
devices := sets.New[string]()
devices := sets.New[draapi.UniqueString]()
for _, slice := range pool.Slices {
for _, device := range slice.Spec.Devices {
if devices.Has(device.Name) {
@@ -129,21 +138,29 @@ type Pool struct {
IsIncomplete bool
IsInvalid bool
InvalidReason string
Slices []*resourceapi.ResourceSlice
Slices []*draapi.ResourceSlice
}
type PoolID struct {
Driver, Pool string
Driver, Pool draapi.UniqueString
}
func (p PoolID) String() string {
return p.Driver + "/" + p.Pool
return p.Driver.String() + "/" + p.Pool.String()
}
type DeviceID struct {
Driver, Pool, Device string
Driver, Pool, Device draapi.UniqueString
}
func (d DeviceID) String() string {
return d.Driver + "/" + d.Pool + "/" + d.Device
return d.Driver.String() + "/" + d.Pool.String() + "/" + d.Device.String()
}
func MakeDeviceID(driver, pool, device string) DeviceID {
return DeviceID{
Driver: draapi.MakeUniqueString(driver),
Pool: draapi.MakeUniqueString(pool),
Device: draapi.MakeUniqueString(device),
}
}
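// Illustrative sketch, not part of this change: DeviceID is a small comparable
// struct, so it can be used directly as a set or map key; driverA, pool1 and
// device1 stand in for real identifiers.
//
//	id := MakeDeviceID(driverA, pool1, device1)
//	allocated := sets.New[DeviceID]()
//	allocated.Insert(id)
//	if allocated.Has(id) {
//		fmt.Println("already allocated:", id)
//	}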

View File

@@ -30,9 +30,11 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/util/workqueue"
"k8s.io/dynamic-resource-allocation/cel"
"k8s.io/dynamic-resource-allocation/structured"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
@@ -277,7 +279,6 @@ func (op *allocResourceClaimsOp) run(tCtx ktesting.TContext) {
sliceLister := informerFactory.Resource().V1alpha3().ResourceSlices().Lister()
nodeLister := informerFactory.Core().V1().Nodes().Lister()
claimCache := assumecache.NewAssumeCache(tCtx.Logger(), claimInformer, "ResourceClaim", "", nil)
claimLister := claimLister{cache: claimCache}
informerFactory.Start(tCtx.Done())
defer func() {
tCtx.Cancel("allocResourceClaimsOp.run is shutting down")
@@ -291,10 +292,13 @@ func (op *allocResourceClaimsOp) run(tCtx ktesting.TContext) {
reflect.TypeOf(&v1.Node{}): true,
}
require.Equal(tCtx, expectSyncedInformers, syncedInformers, "synced informers")
celCache := cel.NewCache(10)
// The set of nodes is assumed to be fixed at this point.
nodes, err := nodeLister.List(labels.Everything())
tCtx.ExpectNoError(err, "list nodes")
slices, err := sliceLister.List(labels.Everything())
tCtx.ExpectNoError(err, "list slices")
// Allocate one claim at a time, picking nodes randomly. Each
// allocation is stored immediately, using the claim cache to avoid
@@ -306,7 +310,19 @@ claims:
continue
}
allocator, err := structured.NewAllocator(tCtx, utilfeature.DefaultFeatureGate.Enabled(features.DRAAdminAccess), []*resourceapi.ResourceClaim{claim}, claimLister, classLister, sliceLister)
objs := claimCache.List(nil)
allocatedDevices := sets.New[structured.DeviceID]()
for _, obj := range objs {
claim := obj.(*resourceapi.ResourceClaim)
if claim.Status.Allocation == nil {
continue
}
for _, result := range claim.Status.Allocation.Devices.Results {
allocatedDevices.Insert(structured.MakeDeviceID(result.Driver, result.Pool, result.Device))
}
}
allocator, err := structured.NewAllocator(tCtx, utilfeature.DefaultFeatureGate.Enabled(features.DRAAdminAccess), []*resourceapi.ResourceClaim{claim}, allocatedDevices, classLister, slices, celCache)
tCtx.ExpectNoError(err, "create allocator")
rand.Shuffle(len(nodes), func(i, j int) {
@@ -317,7 +333,7 @@ claims:
tCtx.ExpectNoError(err, "allocate claim")
if result != nil {
claim = claim.DeepCopy()
claim.Status.Allocation = result[0]
claim.Status.Allocation = &result[0]
claim, err := tCtx.Client().ResourceV1alpha3().ResourceClaims(claim.Namespace).UpdateStatus(tCtx, claim, metav1.UpdateOptions{})
tCtx.ExpectNoError(err, "update claim status with allocation")
tCtx.ExpectNoError(claimCache.Assume(claim), "assume claim")
@@ -327,19 +343,3 @@ claims:
tCtx.Fatalf("Could not allocate claim %d out of %d", i, len(claims.Items))
}
}
type claimLister struct {
cache *assumecache.AssumeCache
}
func (c claimLister) ListAllAllocated() ([]*resourceapi.ResourceClaim, error) {
objs := c.cache.List(nil)
allocatedClaims := make([]*resourceapi.ResourceClaim, 0, len(objs))
for _, obj := range objs {
claim := obj.(*resourceapi.ResourceClaim)
if claim.Status.Allocation != nil {
allocatedClaims = append(allocatedClaims, claim)
}
}
return allocatedClaims, nil
}