DRA scheduler: fix one root cause of double device allocation

DRA depends on the assume cache having invoked all event handlers before
Assume() returns, because DRA maintains state that is relevant for scheduling
through those event handlers.

This log snippet shows how this went wrong during PreBind:

    dynamicresources.go:1150: I0115 10:35:29.264437] scheduler: Claim stored in assume cache pod="testdra-all-usesallresources-kqjpj/my-pod-0091" claim="testdra-all-usesallresources-kqjpj/claim-0091" uid=<types.UID>: 516f274f-e1a9-4a4b-b7d2-bb86138e4240 resourceVersion="5636"
    dra_manager.go:198: I0115 10:35:29.264448] scheduler: Removed in-flight claim claim="testdra-all-usesallresources-kqjpj/claim-0091" uid=<types.UID>: 516f274f-e1a9-4a4b-b7d2-bb86138e4240 version="287"
    dynamicresources.go:1157: I0115 10:35:29.264463] scheduler: Removed claim from in-flight claims pod="testdra-all-usesallresources-kqjpj/my-pod-0091" claim="testdra-all-usesallresources-kqjpj/claim-0091" uid=<types.UID>: 516f274f-e1a9-4a4b-b7d2-bb86138e4240 resourceVersion="5636" allocation=<
    ...
    allocateddevices.go:189: I0115 10:35:29.267315] scheduler: Observed device allocation device="testdra-all-usesallresources-kqjpj.driver/worker-1/worker-1-device-096" claim="testdra-all-usesallresources-kqjpj/claim-0091"

- goroutine #1: UpdateStatus result delivered via informer.
  AssumeCache updates cache, pushes event A, emitEvents pulls event A from queue.
  *Not* done with delivering it yet!
- goroutine #2: AssumeCache.Assume called. Updates cache, pushes event B, emits it.
  Old and new claim have allocation, so no "Observed device allocation".
- goroutine #3: Schedules next pod, without considering device as allocated (not in the log snippet).
- goroutine #1: Finally delivers event A: "Observed device allocation", but too late.

Also, events are delivered out-of-order.

The fix is to make emitEvents, when called by Assume, wait for a potentially
running emitEvents in some other goroutine. This ensures that an event pulled
out of the queue by that other goroutine has been delivered before Assume
itself checks the queue one more time and then returns.

The time window where things go wrong is small. An E2E test covering this
flaked only rarely, and only in CI. An integration test (separate commit) with
a higher number of pods finally made it possible to reproduce the problem
locally. It also uncovered a second race (fixed in a separate commit).

The unit test fails without the fix:

    === RUN   TestAssumeRace
        assume_cache_test.go:311: FATAL ERROR:
            	Assume should have blocked and didn't.
    --- FAIL: TestAssumeRace (0.00s)
Patrick Ohly
2026-01-16 08:39:02 +01:00
parent 5a6a1fe908
commit f296d4a9a4
2 changed files with 90 additions and 1 deletion


@@ -124,6 +124,9 @@ type AssumeCache struct {
// Synchronizes updates to all fields below.
rwMutex sync.RWMutex
// cond is used by emitEvents.
cond *sync.Cond
// All registered event handlers.
eventHandlers []cache.ResourceEventHandler
handlerRegistration cache.ResourceEventHandlerRegistration
@@ -149,6 +152,9 @@ type AssumeCache struct {
// of events would no longer be guaranteed.
eventQueue buffer.Ring[func()]
// emittingEvents is true while one emitEvents call is actively emitting events.
emittingEvents bool
// describes the object stored
description string
@@ -196,6 +202,7 @@ func NewAssumeCache(logger klog.Logger, informer Informer, description, indexNam
indexName: indexName,
eventQueue: *buffer.NewRing[func()](buffer.RingOptions{InitialSize: 0, NormalSize: 4}),
}
c.cond = sync.NewCond(&c.rwMutex)
indexers := cache.Indexers{}
if indexName != "" && indexFunc != nil {
indexers[indexName] = c.objInfoIndexFunc
@@ -508,8 +515,31 @@ func (c *AssumeCache) AddEventHandler(handler cache.ResourceEventHandler) cache.
}
// emitEvents delivers all pending events that are in the queue, in the order
// in which they were stored there (FIFO). Only one goroutine at a time is
// delivering events, to ensure correct order.
func (c *AssumeCache) emitEvents() {
c.rwMutex.Lock()
for c.emittingEvents {
// Wait for the active caller of emitEvents to finish.
// When it is done, it may or may not have drained
// the events pushed by our caller.
// We'll check below ourselves.
c.cond.Wait()
}
c.emittingEvents = true
c.rwMutex.Unlock()
defer func() {
c.rwMutex.Lock()
c.emittingEvents = false
// Hand over the baton to one other goroutine, if there is one.
// We don't need to wake up more than one because only one of
// them would be able to grab the "emittingEvents" responsibility.
c.cond.Signal()
c.rwMutex.Unlock()
}()
// When we get here, this instance of emitEvents is the active one.
for {
c.rwMutex.Lock()
deliver, ok := c.eventQueue.ReadOne()


@@ -265,6 +265,65 @@ func TestAssume(t *testing.T) {
}
}
// TestAssumeRace simulates this sequence of events:
// - Informer update arrives, event handler gets invoked and is slow.
// - Assume for the same object is called. It must block until
// the informer-triggered event is delivered.
func TestAssumeRace(t *testing.T) { ktesting.Init(t).SyncTest("", testAssumeRace) }
func testAssumeRace(tCtx ktesting.TContext) {
var informer testInformer
testObj := makeObj("pvc1", "1", "")
ac := NewAssumeCache(tCtx.Logger(), &informer, "TestObject", "", nil)
blockEvent := ktesting.WithCancel(tCtx)
defer blockEvent.Cancel("test done")
eventBlocked := false
eventDone := false
ac.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
eventBlocked = true
<-blockEvent.Done()
eventDone = true
},
})
// Here a real client would do a Create. What the assume cache may or may not
// see before Assume is the new object from the informer - let's pretend that
// comes first.
go informer.add(testObj)
// Wait for processing to finish.
tCtx.Wait()
if !eventBlocked {
tCtx.Fatal("Event handler should have been called and wasn't.")
}
// Assume should block until we unblock the event delivery.
assumeDone := false
go func() {
err := ac.Assume(testObj)
if err != nil {
tCtx.Errorf("Assume failed: %v", err)
}
assumeDone = true
}()
// Wait for Assume to be blocked in its implementation.
tCtx.Wait()
if assumeDone {
tCtx.Fatal("Assume should have blocked and didn't.")
}
// Unblock both goroutines.
blockEvent.Cancel("proceed")
tCtx.Wait()
if !eventDone {
tCtx.Fatal("Event should have been delivered and wasn't.")
}
if !assumeDone {
tCtx.Fatal("Assume should have returned and didn't.")
}
}
func TestRestore(t *testing.T) {
tCtx, cache, informer := newTest(t)
var events mockEventHandler