From f296d4a9a46fcdf15ded56e1646d92abe9afde8e Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Fri, 16 Jan 2026 08:39:02 +0100 Subject: [PATCH] DRA scheduler: fix one root cause of double device allocation DRA depends on the assume cache having invoked all event handlers before Assume() returns, because DRA maintains state that is relevant for scheduling through those event handlers. This log snippet shows how this went wrong during PreBind: dynamicresources.go:1150: I0115 10:35:29.264437] scheduler: Claim stored in assume cache pod="testdra-all-usesallresources-kqjpj/my-pod-0091" claim="testdra-all-usesallresources-kqjpj/claim-0091" uid=: 516f274f-e1a9-4a4b-b7d2-bb86138e4240 resourceVersion="5636" dra_manager.go:198: I0115 10:35:29.264448] scheduler: Removed in-flight claim claim="testdra-all-usesallresources-kqjpj/claim-0091" uid=: 516f274f-e1a9-4a4b-b7d2-bb86138e4240 version="287" dynamicresources.go:1157: I0115 10:35:29.264463] scheduler: Removed claim from in-flight claims pod="testdra-all-usesallresources-kqjpj/my-pod-0091" claim="testdra-all-usesallresources-kqjpj/claim-0091" uid=: 516f274f-e1a9-4a4b-b7d2-bb86138e4240 resourceVersion="5636" allocation=< ... allocateddevices.go:189: I0115 10:35:29.267315] scheduler: Observed device allocation device="testdra-all-usesallresources-kqjpj.driver/worker-1/worker-1-device-096" claim="testdra-all-usesallresources-kqjpj/claim-0091" - goroutine #1: UpdateStatus result delivered via informer. AssumeCache updates cache, pushes event A, emitEvents pulls event A from queue. *Not* done with delivering it yet! - goroutine #2: AssumeCache.Assume called. Updates cache, pushes event B, emits it. Old and new claim have allocation, so no "Observed device allocation". - goroutine #3: Schedules next pod, without considering device as allocated (not in the log snippet). - goroutine #1: Finally delivers event A: "Observed device allocation", but too late. Also, events are delivered out-of-order. 
The fix is to let emitEvents when called by Assume wait for a potentially running emitEvents in some other goroutine, thus ensuring that an event pulled out of the queue by that other goroutine got delivered before Assume itself checks the queue one more time and then returns. The time window where things go wrong is small. An E2E test covering this only flaked rarely, and only in the CI. An integration test (separate commit) with a higher number of pods finally made it possible to reproduce locally. It also uncovered a second race (fix in separate commit). The unit test fails without the fix: === RUN TestAssumeRace assume_cache_test.go:311: FATAL ERROR: Assume should have blocked and didn't. --- FAIL: TestAssumeRace (0.00s) --- .../util/assumecache/assume_cache.go | 32 +++++++++- .../util/assumecache/assume_cache_test.go | 59 +++++++++++++++++++ 2 files changed, 90 insertions(+), 1 deletion(-) diff --git a/pkg/scheduler/util/assumecache/assume_cache.go b/pkg/scheduler/util/assumecache/assume_cache.go index 889cb9efe3b..fae3230bc3a 100644 --- a/pkg/scheduler/util/assumecache/assume_cache.go +++ b/pkg/scheduler/util/assumecache/assume_cache.go @@ -124,6 +124,9 @@ type AssumeCache struct { // Synchronizes updates to all fields below. rwMutex sync.RWMutex + // cond is used by emitEvents. + cond *sync.Cond + // All registered event handlers. eventHandlers []cache.ResourceEventHandler handlerRegistration cache.ResourceEventHandlerRegistration @@ -149,6 +152,9 @@ type AssumeCache struct { // of events would no longer be guaranteed. eventQueue buffer.Ring[func()] + // emittingEvents is true while one emitEvents call is actively emitting events. 
+ emittingEvents bool + // describes the object stored description string @@ -196,6 +202,7 @@ func NewAssumeCache(logger klog.Logger, informer Informer, description, indexNam indexName: indexName, eventQueue: *buffer.NewRing[func()](buffer.RingOptions{InitialSize: 0, NormalSize: 4}), } + c.cond = sync.NewCond(&c.rwMutex) indexers := cache.Indexers{} if indexName != "" && indexFunc != nil { indexers[indexName] = c.objInfoIndexFunc @@ -508,8 +515,31 @@ func (c *AssumeCache) AddEventHandler(handler cache.ResourceEventHandler) cache. } // emitEvents delivers all pending events that are in the queue, in the order -// in which they were stored there (FIFO). +// in which they were stored there (FIFO). Only one goroutine at a time is +// delivering events, to ensure correct order. func (c *AssumeCache) emitEvents() { + c.rwMutex.Lock() + for c.emittingEvents { + // Wait for the active caller of emitEvents to finish. + // When it is done, it may or may not have drained + // the events pushed by our caller. + // We'll check below ourselves. + c.cond.Wait() + } + c.emittingEvents = true + c.rwMutex.Unlock() + + defer func() { + c.rwMutex.Lock() + c.emittingEvents = false + // Hand over the baton to one other goroutine, if there is one. + // We don't need to wake up more than one because only one of + // them would be able to grab the "emittingEvents" responsibility. + c.cond.Signal() + c.rwMutex.Unlock() + }() + + // When we get here, this instance of emitEvents is the active one. 
for { c.rwMutex.Lock() deliver, ok := c.eventQueue.ReadOne() diff --git a/pkg/scheduler/util/assumecache/assume_cache_test.go b/pkg/scheduler/util/assumecache/assume_cache_test.go index a18d51e0a87..8024b20987d 100644 --- a/pkg/scheduler/util/assumecache/assume_cache_test.go +++ b/pkg/scheduler/util/assumecache/assume_cache_test.go @@ -265,6 +265,65 @@ func TestAssume(t *testing.T) { } } +// TestAssumeRace simulates this sequence of events: +// - Informer update arrives, event handler gets invoked and is slow. +// - Assume for the same object is called. It must block until +// the informer-triggered event is delivered. +func TestAssumeRace(t *testing.T) { ktesting.Init(t).SyncTest("", testAssumeRace) } +func testAssumeRace(tCtx ktesting.TContext) { + var informer testInformer + testObj := makeObj("pvc1", "1", "") + ac := NewAssumeCache(tCtx.Logger(), &informer, "TestObject", "", nil) + blockEvent := ktesting.WithCancel(tCtx) + defer blockEvent.Cancel("test done") + eventBlocked := false + eventDone := false + ac.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + eventBlocked = true + <-blockEvent.Done() + eventDone = true + }, + }) + + // Here a real client would do a Create. What the assume cache may or may not + // see before Assume is the new object from the informer - let's pretend that + // comes first. + go informer.add(testObj) + + // Wait for processing to finish. + tCtx.Wait() + if !eventBlocked { + tCtx.Fatal("Event handler should have been called and wasn't.") + } + + // Assume should block until we unblock the event delivery. + assumeDone := false + go func() { + err := ac.Assume(testObj) + if err != nil { + tCtx.Errorf("Assume failed: %v", err) + } + assumeDone = true + }() + + // Wait for Assume to be blocked in its implementation. + tCtx.Wait() + if assumeDone { + tCtx.Fatal("Assume should have blocked and didn't.") + } + + // Unblock both goroutines. 
+ blockEvent.Cancel("proceed") + tCtx.Wait() + if !eventDone { + tCtx.Fatal("Event should have been delivered and wasn't.") + } + if !assumeDone { + tCtx.Fatal("Assume should have returned and didn't.") + } +} + func TestRestore(t *testing.T) { tCtx, cache, informer := newTest(t) var events mockEventHandler