DRA scheduler: fix another root cause of double device allocation

GatherAllocatedState and ListAllAllocatedDevices need to collect information
from different sources (allocated devices, in-flight claims), potentially even
multiple times (GatherAllocatedState first gets allocated devices, then the
capacities).

The underlying assumption that nothing bad happens in parallel is not always
true. The following log snippet shows how an update of the assume
cache (which feeds the allocated devices tracker) and of the in-flight claims
can land such that GatherAllocatedState doesn't see the device in that claim
as allocated:

    dra_manager.go:263: I0115 15:11:04.407714      18778] scheduler: Starting GatherAllocatedState
    ...
    allocateddevices.go:189: I0115 15:11:04.407945      18066] scheduler: Observed device allocation device="testdra-all-usesallresources-hvs5d.driver/worker-5/worker-5-device-094" claim="testdra-all-usesallresources-hvs5d/claim-0553"
    dynamicresources.go:1150: I0115 15:11:04.407981      89109] scheduler: Claim stored in assume cache pod="testdra-all-usesallresources-hvs5d/my-pod-0553" claim="testdra-all-usesallresources-hvs5d/claim-0553" uid=<types.UID>: a84d3c4d-f752-4cfd-8993-f4ce58643685 resourceVersion="5680"
    dra_manager.go:201: I0115 15:11:04.408008      89109] scheduler: Removed in-flight claim claim="testdra-all-usesallresources-hvs5d/claim-0553" uid=<types.UID>: a84d3c4d-f752-4cfd-8993-f4ce58643685 version="1211"
    dynamicresources.go:1157: I0115 15:11:04.408044      89109] scheduler: Removed claim from in-flight claims pod="testdra-all-usesallresources-hvs5d/my-pod-0553" claim="testdra-all-usesallresources-hvs5d/claim-0553" uid=<types.UID>: a84d3c4d-f752-4cfd-8993-f4ce58643685 resourceVersion="5680" allocation=<
        	{
        	  "devices": {
        	    "results": [
        	      {
        	        "request": "req-1",
        	        "driver": "testdra-all-usesallresources-hvs5d.driver",
        	        "pool": "worker-5",
        	        "device": "worker-5-device-094"
        	      }
        	    ]
        	  },
        	  "nodeSelector": {
        	    "nodeSelectorTerms": [
        	      {
        	        "matchFields": [
        	          {
        	            "key": "metadata.name",
        	            "operator": "In",
        	            "values": [
        	              "worker-5"
        	            ]
        	          }
        	        ]
        	      }
        	    ]
        	  },
        	  "allocationTimestamp": "2026-01-15T14:11:04Z"
        	}
         >
    dra_manager.go:280: I0115 15:11:04.408085      18778] scheduler: Device is in flight for allocation device="testdra-all-usesallresources-hvs5d.driver/worker-5/worker-5-device-095" claim="testdra-all-usesallresources-hvs5d/claim-0086"
    dra_manager.go:280: I0115 15:11:04.408137      18778] scheduler: Device is in flight for allocation device="testdra-all-usesallresources-hvs5d.driver/worker-5/worker-5-device-096" claim="testdra-all-usesallresources-hvs5d/claim-0165"
    default_binder.go:69: I0115 15:11:04.408175      89109] scheduler: Attempting to bind pod to node pod="testdra-all-usesallresources-hvs5d/my-pod-0553" node="worker-5"
    dra_manager.go:265: I0115 15:11:04.408264      18778] scheduler: Finished GatherAllocatedState allocatedDevices=<map[string]interface {} | len:2>: {

Initial state: "worker-5-device-094" is in-flight, not in cache
- goroutine #1: starts GatherAllocatedState, copies cache
- goroutine #2: adds to assume cache, removes from in-flight
- goroutine #1: checks in-flight

=> device never seen as allocated (see the sketch below)
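
A minimal, self-contained sketch of this interleaving (hypothetical "state"
type, not the real scheduler code; a single mutex stands in for the assume
cache plus the in-flight map, and the two "goroutines" run sequentially to
force the problematic order). It shows both the lost device and how the
revision counter introduced in this commit flags that the result must be
recomputed:

    // Hypothetical sketch, not scheduler code.
    package main

    import (
        "fmt"
        "sync"
    )

    type state struct {
        mu        sync.Mutex
        revision  int64           // bumped on every modification (the fix)
        allocated map[string]bool // devices known via the assume cache
        inFlight  map[string]bool // devices of in-flight claims
    }

    func main() {
        s := &state{
            allocated: map[string]bool{},
            inFlight:  map[string]bool{"device-094": true},
        }

        // Goroutine #1, step 1: copy the allocated devices (device-094 not there).
        s.mu.Lock()
        snapshot := map[string]bool{}
        for d := range s.allocated {
            snapshot[d] = true
        }
        rev := s.revision
        s.mu.Unlock()

        // Goroutine #2 interleaves: the device moves from in-flight to allocated.
        s.mu.Lock()
        s.allocated["device-094"] = true
        s.revision++
        delete(s.inFlight, "device-094")
        s.mu.Unlock()

        // Goroutine #1, step 2: merge in-flight devices (device-094 already gone).
        s.mu.Lock()
        for d := range s.inFlight {
            snapshot[d] = true
        }
        retry := rev != s.revision // the revision check detects the interleaving
        s.mu.Unlock()

        fmt.Println("device-094 seen as allocated:", snapshot["device-094"]) // false
        fmt.Println("retry needed:", retry)                                  // true
    }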

This is the second reason for double allocation of the same device in two
different claims. The other was timing in the assume cache. Both were
tracked down with an integration test (separate commit). The test did not
fail every time, but often enough that regressions should show up as flakes.
Author: Patrick Ohly
Date:   2026-01-16 07:51:14 +01:00
Commit: ba81d3040d (parent: 75fd186e7a)

3 changed files with 72 additions and 9 deletions

View File

@@ -62,12 +62,19 @@ func foreachAllocatedDevice(claim *resourceapi.ResourceClaim, cb func(deviceID s
 // This is cheaper than repeatedly calling List, making strings unique, and building the set
 // each time PreFilter is called.
 //
+// To simplify detecting concurrent changes, each modification bumps a revision counter,
+// similar to ResourceVersion in the apiserver. Get and Capacities include the
+// current value in their result. A caller then can compare against the current value
+// to determine whether some prior results are still up-to-date, without having to get
+// and compare them.
+//
 // All methods are thread-safe. Get returns a cloned set.
 type allocatedDevices struct {
 	logger klog.Logger
-	mutex sync.RWMutex
-	ids sets.Set[structured.DeviceID]
+	mutex sync.RWMutex
+	revision int64
+	ids sets.Set[structured.DeviceID]
 }
 func newAllocatedDevices(logger klog.Logger) *allocatedDevices {
@@ -77,11 +84,18 @@ func newAllocatedDevices(logger klog.Logger) *allocatedDevices {
 	}
 }
-func (a *allocatedDevices) Get() sets.Set[structured.DeviceID] {
+func (a *allocatedDevices) Get() (sets.Set[structured.DeviceID], int64) {
 	a.mutex.RLock()
 	defer a.mutex.RUnlock()
-	return a.ids.Clone()
+	return a.ids.Clone(), a.revision
 }
+func (a *allocatedDevices) Revision() int64 {
+	a.mutex.RLock()
+	defer a.mutex.RUnlock()
+	return a.revision
+}
 func (a *allocatedDevices) handlers() cache.ResourceEventHandler {
@@ -147,8 +161,13 @@ func (a *allocatedDevices) addDevices(claim *resourceapi.ResourceClaim) {
 		deviceIDs = append(deviceIDs, deviceID)
 	})
+	if len(deviceIDs) == 0 {
+		return
+	}
 	a.mutex.Lock()
 	defer a.mutex.Unlock()
+	a.revision++
 	for _, deviceID := range deviceIDs {
 		a.ids.Insert(deviceID)
 	}
@@ -169,6 +188,7 @@ func (a *allocatedDevices) removeDevices(claim *resourceapi.ResourceClaim) {
 	a.mutex.Lock()
 	defer a.mutex.Unlock()
+	a.revision++
 	for _, deviceID := range deviceIDs {
 		a.ids.Delete(deviceID)
 	}

View File

@@ -18,6 +18,7 @@ package dynamicresources
 import (
 	"context"
+	"errors"
 	"fmt"
 	"sync"
@@ -201,10 +202,29 @@ func (c *claimTracker) List() ([]*resourceapi.ResourceClaim, error) {
 	return result, nil
 }
+// errClaimTrackerConcurrentModification gets returned if ListAllAllocatedDevices
+// or GatherAllocatedState need to be retried.
+//
+// There is a rare race when a claim is initially in-flight:
+// - allocated is created from cache (claim not there)
+// - someone removes from the in-flight claims and adds to the cache
+// - we start checking in-flight claims (claim not there anymore)
+// => claim ignored
+//
+// A proper fix would be to rewrite the assume cache, allocatedDevices,
+// and the in-flight map so that they are under a single lock. But that's
+// a pretty big change and prevents reusing the assume cache. So instead
+// we check for changes in the set of allocated devices and keep trying
+// until we get an attempt with no concurrent changes.
+//
+// A claim being first in the cache, then only in-flight cannot happen,
+// so we don't need to re-check the in-flight claims.
+var errClaimTrackerConcurrentModification = errors.New("conflicting concurrent modification")
 func (c *claimTracker) ListAllAllocatedDevices() (sets.Set[structured.DeviceID], error) {
 	// Start with a fresh set that matches the current known state of the
 	// world according to the informers.
-	allocated := c.allocatedDevices.Get()
+	allocated, revision := c.allocatedDevices.Get()
 	// Whatever is in flight also has to be checked.
 	c.inFlightAllocations.Range(func(key, value any) bool {
@@ -215,8 +235,13 @@ func (c *claimTracker) ListAllAllocatedDevices() (sets.Set[structured.DeviceID],
 		})
 		return true
 	})
-	// There's no reason to return an error in this implementation, but the error might be helpful for other implementations.
-	return allocated, nil
+	if revision == c.allocatedDevices.Revision() {
+		// Our current result is valid, nothing changed in the meantime.
+		return allocated, nil
+	}
+	return nil, errClaimTrackerConcurrentModification
 }
 func (c *claimTracker) AssumeClaimAfterAPICall(claim *resourceapi.ResourceClaim) error {

View File

@@ -23,6 +23,7 @@ import (
 	"slices"
 	"strings"
 	"sync"
+	"time"
 	"github.com/google/go-cmp/cmp" //nolint:depguard
@@ -34,6 +35,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/util/retry"
 	"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
@@ -448,9 +450,25 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
 	// Claims (and thus their devices) are treated as "allocated" if they are in the assume cache
 	// or currently their allocation is in-flight. This does not change
 	// during filtering, so we can determine that once.
-	allAllocatedDevices, err := pl.draManager.ResourceClaims().ListAllAllocatedDevices()
+	//
+	// This might have to be retried in the unlikely case that some concurrent modification made
+	// the result invalid.
+	var allAllocatedDevices sets.Set[structured.DeviceID]
+	err = wait.PollUntilContextTimeout(ctx, time.Microsecond, 5*time.Second, true /* immediate */, func(context.Context) (bool, error) {
+		ad, err := pl.draManager.ResourceClaims().ListAllAllocatedDevices()
+		if err != nil {
+			if errors.Is(err, errClaimTrackerConcurrentModification) {
+				logger.V(6).Info("Conflicting modification during ListAllAllocatedDevices, trying again")
+				return false, nil
+			}
+			return false, err
+		}
+		// Done.
+		allAllocatedDevices = ad
+		return true, nil
+	})
 	if err != nil {
-		return nil, statusError(logger, err)
+		return nil, statusError(logger, fmt.Errorf("gather allocation state: %w", err))
 	}
 	slices, err := pl.draManager.ResourceSlices().ListWithDeviceTaintRules()
 	if err != nil {