DRA scheduler: maintain a set of allocated device IDs

Reacting to events from the informer cache (indirectly, through the assume
cache) is more efficient than repeatedly listing its content and then
converting each allocated claim to device IDs with unique strings.
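
The idea in sketch form (illustrative only, not the exact code in this commit;
`pl` and `claimAssumeCache` stand for the scheduler plugin fields touched in
the diff below):

    // Before: every PreFilter call listed the assume cache and rebuilt the
    // list of allocated device IDs from scratch.
    var allocated []structured.DeviceID
    for _, obj := range pl.claimAssumeCache.List(nil) {
        claim := obj.(*resourceapi.ResourceClaim)
        if claim.Status.Allocation == nil {
            continue
        }
        for _, result := range claim.Status.Allocation.Devices.Results {
            allocated = append(allocated, structured.MakeDeviceID(result.Driver, result.Pool, result.Device))
        }
    }

    // After: event handlers registered on the assume cache keep a
    // sets.Set[structured.DeviceID] up to date as claims change, so
    // PreFilter only needs a cheap clone of that set.
    allocated := pl.allocatedDevices.Get()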

    goos: linux
    goarch: amd64
    pkg: k8s.io/kubernetes/test/integration/scheduler_perf
    cpu: Intel(R) Core(TM) i9-7980XE CPU @ 2.60GHz
                                                                                       │            before            │                        after                        │
                                                                                       │ SchedulingThroughput/Average │ SchedulingThroughput/Average  vs base               │
    PerfScheduling/SchedulingWithResourceClaimTemplateStructured/5000pods_500nodes-36                      54.70 ± 6%                     76.81 ± 6%  +40.42% (p=0.002 n=6)
    PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/empty_100nodes-36                     106.4 ± 4%                     105.6 ± 2%        ~ (p=0.413 n=6)
    PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/empty_500nodes-36                     120.0 ± 4%                     118.9 ± 7%        ~ (p=0.117 n=6)
    PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/half_100nodes-36                      112.5 ± 4%                     105.9 ± 4%   -5.87% (p=0.002 n=6)
    PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/half_500nodes-36                      87.13 ± 4%                    123.55 ± 4%  +41.80% (p=0.002 n=6)
    PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/full_100nodes-36                      113.4 ± 2%                     103.3 ± 2%   -8.95% (p=0.002 n=6)
    PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/full_500nodes-36                      65.55 ± 3%                    121.30 ± 3%  +85.05% (p=0.002 n=6)
    geomean                                                                                                90.81                          106.8       +17.57%
Patrick Ohly 2024-09-05 18:44:21 +02:00
parent f0efb8a5fd
commit bc55e82621
5 changed files with 195 additions and 33 deletions


@@ -0,0 +1,158 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamicresources

import (
    "sync"

    resourceapi "k8s.io/api/resource/v1alpha3"
    "k8s.io/apimachinery/pkg/util/sets"
    "k8s.io/client-go/tools/cache"
    "k8s.io/dynamic-resource-allocation/structured"
    "k8s.io/klog/v2"
    schedutil "k8s.io/kubernetes/pkg/scheduler/util"
    "k8s.io/utils/ptr"
)

// allocatedDevices reacts to events in a cache and maintains a set of all allocated devices.
// This is cheaper than repeatedly calling List, making strings unique, and building the set
// each time PreFilter is called.
//
// All methods are thread-safe. Get returns a cloned set.
type allocatedDevices struct {
    logger klog.Logger
    mutex  sync.RWMutex
    ids    sets.Set[structured.DeviceID]
}

func newAllocatedDevices(logger klog.Logger) *allocatedDevices {
    return &allocatedDevices{
        logger: logger,
        ids:    sets.New[structured.DeviceID](),
    }
}

func (a *allocatedDevices) Get() sets.Set[structured.DeviceID] {
    a.mutex.RLock()
    defer a.mutex.RUnlock()
    return a.ids.Clone()
}

func (a *allocatedDevices) handlers() cache.ResourceEventHandler {
    return cache.ResourceEventHandlerFuncs{
        AddFunc:    a.onAdd,
        UpdateFunc: a.onUpdate,
        DeleteFunc: a.onDelete,
    }
}

func (a *allocatedDevices) onAdd(obj any) {
    claim, _, err := schedutil.As[*resourceapi.ResourceClaim](obj, nil)
    if err != nil {
        // Shouldn't happen.
        a.logger.Error(err, "unexpected object in allocatedDevices.onAdd")
        return
    }
    if claim.Status.Allocation != nil {
        a.addDevices(claim)
    }
}

func (a *allocatedDevices) onUpdate(oldObj, newObj any) {
    originalClaim, modifiedClaim, err := schedutil.As[*resourceapi.ResourceClaim](oldObj, newObj)
    if err != nil {
        // Shouldn't happen.
        a.logger.Error(err, "unexpected object in allocatedDevices.onUpdate")
        return
    }
    switch {
    case originalClaim.Status.Allocation == nil && modifiedClaim.Status.Allocation != nil:
        a.addDevices(modifiedClaim)
    case originalClaim.Status.Allocation != nil && modifiedClaim.Status.Allocation == nil:
        a.removeDevices(originalClaim)
    default:
        // Nothing to do. Either both nil or both non-nil, in which case the content
        // also must be the same (immutable!).
    }
}

func (a *allocatedDevices) onDelete(obj any) {
    claim, _, err := schedutil.As[*resourceapi.ResourceClaim](obj, nil)
    if err != nil {
        // Shouldn't happen.
        a.logger.Error(err, "unexpected object in allocatedDevices.onDelete")
        return
    }
    a.removeDevices(claim)
}

func (a *allocatedDevices) addDevices(claim *resourceapi.ResourceClaim) {
    if claim.Status.Allocation == nil {
        return
    }
    // Locking of the mutex gets minimized by pre-computing what needs to be done
    // without holding the lock.
    deviceIDs := make([]structured.DeviceID, 0, 20)
    for _, result := range claim.Status.Allocation.Devices.Results {
        if ptr.Deref(result.AdminAccess, false) {
            // Is not considered as allocated.
            continue
        }
        deviceID := structured.MakeDeviceID(result.Driver, result.Pool, result.Device)
        a.logger.V(6).Info("Device was allocated", "device", deviceID, "claim", klog.KObj(claim))
        deviceIDs = append(deviceIDs, deviceID)
    }

    a.mutex.Lock()
    defer a.mutex.Unlock()
    for _, deviceID := range deviceIDs {
        a.ids.Insert(deviceID)
    }
}

func (a *allocatedDevices) removeDevices(claim *resourceapi.ResourceClaim) {
    if claim.Status.Allocation == nil {
        return
    }
    // Locking of the mutex gets minimized by pre-computing what needs to be done
    // without holding the lock.
    deviceIDs := make([]structured.DeviceID, 0, 20)
    for _, result := range claim.Status.Allocation.Devices.Results {
        if ptr.Deref(result.AdminAccess, false) {
            // Is not considered as allocated and thus does not need to be removed
            // because of this claim.
            continue
        }
        deviceID := structured.MakeDeviceID(result.Driver, result.Pool, result.Device)
        a.logger.V(6).Info("Device was deallocated", "device", deviceID, "claim", klog.KObj(claim))
        deviceIDs = append(deviceIDs, deviceID)
    }

    a.mutex.Lock()
    defer a.mutex.Unlock()
    for _, deviceID := range deviceIDs {
        a.ids.Delete(deviceID)
    }
}
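
How the new type is meant to be wired in, as a minimal usage sketch (illustrative
only; the real wiring inside the plugin follows in the next file's diff, and
`claimAssumeCache` here stands for the plugin's assume cache):

    ad := newAllocatedDevices(logger)
    claimAssumeCache.AddEventHandler(ad.handlers())

    // Later, e.g. once per scheduling cycle:
    inUse := ad.Get() // returns a clone, safe to extend with in-flight allocations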


@@ -108,11 +108,12 @@ type DynamicResources struct {
     enableAdminAccess         bool
     enableSchedulingQueueHint bool

-    fh          framework.Handle
-    clientset   kubernetes.Interface
-    classLister resourcelisters.DeviceClassLister
-    sliceLister resourcelisters.ResourceSliceLister
-    celCache    *structured.CELCache
+    fh               framework.Handle
+    clientset        kubernetes.Interface
+    classLister      resourcelisters.DeviceClassLister
+    sliceLister      resourcelisters.ResourceSliceLister
+    celCache         *structured.CELCache
+    allocatedDevices *allocatedDevices

     // claimAssumeCache enables temporarily storing a newer claim object
     // while the scheduler has allocated it and the corresponding object
@@ -177,6 +178,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
         return &DynamicResources{}, nil
     }

+    logger := klog.FromContext(ctx)
     pl := &DynamicResources{
         enabled: true,
         enableAdminAccess: fts.EnableDRAAdminAccess,
@@ -192,8 +194,14 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
         // recent 10 of them get reused across different scheduling
         // cycles.
         celCache: structured.NewCELCache(10),
+        allocatedDevices: newAllocatedDevices(logger),
     }
+
+    // Reacting to events is more efficient than iterating over the list
+    // repeatedly in PreFilter.
+    pl.claimAssumeCache.AddEventHandler(pl.allocatedDevices.handlers())
+
     return pl, nil
 }
@@ -538,10 +546,7 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
     // Claims are treated as "allocated" if they are in the assume cache
     // or currently their allocation is in-flight. This does not change
     // during filtering, so we can determine that once.
-    allocatedDevices := pl.listAllAllocatedDevices()
-    if err != nil {
-        return nil, statusError(logger, err)
-    }
+    allocatedDevices := pl.listAllAllocatedDevices(logger)
     slices, err := pl.sliceLister.List(labels.Everything())
     if err != nil {
         return nil, statusError(logger, err)
@@ -558,18 +563,14 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
     return nil, nil
 }

-func (pl *DynamicResources) listAllAllocatedDevices() []structured.DeviceID {
-    // Probably not worth adding an index for?
-    objs := pl.claimAssumeCache.List(nil)
-    var allocated []structured.DeviceID
-    for _, obj := range objs {
-        claim := obj.(*resourceapi.ResourceClaim)
-        if obj, ok := pl.inFlightAllocations.Load(claim.UID); ok {
-            claim = obj.(*resourceapi.ResourceClaim)
-        }
-        if claim.Status.Allocation == nil {
-            continue
-        }
+func (pl *DynamicResources) listAllAllocatedDevices(logger klog.Logger) sets.Set[structured.DeviceID] {
+    // Start with a fresh set that matches the current known state of the
+    // world according to the informers.
+    allocated := pl.allocatedDevices.Get()
+
+    // Whatever is in flight also has to be checked.
+    pl.inFlightAllocations.Range(func(key, value any) bool {
+        claim := value.(*resourceapi.ResourceClaim)
         for _, result := range claim.Status.Allocation.Devices.Results {
             // Kubernetes 1.31 did not set this, 1.32 always does.
             // Supporting 1.31 is not worth the additional code that
@@ -581,9 +582,11 @@ func (pl *DynamicResources) listAllAllocatedDevices() []structured.DeviceID {
                 continue
             }
             deviceID := structured.MakeDeviceID(result.Driver, result.Pool, result.Device)
-            allocated = append(allocated, deviceID)
+            logger.V(6).Info("Device is in flight for allocation", "device", deviceID, "claim", klog.KObj(claim))
+            allocated.Insert(deviceID)
         }
-    }
+        return true
+    })
     return allocated
 }
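
For reference, the merged set then feeds straight into the allocator; a hedged
sketch of the call site (simplified, `claims` stands for the pod's claims
gathered earlier in PreFilter; the signature change is in the next file's diff):

    allocated := pl.listAllAllocatedDevices(logger)
    allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, claims, allocated, pl.classLister, slices, pl.celCache)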


@@ -58,7 +58,7 @@ type Allocator struct {
 func NewAllocator(ctx context.Context,
     adminAccessEnabled bool,
     claimsToAllocate []*resourceapi.ResourceClaim,
-    allocatedDevices []DeviceID,
+    allocatedDevices sets.Set[DeviceID],
     classLister resourcelisters.DeviceClassLister,
     slices []*resourceapi.ResourceSlice,
     celCache *CELCache,
@@ -66,12 +66,11 @@ func NewAllocator(ctx context.Context,
     return &Allocator{
         adminAccessEnabled: adminAccessEnabled,
         claimsToAllocate:   claimsToAllocate,
-        // This won't change, so build this set only once.
-        allocatedDevices: sets.New(allocatedDevices...),
-        classLister:      classLister,
-        slices:           slices,
-        celCache:         celCache,
-        celMutex:         keymutex.NewHashed(0),
+        allocatedDevices:   allocatedDevices,
+        classLister:        classLister,
+        slices:             slices,
+        celCache:           celCache,
+        celMutex:           keymutex.NewHashed(0),
     }, nil
 }


@@ -36,6 +36,7 @@ import (
     "k8s.io/apimachinery/pkg/labels"
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/runtime/schema"
+    "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/klog/v2/ktesting"
     "k8s.io/utils/ptr"
 )
@@ -1375,7 +1376,7 @@ func TestAllocator(t *testing.T) {
             allocatedDevices := slices.Clone(tc.allocatedDevices)
             slices := slices.Clone(tc.slices)
-            allocator, err := NewAllocator(ctx, tc.adminAccess, claimsToAllocate, allocatedDevices, classLister, slices, NewCELCache(1))
+            allocator, err := NewAllocator(ctx, tc.adminAccess, claimsToAllocate, sets.New(allocatedDevices...), classLister, slices, NewCELCache(1))
             g.Expect(err).ToNot(gomega.HaveOccurred())
             results, err := allocator.Allocate(ctx, tc.node)


@@ -30,6 +30,7 @@ import (
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/labels"
+    "k8s.io/apimachinery/pkg/util/sets"
     utilfeature "k8s.io/apiserver/pkg/util/feature"
     "k8s.io/client-go/informers"
     "k8s.io/client-go/util/workqueue"
@@ -309,14 +310,14 @@ claims:
     }

     objs := claimCache.List(nil)
-    allocatedDevices := make([]structured.DeviceID, 0, len(objs))
+    allocatedDevices := sets.New[structured.DeviceID]()
     for _, obj := range objs {
         claim := obj.(*resourceapi.ResourceClaim)
         if claim.Status.Allocation == nil {
             continue
         }
         for _, result := range claim.Status.Allocation.Devices.Results {
-            allocatedDevices = append(allocatedDevices, structured.MakeDeviceID(result.Driver, result.Pool, result.Device))
+            allocatedDevices.Insert(structured.MakeDeviceID(result.Driver, result.Pool, result.Device))
         }
     }