From 75fd186e7a95325d30a27ad99464cfff12aca349 Mon Sep 17 00:00:00 2001
From: Patrick Ohly
Date: Fri, 16 Jan 2026 08:39:02 +0100
Subject: [PATCH 1/2] DRA scheduler: fix one root cause of double device allocation

DRA depends on the assume cache having invoked all event handlers before
Assume() returns, because DRA maintains state that is relevant for
scheduling through those event handlers.

This log snippet shows how this went wrong during PreBind:

dynamicresources.go:1150: I0115 10:35:29.264437] scheduler: Claim stored in assume cache pod="testdra-all-usesallresources-kqjpj/my-pod-0091" claim="testdra-all-usesallresources-kqjpj/claim-0091" uid="516f274f-e1a9-4a4b-b7d2-bb86138e4240" resourceVersion="5636"
dra_manager.go:198: I0115 10:35:29.264448] scheduler: Removed in-flight claim claim="testdra-all-usesallresources-kqjpj/claim-0091" uid="516f274f-e1a9-4a4b-b7d2-bb86138e4240" version="287"
dynamicresources.go:1157: I0115 10:35:29.264463] scheduler: Removed claim from in-flight claims pod="testdra-all-usesallresources-kqjpj/my-pod-0091" claim="testdra-all-usesallresources-kqjpj/claim-0091" uid="516f274f-e1a9-4a4b-b7d2-bb86138e4240" resourceVersion="5636" allocation=<
    ...
allocateddevices.go:189: I0115 10:35:29.267315] scheduler: Observed device allocation device="testdra-all-usesallresources-kqjpj.driver/worker-1/worker-1-device-096" claim="testdra-all-usesallresources-kqjpj/claim-0091"

- goroutine #1: UpdateStatus result delivered via informer. AssumeCache
  updates the cache, pushes event A, emitEvents pulls event A from the
  queue. *Not* done with delivering it yet!
- goroutine #2: AssumeCache.Assume called. Updates the cache, pushes
  event B, emits it. Old and new claim both have an allocation, so no
  "Observed device allocation".
- goroutine #3: Schedules the next pod without considering the device
  as allocated (not in the log snippet).
- goroutine #1: Finally delivers event A, "Observed device allocation",
  but too late.

Also, events are delivered out-of-order.

The fix is to let emitEvents, when called by Assume, wait for a
potentially running emitEvents in some other goroutine. This ensures
that an event pulled out of the queue by that other goroutine gets
delivered before Assume itself checks the queue one more time and then
returns.
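Distilled into a standalone sketch, the hand-off pattern looks roughly
like this (a hypothetical emitter type with a plain slice as queue, not
the actual AssumeCache code):

    package main

    import (
        "fmt"
        "sync"
    )

    // emitter is a hypothetical stand-in for AssumeCache: it only has
    // the event queue and the "single active emitter" hand-off.
    type emitter struct {
        mu       sync.Mutex
        cond     *sync.Cond
        queue    []func()
        emitting bool
    }

    func newEmitter() *emitter {
        e := &emitter{}
        e.cond = sync.NewCond(&e.mu)
        return e
    }

    func (e *emitter) push(ev func()) {
        e.mu.Lock()
        defer e.mu.Unlock()
        e.queue = append(e.queue, ev)
    }

    // emitEvents returns only after all events pushed by its caller
    // have been delivered, either by this goroutine or by the
    // previously active one.
    func (e *emitter) emitEvents() {
        e.mu.Lock()
        for e.emitting {
            // Another goroutine is delivering. Wait until it is done;
            // it may or may not have drained our events, so check
            // again below.
            e.cond.Wait()
        }
        e.emitting = true
        e.mu.Unlock()

        defer func() {
            e.mu.Lock()
            e.emitting = false
            e.cond.Signal() // hand the baton to one waiter, if any
            e.mu.Unlock()
        }()

        for {
            e.mu.Lock()
            if len(e.queue) == 0 {
                e.mu.Unlock()
                return
            }
            deliver := e.queue[0]
            e.queue = e.queue[1:]
            e.mu.Unlock()
            deliver() // deliver without holding the lock
        }
    }

    func main() {
        e := newEmitter()
        var wg sync.WaitGroup
        for i := 0; i < 4; i++ {
            i := i
            wg.Add(1)
            go func() {
                defer wg.Done()
                e.push(func() { fmt.Println("event from goroutine", i) })
                // When this returns, the event above has been delivered.
                e.emitEvents()
            }()
        }
        wg.Wait()
    }

Signal instead of Broadcast suffices: the one woken waiter becomes the
next active emitter and passes the baton on when it is done.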
The time window where things go wrong is small. An E2E test covering
this flaked only rarely, and only in CI. An integration test (separate
commit) with a higher number of pods finally made it possible to
reproduce the problem locally. It also uncovered a second race (fix in
a separate commit).

The unit test fails without the fix:

    === RUN   TestAssumeConcurrency
        assume_cache_test.go:311: FATAL ERROR: Assume should have blocked and didn't.
    --- FAIL: TestAssumeConcurrency (0.00s)
---
 .../util/assumecache/assume_cache.go | 32 ++++++++++++++++++-
 1 file changed, 31 insertions(+), 1 deletion(-)

diff --git a/pkg/scheduler/util/assumecache/assume_cache.go b/pkg/scheduler/util/assumecache/assume_cache.go
index c7392129cbd..034ca18c867 100644
--- a/pkg/scheduler/util/assumecache/assume_cache.go
+++ b/pkg/scheduler/util/assumecache/assume_cache.go
@@ -124,6 +124,9 @@ type AssumeCache struct {
     // Synchronizes updates to all fields below.
     rwMutex sync.RWMutex

+    // cond is used by emitEvents.
+    cond *sync.Cond
+
     // All registered event handlers.
     eventHandlers       []cache.ResourceEventHandler
     handlerRegistration cache.ResourceEventHandlerRegistration
@@ -149,6 +152,9 @@ type AssumeCache struct {
     // of events would no longer be guaranteed.
     eventQueue queue.FIFO[func()]

+    // emittingEvents is true while one emitEvents call is actively emitting events.
+    emittingEvents bool
+
     // describes the object stored
     description string

@@ -195,6 +201,7 @@ func NewAssumeCache(logger klog.Logger, informer Informer, description, indexNam
         indexFunc:   indexFunc,
         indexName:   indexName,
     }
+    c.cond = sync.NewCond(&c.rwMutex)
     indexers := cache.Indexers{}
     if indexName != "" && indexFunc != nil {
         indexers[indexName] = c.objInfoIndexFunc
@@ -507,8 +514,31 @@ func (c *AssumeCache) AddEventHandler(handler cache.ResourceEventHandler) cache.
 }

 // emitEvents delivers all pending events that are in the queue, in the order
-// in which they were stored there (FIFO).
+// in which they were stored there (FIFO). Only one goroutine at a time is
+// delivering events, to ensure correct order.
 func (c *AssumeCache) emitEvents() {
+    c.rwMutex.Lock()
+    for c.emittingEvents {
+        // Wait for the active caller of emitEvents to finish.
+        // When it is done, it may or may not have drained
+        // the events pushed by our caller.
+        // We'll check below ourselves.
+        c.cond.Wait()
+    }
+    c.emittingEvents = true
+    c.rwMutex.Unlock()
+
+    defer func() {
+        c.rwMutex.Lock()
+        c.emittingEvents = false
+        // Hand over the baton to one other goroutine, if there is one.
+        // We don't need to wake up more than one because only one of
+        // them would be able to grab the "emittingEvents" responsibility.
+        c.cond.Signal()
+        c.rwMutex.Unlock()
+    }()
+
+    // When we get here, this instance of emitEvents is the active one.
     for {
         c.rwMutex.Lock()
         deliver, ok := c.eventQueue.Pop()

From ba81d3040dda7f2db0e42e49820e46f919a75050 Mon Sep 17 00:00:00 2001
From: Patrick Ohly
Date: Fri, 16 Jan 2026 07:51:14 +0100
Subject: [PATCH 2/2] DRA scheduler: fix another root cause of double device allocation

GatherAllocatedState and ListAllAllocatedDevices need to collect
information from different sources (allocated devices, in-flight
claims), potentially even multiple times (GatherAllocatedState first
gets the allocated devices, then the capacities). The underlying
assumption that nothing bad happens in parallel is not always true.

The following log snippet shows how an update of the assume cache
(which feeds the allocated devices tracker) and of the in-flight claims
can land such that GatherAllocatedState doesn't see the device in that
claim as allocated:

dra_manager.go:263: I0115 15:11:04.407714 18778] scheduler: Starting GatherAllocatedState
...
allocateddevices.go:189: I0115 15:11:04.407945 18066] scheduler: Observed device allocation device="testdra-all-usesallresources-hvs5d.driver/worker-5/worker-5-device-094" claim="testdra-all-usesallresources-hvs5d/claim-0553"
dynamicresources.go:1150: I0115 15:11:04.407981 89109] scheduler: Claim stored in assume cache pod="testdra-all-usesallresources-hvs5d/my-pod-0553" claim="testdra-all-usesallresources-hvs5d/claim-0553" uid="a84d3c4d-f752-4cfd-8993-f4ce58643685" resourceVersion="5680"
dra_manager.go:201: I0115 15:11:04.408008 89109] scheduler: Removed in-flight claim claim="testdra-all-usesallresources-hvs5d/claim-0553" uid="a84d3c4d-f752-4cfd-8993-f4ce58643685" version="1211"
dynamicresources.go:1157: I0115 15:11:04.408044 89109] scheduler: Removed claim from in-flight claims pod="testdra-all-usesallresources-hvs5d/my-pod-0553" claim="testdra-all-usesallresources-hvs5d/claim-0553" uid="a84d3c4d-f752-4cfd-8993-f4ce58643685" resourceVersion="5680" allocation=<
    {
      "devices": {
        "results": [
          {
            "request": "req-1",
            "driver": "testdra-all-usesallresources-hvs5d.driver",
            "pool": "worker-5",
            "device": "worker-5-device-094"
          }
        ]
      },
      "nodeSelector": {
        "nodeSelectorTerms": [
          {
            "matchFields": [
              {
                "key": "metadata.name",
                "operator": "In",
                "values": [
                  "worker-5"
                ]
              }
            ]
          }
        ]
      },
      "allocationTimestamp": "2026-01-15T14:11:04Z"
    }
 >
dra_manager.go:280: I0115 15:11:04.408085 18778] scheduler: Device is in flight for allocation device="testdra-all-usesallresources-hvs5d.driver/worker-5/worker-5-device-095" claim="testdra-all-usesallresources-hvs5d/claim-0086"
dra_manager.go:280: I0115 15:11:04.408137 18778] scheduler: Device is in flight for allocation device="testdra-all-usesallresources-hvs5d.driver/worker-5/worker-5-device-096" claim="testdra-all-usesallresources-hvs5d/claim-0165"
default_binder.go:69: I0115 15:11:04.408175 89109] scheduler: Attempting to bind pod to node pod="testdra-all-usesallresources-hvs5d/my-pod-0553" node="worker-5"
dra_manager.go:265: I0115 15:11:04.408264 18778] scheduler: Finished GatherAllocatedState allocatedDevices=<
    ...

Initial state: "worker-5-device-094" is in-flight, not in the cache.

- goroutine #1: starts GatherAllocatedState, copies the cache
- goroutine #2: adds to the assume cache, removes from in-flight
- goroutine #1: checks in-flight => device never seen as allocated

This is the second reason for double allocation of the same device in
two different claims. The other was the timing in the assume cache.
Both were tracked down with an integration test (separate commit). It
did not fail all the time, but often enough that regressions should
show up as flakes.
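Reduced to a standalone sketch, the revision counter scheme introduced
below works like this (the tracker type, snapshotAllocated helper, and
device names are hypothetical stand-ins for allocatedDevices and the
claim tracker):

    package main

    import (
        "errors"
        "fmt"
        "sync"
    )

    var errConcurrentModification = errors.New("conflicting concurrent modification")

    type tracker struct {
        mu       sync.RWMutex
        revision int64
        ids      map[string]bool
    }

    // get returns a copy of the set plus the revision it corresponds to.
    func (t *tracker) get() (map[string]bool, int64) {
        t.mu.RLock()
        defer t.mu.RUnlock()
        clone := make(map[string]bool, len(t.ids))
        for id := range t.ids {
            clone[id] = true
        }
        return clone, t.revision
    }

    func (t *tracker) currentRevision() int64 {
        t.mu.RLock()
        defer t.mu.RUnlock()
        return t.revision
    }

    // add bumps the revision on every modification, like
    // ResourceVersion in the apiserver.
    func (t *tracker) add(id string) {
        t.mu.Lock()
        defer t.mu.Unlock()
        t.revision++
        t.ids[id] = true
    }

    // snapshotAllocated combines the tracker content with a second
    // source (here simply a fixed in-flight list). If the tracker
    // changed while the second source was being read, the combined
    // result may be stale and the caller has to retry.
    func snapshotAllocated(t *tracker, inFlight []string) (map[string]bool, error) {
        allocated, revision := t.get()
        for _, id := range inFlight {
            allocated[id] = true
        }
        if revision != t.currentRevision() {
            return nil, errConcurrentModification
        }
        return allocated, nil
    }

    func main() {
        t := &tracker{ids: map[string]bool{"device-1": true}}
        for {
            allocated, err := snapshotAllocated(t, []string{"device-2"})
            if errors.Is(err, errConcurrentModification) {
                continue // retry until a consistent snapshot is obtained
            }
            fmt.Println(len(allocated), "devices allocated")
            return
        }
    }

Readers detect that they raced with a writer and retry, instead of all
data structures having to share a single lock.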
---
 .../dynamicresources/allocateddevices.go      | 28 ++++++++++++++---
 .../plugins/dynamicresources/dra_manager.go   | 31 +++++++++++++++++--
 .../dynamicresources/dynamicresources.go      | 22 +++++++++++--
 3 files changed, 72 insertions(+), 9 deletions(-)

diff --git a/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go b/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go
index 9c8d2305b20..e26b72398a0 100644
--- a/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go
+++ b/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go
@@ -62,12 +62,19 @@ func foreachAllocatedDevice(claim *resourceapi.ResourceClaim, cb func(deviceID s
 // This is cheaper than repeatedly calling List, making strings unique, and building the set
 // each time PreFilter is called.
 //
+// To simplify detecting concurrent changes, each modification bumps a revision counter,
+// similar to ResourceVersion in the apiserver. Get and Capacities include the
+// current value in their result. A caller can then compare against the current value
+// to determine whether some prior results are still up-to-date, without having to get
+// and compare them.
+//
 // All methods are thread-safe. Get returns a cloned set.
 type allocatedDevices struct {
     logger klog.Logger

-    mutex sync.RWMutex
-    ids   sets.Set[structured.DeviceID]
+    mutex    sync.RWMutex
+    revision int64
+    ids      sets.Set[structured.DeviceID]
 }

 func newAllocatedDevices(logger klog.Logger) *allocatedDevices {
@@ -77,11 +84,18 @@ func newAllocatedDevices(logger klog.Logger) *allocatedDevices {
     }
 }

-func (a *allocatedDevices) Get() sets.Set[structured.DeviceID] {
+func (a *allocatedDevices) Get() (sets.Set[structured.DeviceID], int64) {
     a.mutex.RLock()
     defer a.mutex.RUnlock()

-    return a.ids.Clone()
+    return a.ids.Clone(), a.revision
+}
+
+func (a *allocatedDevices) Revision() int64 {
+    a.mutex.RLock()
+    defer a.mutex.RUnlock()
+
+    return a.revision
 }

 func (a *allocatedDevices) handlers() cache.ResourceEventHandler {
@@ -147,8 +161,13 @@ func (a *allocatedDevices) addDevices(claim *resourceapi.ResourceClaim) {
         deviceIDs = append(deviceIDs, deviceID)
     })

+    if len(deviceIDs) == 0 {
+        return
+    }
+
     a.mutex.Lock()
     defer a.mutex.Unlock()
+    a.revision++
     for _, deviceID := range deviceIDs {
         a.ids.Insert(deviceID)
     }
@@ -169,6 +188,7 @@ func (a *allocatedDevices) removeDevices(claim *resourceapi.ResourceClaim) {

     a.mutex.Lock()
     defer a.mutex.Unlock()
+    a.revision++
     for _, deviceID := range deviceIDs {
         a.ids.Delete(deviceID)
     }
diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go b/pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go
index 47d3d5de4ce..3d7f4c91559 100644
--- a/pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go
+++ b/pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go
@@ -18,6 +18,7 @@ package dynamicresources

 import (
     "context"
+    "errors"
     "fmt"
     "sync"

@@ -201,10 +202,29 @@ func (c *claimTracker) List() ([]*resourceapi.ResourceClaim, error) {
     return result, nil
 }

+// errClaimTrackerConcurrentModification gets returned when ListAllAllocatedDevices
+// or GatherAllocatedState needs to be retried.
+//
+// There is a rare race when a claim is initially in-flight:
+// - allocated is created from cache (claim not there)
+// - someone removes from the in-flight claims and adds to the cache
+// - we start checking in-flight claims (claim not there anymore)
+// => claim ignored
+//
+// A proper fix would be to rewrite the assume cache, allocatedDevices,
+// and the in-flight map so that they are under a single lock. But that's
+// a pretty big change and would prevent reusing the assume cache. So instead
+// we check for changes in the set of allocated devices and keep trying
+// until we get an attempt with no concurrent changes.
+//
+// A claim being first in the cache, then only in-flight cannot happen,
+// so we don't need to re-check the in-flight claims.
+var errClaimTrackerConcurrentModification = errors.New("conflicting concurrent modification")
+
 func (c *claimTracker) ListAllAllocatedDevices() (sets.Set[structured.DeviceID], error) {
     // Start with a fresh set that matches the current known state of the
     // world according to the informers.
-    allocated := c.allocatedDevices.Get()
+    allocated, revision := c.allocatedDevices.Get()
     // Whatever is in flight also has to be checked.
     c.inFlightAllocations.Range(func(key, value any) bool {
@@ -215,8 +235,13 @@ func (c *claimTracker) ListAllAllocatedDevices() (sets.Set[structured.DeviceID],
         })
         return true
     })
-    // There's no reason to return an error in this implementation, but the error might be helpful for other implementations.
-    return allocated, nil
+
+    if revision == c.allocatedDevices.Revision() {
+        // Our current result is valid, nothing changed in the meantime.
+        return allocated, nil
+    }
+
+    return nil, errClaimTrackerConcurrentModification
 }

 func (c *claimTracker) AssumeClaimAfterAPICall(claim *resourceapi.ResourceClaim) error {
diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
index dd1adfc5aa7..35688e19ad4 100644
--- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
+++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
@@ -23,6 +23,7 @@ import (
     "slices"
     "strings"
     "sync"
+    "time"

     "github.com/google/go-cmp/cmp" //nolint:depguard

@@ -34,6 +35,7 @@ import (
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/apimachinery/pkg/util/sets"
+    "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/util/retry"
     "k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
@@ -448,9 +450,25 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
     // Claims (and thus their devices) are treated as "allocated" if they are in the assume cache
     // or currently their allocation is in-flight. This does not change
     // during filtering, so we can determine that once.
-    allAllocatedDevices, err := pl.draManager.ResourceClaims().ListAllAllocatedDevices()
+    //
+    // This might have to be retried in the unlikely case that some concurrent modification made
+    // the result invalid.
+    var allAllocatedDevices sets.Set[structured.DeviceID]
+    err = wait.PollUntilContextTimeout(ctx, time.Microsecond, 5*time.Second, true /* immediate */, func(context.Context) (bool, error) {
+        ad, err := pl.draManager.ResourceClaims().ListAllAllocatedDevices()
+        if err != nil {
+            if errors.Is(err, errClaimTrackerConcurrentModification) {
+                logger.V(6).Info("Conflicting modification during ListAllAllocatedDevices, trying again")
+                return false, nil
+            }
+            return false, err
+        }
+        // Done.
+        allAllocatedDevices = ad
+        return true, nil
+    })
     if err != nil {
-        return nil, statusError(logger, err)
+        return nil, statusError(logger, fmt.Errorf("gather allocation state: %w", err))
     }
     slices, err := pl.draManager.ResourceSlices().ListWithDeviceTaintRules()
     if err != nil {
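For illustration, the same retry idiom reduced to a standalone program;
fetchSnapshot is a hypothetical stand-in for ListAllAllocatedDevices
and succeeds after two simulated conflicts:

    package main

    import (
        "context"
        "errors"
        "fmt"
        "time"

        "k8s.io/apimachinery/pkg/util/wait"
    )

    var errConcurrentModification = errors.New("conflicting concurrent modification")

    var attempts int

    // fetchSnapshot fails twice with the sentinel error, then succeeds.
    func fetchSnapshot() ([]string, error) {
        attempts++
        if attempts <= 2 {
            return nil, errConcurrentModification
        }
        return []string{"device-1"}, nil
    }

    func main() {
        var snapshot []string
        err := wait.PollUntilContextTimeout(context.Background(), time.Microsecond, 5*time.Second, true /* immediate */, func(context.Context) (bool, error) {
            s, err := fetchSnapshot()
            if err != nil {
                if errors.Is(err, errConcurrentModification) {
                    // Transient conflict: poll again.
                    return false, nil
                }
                // Anything else is fatal and aborts the poll loop.
                return false, err
            }
            snapshot = s
            return true, nil
        })
        if err != nil {
            panic(err)
        }
        fmt.Println("snapshot after", attempts, "attempts:", snapshot)
    }

With a microsecond interval and a five second budget, even a burst of
conflicting updates leaves plenty of retries before PreFilter gives up.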