DRA scheduler: refactor "allocated devices" lookup

The logic for skipping "admin access" was repeated in three different places. A
single foreachAllocatedDevice helper with a callback puts it in one function.
Patrick Ohly 2024-10-28 19:00:20 +01:00
parent bd7ff9c4c7
commit 0130ebba1d
2 changed files with 36 additions and 30 deletions


@@ -28,6 +28,36 @@ import (
 	"k8s.io/utils/ptr"
 )
 
+// foreachAllocatedDevice invokes the provided callback for each
+// device in the claim's allocation result which was allocated
+// exclusively for the claim.
+//
+// Devices allocated with admin access can be shared with other
+// claims and are skipped without invoking the callback.
+//
+// foreachAllocatedDevice does nothing if the claim is not allocated.
+func foreachAllocatedDevice(claim *resourceapi.ResourceClaim, cb func(deviceID structured.DeviceID)) {
+	if claim.Status.Allocation == nil {
+		return
+	}
+	for _, result := range claim.Status.Allocation.Devices.Results {
+		// Kubernetes 1.31 did not set this, 1.32 always does.
+		// Supporting 1.31 is not worth the additional code that
+		// would have to be written (= looking up in request) because
+		// it is extremely unlikely that there really is a result
+		// that still exists in a cluster from 1.31 where this matters.
+		if ptr.Deref(result.AdminAccess, false) {
+			// Is not considered as allocated.
+			continue
+		}
+		deviceID := structured.MakeDeviceID(result.Driver, result.Pool, result.Device)
+		// None of the users of this helper need to abort iterating,
+		// therefore it's not supported as it only would add overhead.
+		cb(deviceID)
+	}
+}
+
 // allocatedDevices reacts to events in a cache and maintains a set of all allocated devices.
 // This is cheaper than repeatedly calling List, making strings unique, and building the set
 // each time PreFilter is called.
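A minimal usage sketch of the new helper (hypothetical, not part of this commit): collect the exclusively allocated devices of one claim into a set. It assumes an allocated claim and the generic sets package from k8s.io/apimachinery/pkg/util/sets, mirroring how the callers below consume the callback:

	// Hypothetical caller: gather the devices this claim holds exclusively.
	allocated := sets.New[structured.DeviceID]()
	foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) {
		// Runs once per device that counts as allocated; admin-access
		// results are filtered out before the callback is invoked.
		allocated.Insert(deviceID)
	})

Because the callback has no return value, callers cannot abort the iteration early; as the comment in the helper notes, none of its current users need that.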
@@ -112,16 +142,10 @@ func (a *allocatedDevices) addDevices(claim *resourceapi.ResourceClaim) {
 	// Locking of the mutex gets minimized by pre-computing what needs to be done
 	// without holding the lock.
 	deviceIDs := make([]structured.DeviceID, 0, 20)
-	for _, result := range claim.Status.Allocation.Devices.Results {
-		if ptr.Deref(result.AdminAccess, false) {
-			// Is not considered as allocated.
-			continue
-		}
-		deviceID := structured.MakeDeviceID(result.Driver, result.Pool, result.Device)
+	foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) {
 		a.logger.V(6).Info("Observed device allocation", "device", deviceID, "claim", klog.KObj(claim))
 		deviceIDs = append(deviceIDs, deviceID)
-	}
+	})
 
 	a.mutex.Lock()
 	defer a.mutex.Unlock()
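The "pre-computing" comment describes a deliberate locking pattern: all filtering and logging happens before the mutex is taken, so the critical section covers only the set update. A self-contained sketch of that pattern with stand-in types (the tracker type and string IDs are illustrative, not the plugin's actual code):

package main

import "sync"

type tracker struct {
	mutex sync.Mutex
	ids   map[string]struct{}
}

// add mirrors the discipline used by addDevices and removeDevices:
// build the slice without the lock, then hold the mutex only for
// the map writes.
func (t *tracker) add(candidates []string) {
	prepared := make([]string, 0, len(candidates))
	for _, id := range candidates {
		if id == "" { // stand-in for the admin-access skip
			continue
		}
		prepared = append(prepared, id)
	}

	t.mutex.Lock()
	defer t.mutex.Unlock()
	for _, id := range prepared {
		t.ids[id] = struct{}{}
	}
}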
@@ -138,17 +162,10 @@ func (a *allocatedDevices) removeDevices(claim *resourceapi.ResourceClaim) {
 	// Locking of the mutex gets minimized by pre-computing what needs to be done
 	// without holding the lock.
 	deviceIDs := make([]structured.DeviceID, 0, 20)
-	for _, result := range claim.Status.Allocation.Devices.Results {
-		if ptr.Deref(result.AdminAccess, false) {
-			// Is not considered as allocated and thus does not need to be removed
-			// because of this claim.
-			continue
-		}
-		deviceID := structured.MakeDeviceID(result.Driver, result.Pool, result.Device)
+	foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) {
 		a.logger.V(6).Info("Observed device deallocation", "device", deviceID, "claim", klog.KObj(claim))
 		deviceIDs = append(deviceIDs, deviceID)
-	}
+	})
 
 	a.mutex.Lock()
 	defer a.mutex.Unlock()


@@ -46,7 +46,6 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
 	schedutil "k8s.io/kubernetes/pkg/scheduler/util"
 	"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
-	"k8s.io/utils/ptr"
 )
const (
@@ -571,20 +570,10 @@ func (pl *DynamicResources) listAllAllocatedDevices(logger klog.Logger) sets.Set
 	// Whatever is in flight also has to be checked.
 	pl.inFlightAllocations.Range(func(key, value any) bool {
 		claim := value.(*resourceapi.ResourceClaim)
-		for _, result := range claim.Status.Allocation.Devices.Results {
-			// Kubernetes 1.31 did not set this, 1.32 always does.
-			// Supporting 1.31 is not worth the additional code that
-			// would have to be written (= looking up in request) because
-			// it is extremely unlikely that there really is a result
-			// that still exists in a cluster from 1.31 where this matters.
-			if ptr.Deref(result.AdminAccess, false) {
-				// Is not considered as allocated.
-				continue
-			}
-			deviceID := structured.MakeDeviceID(result.Driver, result.Pool, result.Device)
+		foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) {
 			logger.V(6).Info("Device is in flight for allocation", "device", deviceID, "claim", klog.KObj(claim))
 			allocated.Insert(deviceID)
-		}
+		})
 		return true
 	})
 	return allocated
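The in-flight bookkeeping above iterates with Range(func(key, value any) bool), which matches sync.Map's iteration contract: the callback returns true to continue and false to stop, and values come back as any and must be type-asserted. A self-contained sketch of that contract (string keys and int values are stand-ins for the scheduler's claim types):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var inFlight sync.Map
	inFlight.Store("claim-a", 1)
	inFlight.Store("claim-b", 2)

	count := 0
	inFlight.Range(func(key, value any) bool {
		_ = value.(int) // the plugin asserts *resourceapi.ResourceClaim here
		count++
		return true // returning false would stop the iteration early
	})
	fmt.Println("in-flight entries:", count)
}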