From 82b2a9d543fd0989c4bd5d2dc1e6345cb2ed7a23 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Fri, 16 Jan 2026 07:51:14 +0100 Subject: [PATCH] DRA scheduler: fix another root cause of double device allocation GatherAllocatedState and ListAllAllocatedDevices need to collect information from different sources (allocated devices, in-flight claims), potentially even multiple times (GatherAllocatedState first gets allocated devices, then the capacities). The underlying assumption that nothing bad happens in parallel is not always true. The following log snippet shows how an update of the assume cache (feeding the allocated devices tracker) and in-flight claims lands such that GatherAllocatedState doesn't see the device in that claim as allocated: dra_manager.go:263: I0115 15:11:04.407714 18778] scheduler: Starting GatherAllocatedState ... allocateddevices.go:189: I0115 15:11:04.407945 18066] scheduler: Observed device allocation device="testdra-all-usesallresources-hvs5d.driver/worker-5/worker-5-device-094" claim="testdra-all-usesallresources-hvs5d/claim-0553" dynamicresources.go:1150: I0115 15:11:04.407981 89109] scheduler: Claim stored in assume cache pod="testdra-all-usesallresources-hvs5d/my-pod-0553" claim="testdra-all-usesallresources-hvs5d/claim-0553" uid=: a84d3c4d-f752-4cfd-8993-f4ce58643685 resourceVersion="5680" dra_manager.go:201: I0115 15:11:04.408008 89109] scheduler: Removed in-flight claim claim="testdra-all-usesallresources-hvs5d/claim-0553" uid=: a84d3c4d-f752-4cfd-8993-f4ce58643685 version="1211" dynamicresources.go:1157: I0115 15:11:04.408044 89109] scheduler: Removed claim from in-flight claims pod="testdra-all-usesallresources-hvs5d/my-pod-0553" claim="testdra-all-usesallresources-hvs5d/claim-0553" uid=: a84d3c4d-f752-4cfd-8993-f4ce58643685 resourceVersion="5680" allocation=< { "devices": { "results": [ { "request": "req-1", "driver": "testdra-all-usesallresources-hvs5d.driver", "pool": "worker-5", "device": "worker-5-device-094" } ] }, "nodeSelector": { "nodeSelectorTerms": [ { "matchFields": [ { "key": "metadata.name", "operator": "In", "values": [ "worker-5" ] } ] } ] }, "allocationTimestamp": "2026-01-15T14:11:04Z" } > dra_manager.go:280: I0115 15:11:04.408085 18778] scheduler: Device is in flight for allocation device="testdra-all-usesallresources-hvs5d.driver/worker-5/worker-5-device-095" claim="testdra-all-usesallresources-hvs5d/claim-0086" dra_manager.go:280: I0115 15:11:04.408137 18778] scheduler: Device is in flight for allocation device="testdra-all-usesallresources-hvs5d.driver/worker-5/worker-5-device-096" claim="testdra-all-usesallresources-hvs5d/claim-0165" default_binder.go:69: I0115 15:11:04.408175 89109] scheduler: Attempting to bind pod to node pod="testdra-all-usesallresources-hvs5d/my-pod-0553" node="worker-5" dra_manager.go:265: I0115 15:11:04.408264 18778] scheduler: Finished GatherAllocatedState allocatedDevices=: { Initial state: "worker-5-device-094" is in-flight, not in cache - goroutine #1: starts GatherAllocatedState, copies cache - goroutine #2: adds to assume cache, removes from in-flight - goroutine #1: checks in-flight => device never seen as allocated This is the second reason for double allocation of the same device in two different claims. The other was timing in the assume cache. Both were tracked down with an integration test (separate commit). It did not fail all the time, but enough that regressions should show up as flakes. --- .../dynamicresources/allocateddevices.go | 33 +++++++++-- .../plugins/dynamicresources/dra_manager.go | 56 +++++++++++++++---- .../dynamicresources/dynamicresources.go | 54 ++++++++++++------ 3 files changed, 111 insertions(+), 32 deletions(-) diff --git a/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go b/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go index 24d6f8c97c3..21d192ffc89 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go @@ -85,11 +85,18 @@ func foreachAllocatedDevice(claim *resourceapi.ResourceClaim, // This is cheaper than repeatedly calling List, making strings unique, and building the set // each time PreFilter is called. // +// To simplify detecting concurrent changes, each modification bumps a revision counter, +// similar to ResourceVersion in the apiserver. Get and Capacities include the +// current value in their result. A caller than can compare againt the current value +// to determine whether some prior results are still up-to-date, without having to get +// and compare them. +// // All methods are thread-safe. Get returns a cloned set. type allocatedDevices struct { logger klog.Logger mutex sync.RWMutex + revision int64 ids sets.Set[structured.DeviceID] shareIDs sets.Set[structured.SharedDeviceID] capacities structured.ConsumedCapacityCollection @@ -106,18 +113,25 @@ func newAllocatedDevices(logger klog.Logger) *allocatedDevices { } } -func (a *allocatedDevices) Get() sets.Set[structured.DeviceID] { +func (a *allocatedDevices) Get() (sets.Set[structured.DeviceID], int64) { a.mutex.RLock() defer a.mutex.RUnlock() - return a.ids.Clone() + return a.ids.Clone(), a.revision } -func (a *allocatedDevices) Capacities() structured.ConsumedCapacityCollection { +func (a *allocatedDevices) Capacities() (structured.ConsumedCapacityCollection, int64) { a.mutex.RLock() defer a.mutex.RUnlock() - return a.capacities.Clone() + return a.capacities.Clone(), a.revision +} + +func (a *allocatedDevices) Revision() int64 { + a.mutex.RLock() + defer a.mutex.RUnlock() + + return a.revision } func (a *allocatedDevices) handlers() cache.ResourceEventHandler { @@ -200,8 +214,13 @@ func (a *allocatedDevices) addDevices(claim *resourceapi.ResourceClaim) { }, ) + if len(deviceIDs) == 0 && len(shareIDs) == 0 && len(deviceCapacities) == 0 { + return + } + a.mutex.Lock() defer a.mutex.Unlock() + a.revision++ for _, deviceID := range deviceIDs { a.ids.Insert(deviceID) } @@ -241,8 +260,14 @@ func (a *allocatedDevices) removeDevices(claim *resourceapi.ResourceClaim) { a.logger.V(6).Info("Observed consumed capacity release", "device id", capacity.DeviceID, "consumed capacity", capacity.ConsumedCapacity, "claim", klog.KObj(claim)) deviceCapacities = append(deviceCapacities, capacity) }) + + if len(deviceIDs) == 0 && len(shareIDs) == 0 && len(deviceCapacities) == 0 { + return + } + a.mutex.Lock() defer a.mutex.Unlock() + a.revision++ for _, deviceID := range deviceIDs { a.ids.Delete(deviceID) } diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go b/pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go index 42b54c08e1f..2be14d10ec6 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go @@ -18,6 +18,7 @@ package dynamicresources import ( "context" + "errors" "fmt" "sync" @@ -218,10 +219,29 @@ func (c *claimTracker) List() ([]*resourceapi.ResourceClaim, error) { return result, nil } +// errClaimTrackerConcurrentModification gets returned if ListAllAllocatedDevices +// or GatherAllocatedState need to be retried. +// +// There is a rare race when a claim is initially in-flight: +// - allocated is created from cache (claim not there) +// - someone removes from the in-flight claims and adds to the cache +// - we start checking in-flight claims (claim not there anymore) +// => claim ignored +// +// A proper fix would be to rewrite the assume cache, allocatedDevices, +// and the in-flight map so that they are under a single lock. But that's +// a pretty big change and prevents reusing the assume cache. So instead +// we check for changes in the set of allocated devices and keep trying +// until we get an attempt with no concurrent changes. +// +// A claim being first in the cache, then only in-flight cannot happen, +// so we don't need to re-check the in-flight claims. +var errClaimTrackerConcurrentModification = errors.New("conflicting concurrent modification") + func (c *claimTracker) ListAllAllocatedDevices() (sets.Set[structured.DeviceID], error) { // Start with a fresh set that matches the current known state of the // world according to the informers. - allocated := c.allocatedDevices.Get() + allocated, revision := c.allocatedDevices.Get() // Whatever is in flight also has to be checked. c.inFlightAllocations.Range(func(key, value any) bool { @@ -232,16 +252,26 @@ func (c *claimTracker) ListAllAllocatedDevices() (sets.Set[structured.DeviceID], }, false, func(structured.SharedDeviceID) {}, func(structured.DeviceConsumedCapacity) {}) return true }) - // There's no reason to return an error in this implementation, but the error might be helpful for other implementations. - return allocated, nil + + if revision == c.allocatedDevices.Revision() { + // Our current result is valid, nothing changed in the meantime. + return allocated, nil + } + + return nil, errClaimTrackerConcurrentModification } func (c *claimTracker) GatherAllocatedState() (*structured.AllocatedState, error) { // Start with a fresh set that matches the current known state of the // world according to the informers. - allocated := c.allocatedDevices.Get() + allocated, revision1 := c.allocatedDevices.Get() allocatedSharedDeviceIDs := sets.New[structured.SharedDeviceID]() - aggregatedCapacity := c.allocatedDevices.Capacities() + aggregatedCapacity, revision2 := c.allocatedDevices.Capacities() + + if revision1 != revision2 { + // Already not consistent. Try again. + return nil, errClaimTrackerConcurrentModification + } enabledConsumableCapacity := utilfeature.DefaultFeatureGate.Enabled(features.DRAConsumableCapacity) @@ -263,12 +293,16 @@ func (c *claimTracker) GatherAllocatedState() (*structured.AllocatedState, error return true }) - // There's no reason to return an error in this implementation, but the error might be helpful for other implementations. - return &structured.AllocatedState{ - AllocatedDevices: allocated, - AllocatedSharedDeviceIDs: allocatedSharedDeviceIDs, - AggregatedCapacity: aggregatedCapacity, - }, nil + if revision1 == c.allocatedDevices.Revision() { + // Our current result is valid, nothing changed in the meantime. + return &structured.AllocatedState{ + AllocatedDevices: allocated, + AllocatedSharedDeviceIDs: allocatedSharedDeviceIDs, + AggregatedCapacity: aggregatedCapacity, + }, nil + } + + return nil, errClaimTrackerConcurrentModification } func (c *claimTracker) AssumeClaimAfterAPICall(claim *resourceapi.ResourceClaim) error { diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index b92a3e7b057..75551dc9805 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -518,25 +518,45 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state fwk.CycleState, // Claims (and thus their devices) are treated as "allocated" if they are in the assume cache // or currently their allocation is in-flight. This does not change // during filtering, so we can determine that once. + // + // This might have to be retried in the unlikely case that some concurrent modification made + // the result invalid. var allocatedState *structured.AllocatedState - if pl.fts.EnableDRAConsumableCapacity { - allocatedState, err = pl.draManager.ResourceClaims().GatherAllocatedState() - if err != nil { - return nil, statusError(logger, err) - } - if allocatedState == nil { - return nil, statusError(logger, errors.New("nil allocated state")) - } - } else { - allocatedDevices, err := pl.draManager.ResourceClaims().ListAllAllocatedDevices() - if err != nil { - return nil, statusError(logger, err) - } - allocatedState = &structured.AllocatedState{ - AllocatedDevices: allocatedDevices, - AllocatedSharedDeviceIDs: sets.New[structured.SharedDeviceID](), - AggregatedCapacity: structured.NewConsumedCapacityCollection(), + err = wait.PollUntilContextTimeout(ctx, time.Microsecond, 5*time.Second, true /* immediate */, func(context.Context) (bool, error) { + if pl.fts.EnableDRAConsumableCapacity { + allocatedState, err = pl.draManager.ResourceClaims().GatherAllocatedState() + if err != nil { + if errors.Is(err, errClaimTrackerConcurrentModification) { + logger.V(6).Info("Conflicting modification during GatherAllocatedState, trying again") + return false, nil + } + return false, err + } + if allocatedState == nil { + return false, errors.New("nil allocated state") + } + // Done. + return true, nil + } else { + allocatedDevices, err := pl.draManager.ResourceClaims().ListAllAllocatedDevices() + if err != nil { + if errors.Is(err, errClaimTrackerConcurrentModification) { + logger.V(6).Info("Conflicting modification during ListAllAllocatedDevices, trying again") + return false, nil + } + return false, err + } + allocatedState = &structured.AllocatedState{ + AllocatedDevices: allocatedDevices, + AllocatedSharedDeviceIDs: sets.New[structured.SharedDeviceID](), + AggregatedCapacity: structured.NewConsumedCapacityCollection(), + } + // Done. + return true, nil } + }) + if err != nil { + return nil, statusError(logger, fmt.Errorf("gather allocation state: %w", err)) } slices, err := pl.draManager.ResourceSlices().ListWithDeviceTaintRules() if err != nil {