diff --git a/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go b/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go
new file mode 100644
index 00000000000..b1629c0db6c
--- /dev/null
+++ b/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go
@@ -0,0 +1,158 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package dynamicresources
+
+import (
+	"sync"
+
+	resourceapi "k8s.io/api/resource/v1alpha3"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/dynamic-resource-allocation/structured"
+	"k8s.io/klog/v2"
+	schedutil "k8s.io/kubernetes/pkg/scheduler/util"
+	"k8s.io/utils/ptr"
+)
+
+// allocatedDevices reacts to events in a cache and maintains a set of all allocated devices.
+// This is cheaper than repeatedly calling List, making strings unique, and building the set
+// each time PreFilter is called.
+//
+// All methods are thread-safe. Get returns a cloned set.
+type allocatedDevices struct {
+	logger klog.Logger
+
+	mutex sync.RWMutex
+	ids   sets.Set[structured.DeviceID]
+}
+
+func newAllocatedDevices(logger klog.Logger) *allocatedDevices {
+	return &allocatedDevices{
+		logger: logger,
+		ids:    sets.New[structured.DeviceID](),
+	}
+}
+
+func (a *allocatedDevices) Get() sets.Set[structured.DeviceID] {
+	a.mutex.RLock()
+	defer a.mutex.RUnlock()
+
+	return a.ids.Clone()
+}
+
+func (a *allocatedDevices) handlers() cache.ResourceEventHandler {
+	return cache.ResourceEventHandlerFuncs{
+		AddFunc:    a.onAdd,
+		UpdateFunc: a.onUpdate,
+		DeleteFunc: a.onDelete,
+	}
+}
+
+func (a *allocatedDevices) onAdd(obj any) {
+	claim, _, err := schedutil.As[*resourceapi.ResourceClaim](obj, nil)
+	if err != nil {
+		// Shouldn't happen.
+		a.logger.Error(err, "unexpected object in allocatedDevices.onAdd")
+		return
+	}
+
+	if claim.Status.Allocation != nil {
+		a.addDevices(claim)
+	}
+}
+
+func (a *allocatedDevices) onUpdate(oldObj, newObj any) {
+	originalClaim, modifiedClaim, err := schedutil.As[*resourceapi.ResourceClaim](oldObj, newObj)
+	if err != nil {
+		// Shouldn't happen.
+		a.logger.Error(err, "unexpected object in allocatedDevices.onUpdate")
+		return
+	}
+
+	switch {
+	case originalClaim.Status.Allocation == nil && modifiedClaim.Status.Allocation != nil:
+		a.addDevices(modifiedClaim)
+	case originalClaim.Status.Allocation != nil && modifiedClaim.Status.Allocation == nil:
+		a.removeDevices(originalClaim)
+	default:
+		// Nothing to do. Either both nil or both non-nil, in which case the content
+		// also must be the same (immutable!).
+	}
+}
+
+func (a *allocatedDevices) onDelete(obj any) {
+	claim, _, err := schedutil.As[*resourceapi.ResourceClaim](obj, nil)
+	if err != nil {
+		// Shouldn't happen.
+		a.logger.Error(err, "unexpected object in allocatedDevices.onDelete")
+		return
+	}
+
+	a.removeDevices(claim)
+}
+
+func (a *allocatedDevices) addDevices(claim *resourceapi.ResourceClaim) {
+	if claim.Status.Allocation == nil {
+		return
+	}
+	// Locking of the mutex gets minimized by pre-computing what needs to be done
+	// without holding the lock.
+	deviceIDs := make([]structured.DeviceID, 0, 20)
+
+	for _, result := range claim.Status.Allocation.Devices.Results {
+		if ptr.Deref(result.AdminAccess, false) {
+			// Is not considered as allocated.
+			continue
+		}
+		deviceID := structured.MakeDeviceID(result.Driver, result.Pool, result.Device)
+		a.logger.V(6).Info("Device was allocated", "device", deviceID, "claim", klog.KObj(claim))
+		deviceIDs = append(deviceIDs, deviceID)
+	}
+
+	a.mutex.Lock()
+	defer a.mutex.Unlock()
+	for _, deviceID := range deviceIDs {
+		a.ids.Insert(deviceID)
+	}
+}
+
+func (a *allocatedDevices) removeDevices(claim *resourceapi.ResourceClaim) {
+	if claim.Status.Allocation == nil {
+		return
+	}
+
+	// Locking of the mutex gets minimized by pre-computing what needs to be done
+	// without holding the lock.
+	deviceIDs := make([]structured.DeviceID, 0, 20)
+
+	for _, result := range claim.Status.Allocation.Devices.Results {
+		if ptr.Deref(result.AdminAccess, false) {
+			// Is not considered as allocated and thus does not need to be removed
+			// because of this claim.
+			continue
+		}
+		deviceID := structured.MakeDeviceID(result.Driver, result.Pool, result.Device)
+		a.logger.V(6).Info("Device was deallocated", "device", deviceID, "claim", klog.KObj(claim))
+		deviceIDs = append(deviceIDs, deviceID)
+	}
+
+	a.mutex.Lock()
+	defer a.mutex.Unlock()
+	for _, deviceID := range deviceIDs {
+		a.ids.Delete(deviceID)
+	}
+}
diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
index bac18e416d4..98a6ad75200 100644
--- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
+++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
@@ -108,11 +108,12 @@ type DynamicResources struct {
 	enableAdminAccess         bool
 	enableSchedulingQueueHint bool
 
-	fh          framework.Handle
-	clientset   kubernetes.Interface
-	classLister resourcelisters.DeviceClassLister
-	sliceLister resourcelisters.ResourceSliceLister
-	celCache    *structured.CELCache
+	fh               framework.Handle
+	clientset        kubernetes.Interface
+	classLister      resourcelisters.DeviceClassLister
+	sliceLister      resourcelisters.ResourceSliceLister
+	celCache         *structured.CELCache
+	allocatedDevices *allocatedDevices
 
 	// claimAssumeCache enables temporarily storing a newer claim object
 	// while the scheduler has allocated it and the corresponding object
@@ -177,6 +178,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
 		return &DynamicResources{}, nil
 	}
 
+	logger := klog.FromContext(ctx)
 	pl := &DynamicResources{
 		enabled:           true,
 		enableAdminAccess: fts.EnableDRAAdminAccess,
@@ -192,8 +194,14 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
 		// recent 10 of them get reused across different scheduling
 		// cycles.
 		celCache: structured.NewCELCache(10),
+
+		allocatedDevices: newAllocatedDevices(logger),
 	}
 
+	// Reacting to events is more efficient than iterating over the list
+	// repeatedly in PreFilter.
+	pl.claimAssumeCache.AddEventHandler(pl.allocatedDevices.handlers())
+
 	return pl, nil
 }
 
@@ -538,10 +546,7 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
 	// Claims are treated as "allocated" if they are in the assume cache
 	// or currently their allocation is in-flight. This does not change
 	// during filtering, so we can determine that once.
-	allocatedDevices := pl.listAllAllocatedDevices()
-	if err != nil {
-		return nil, statusError(logger, err)
-	}
+	allocatedDevices := pl.listAllAllocatedDevices(logger)
 	slices, err := pl.sliceLister.List(labels.Everything())
 	if err != nil {
 		return nil, statusError(logger, err)
 	}
@@ -558,18 +563,14 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
 	return nil, nil
 }
 
-func (pl *DynamicResources) listAllAllocatedDevices() []structured.DeviceID {
-	// Probably not worth adding an index for?
-	objs := pl.claimAssumeCache.List(nil)
-	var allocated []structured.DeviceID
-	for _, obj := range objs {
-		claim := obj.(*resourceapi.ResourceClaim)
-		if obj, ok := pl.inFlightAllocations.Load(claim.UID); ok {
-			claim = obj.(*resourceapi.ResourceClaim)
-		}
-		if claim.Status.Allocation == nil {
-			continue
-		}
+func (pl *DynamicResources) listAllAllocatedDevices(logger klog.Logger) sets.Set[structured.DeviceID] {
+	// Start with a fresh set that matches the current known state of the
+	// world according to the informers.
+	allocated := pl.allocatedDevices.Get()
+
+	// Whatever is in flight also has to be checked.
+	pl.inFlightAllocations.Range(func(key, value any) bool {
+		claim := value.(*resourceapi.ResourceClaim)
 		for _, result := range claim.Status.Allocation.Devices.Results {
 			// Kubernetes 1.31 did not set this, 1.32 always does.
 			// Supporting 1.31 is not worth the additional code that
@@ -581,9 +582,11 @@ func (pl *DynamicResources) listAllAllocatedDevices() []structured.DeviceID {
 				continue
 			}
 			deviceID := structured.MakeDeviceID(result.Driver, result.Pool, result.Device)
-			allocated = append(allocated, deviceID)
+			logger.V(6).Info("Device is in flight for allocation", "device", deviceID, "claim", klog.KObj(claim))
+			allocated.Insert(deviceID)
 		}
-	}
+		return true
+	})
 
 	return allocated
 }
diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go
index 51f950fb132..b79e7950e5a 100644
--- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go
+++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go
@@ -58,7 +58,7 @@ type Allocator struct {
 func NewAllocator(ctx context.Context,
 	adminAccessEnabled bool,
 	claimsToAllocate []*resourceapi.ResourceClaim,
-	allocatedDevices []DeviceID,
+	allocatedDevices sets.Set[DeviceID],
 	classLister resourcelisters.DeviceClassLister,
 	slices []*resourceapi.ResourceSlice,
 	celCache *CELCache,
@@ -66,12 +66,11 @@ func NewAllocator(ctx context.Context,
 	return &Allocator{
 		adminAccessEnabled: adminAccessEnabled,
 		claimsToAllocate:   claimsToAllocate,
-		// This won't change, so build this set only once.
-		allocatedDevices: sets.New(allocatedDevices...),
-		classLister:      classLister,
-		slices:           slices,
-		celCache:         celCache,
-		celMutex:         keymutex.NewHashed(0),
+		allocatedDevices:   allocatedDevices,
+		classLister:        classLister,
+		slices:             slices,
+		celCache:           celCache,
+		celMutex:           keymutex.NewHashed(0),
 	}, nil
 }
diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go
index c9d87325bdb..e647660a3fe 100644
--- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go
+++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go
@@ -36,6 +36,7 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/klog/v2/ktesting"
 	"k8s.io/utils/ptr"
 )
@@ -1375,7 +1376,7 @@ func TestAllocator(t *testing.T) {
 			allocatedDevices := slices.Clone(tc.allocatedDevices)
 			slices := slices.Clone(tc.slices)
 
-			allocator, err := NewAllocator(ctx, tc.adminAccess, claimsToAllocate, allocatedDevices, classLister, slices, NewCELCache(1))
+			allocator, err := NewAllocator(ctx, tc.adminAccess, claimsToAllocate, sets.New(allocatedDevices...), classLister, slices, NewCELCache(1))
 			g.Expect(err).ToNot(gomega.HaveOccurred())
 
 			results, err := allocator.Allocate(ctx, tc.node)
diff --git a/test/integration/scheduler_perf/dra.go b/test/integration/scheduler_perf/dra.go
index ce161911eab..143f269da9e 100644
--- a/test/integration/scheduler_perf/dra.go
+++ b/test/integration/scheduler_perf/dra.go
@@ -30,6 +30,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/util/sets"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/util/workqueue"
@@ -309,14 +310,14 @@ claims:
 	}
 
 	objs := claimCache.List(nil)
-	allocatedDevices := make([]structured.DeviceID, 0, len(objs))
+	allocatedDevices := sets.New[structured.DeviceID]()
 	for _, obj := range objs {
 		claim := obj.(*resourceapi.ResourceClaim)
 		if claim.Status.Allocation == nil {
 			continue
 		}
 		for _, result := range claim.Status.Allocation.Devices.Results {
-			allocatedDevices = append(allocatedDevices, structured.MakeDeviceID(result.Driver, result.Pool, result.Device))
+			allocatedDevices.Insert(structured.MakeDeviceID(result.Driver, result.Pool, result.Device))
 		}
 	}
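
Note (illustrative, not part of the patch): the change above replaces list-and-rebuild in PreFilter with a set that is kept up to date by assume-cache event handlers and only cloned per scheduling cycle. The standalone Go sketch below shows that pattern in miniature under stated assumptions; the tracker type, its method names, and the plain string device IDs are hypothetical stand-ins for allocatedDevices, sets.Set[structured.DeviceID], and the real informer callbacks.

package main

import (
	"fmt"
	"sync"
)

// tracker mirrors the role of allocatedDevices: event handlers write under a
// lock, readers on the hot path only clone an already-maintained set.
type tracker struct {
	mutex sync.RWMutex
	ids   map[string]struct{} // stand-in for sets.Set[structured.DeviceID]
}

func newTracker() *tracker {
	return &tracker{ids: map[string]struct{}{}}
}

// Get returns a copy so callers can extend it (for example with in-flight
// allocations) without racing against the event handlers.
func (t *tracker) Get() map[string]struct{} {
	t.mutex.RLock()
	defer t.mutex.RUnlock()
	clone := make(map[string]struct{}, len(t.ids))
	for id := range t.ids {
		clone[id] = struct{}{}
	}
	return clone
}

// onAllocated and onDeallocated play the part of the AddFunc/UpdateFunc/DeleteFunc
// handlers that the patch registers via AddEventHandler.
func (t *tracker) onAllocated(ids ...string) {
	t.mutex.Lock()
	defer t.mutex.Unlock()
	for _, id := range ids {
		t.ids[id] = struct{}{}
	}
}

func (t *tracker) onDeallocated(ids ...string) {
	t.mutex.Lock()
	defer t.mutex.Unlock()
	for _, id := range ids {
		delete(t.ids, id)
	}
}

func main() {
	t := newTracker()
	t.onAllocated("driver-a/pool-1/dev-0", "driver-a/pool-1/dev-1")
	t.onDeallocated("driver-a/pool-1/dev-1")
	fmt.Println(t.Get()) // map[driver-a/pool-1/dev-0:{}]
}

As in addDevices/removeDevices above, the real handlers additionally pre-compute the device IDs before taking the lock, which keeps the critical section short.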