Merge pull request #136565 from pohly/automated-cherry-pick-of-#136269-origin-release-1.33

Automated cherry pick of #136269: DRA scheduler: double allocation fixes
Kubernetes Prow Robot
2026-02-05 19:32:39 +05:30
committed by GitHub
4 changed files with 103 additions and 10 deletions

View File

@@ -62,12 +62,19 @@ func foreachAllocatedDevice(claim *resourceapi.ResourceClaim, cb func(deviceID s
// This is cheaper than repeatedly calling List, making strings unique, and building the set
// each time PreFilter is called.
//
// To simplify detecting concurrent changes, each modification bumps a revision counter,
// similar to ResourceVersion in the apiserver. Get and Capacities include the
// current value in their result. A caller can then compare against the current value
// to determine whether some prior results are still up-to-date, without having to get
// and compare them.
//
// All methods are thread-safe. Get returns a cloned set.
type allocatedDevices struct {
logger klog.Logger
mutex sync.RWMutex
ids sets.Set[structured.DeviceID]
mutex sync.RWMutex
revision int64
ids sets.Set[structured.DeviceID]
}
func newAllocatedDevices(logger klog.Logger) *allocatedDevices {
@@ -77,11 +84,18 @@ func newAllocatedDevices(logger klog.Logger) *allocatedDevices {
}
}
func (a *allocatedDevices) Get() sets.Set[structured.DeviceID] {
func (a *allocatedDevices) Get() (sets.Set[structured.DeviceID], int64) {
a.mutex.RLock()
defer a.mutex.RUnlock()
return a.ids.Clone()
return a.ids.Clone(), a.revision
}
func (a *allocatedDevices) Revision() int64 {
a.mutex.RLock()
defer a.mutex.RUnlock()
return a.revision
}
func (a *allocatedDevices) handlers() cache.ResourceEventHandler {
@@ -147,8 +161,13 @@ func (a *allocatedDevices) addDevices(claim *resourceapi.ResourceClaim) {
deviceIDs = append(deviceIDs, deviceID)
})
if len(deviceIDs) == 0 {
return
}
a.mutex.Lock()
defer a.mutex.Unlock()
a.revision++
for _, deviceID := range deviceIDs {
a.ids.Insert(deviceID)
}
@@ -169,6 +188,7 @@ func (a *allocatedDevices) removeDevices(claim *resourceapi.ResourceClaim) {
a.mutex.Lock()
defer a.mutex.Unlock()
a.revision++
for _, deviceID := range deviceIDs {
a.ids.Delete(deviceID)
}
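
To make the revision-counter pattern above concrete, here is a minimal, self-contained Go sketch. It is not the scheduler code; revisionedSet and its methods are illustrative names. Every mutation bumps the counter, a reader records the counter together with its snapshot, and later compares it against the current value to decide whether the snapshot is still valid.

package main

import (
	"fmt"
	"sync"
)

// revisionedSet is a tiny stand-in for allocatedDevices: a set of strings
// plus a revision counter that is bumped on every modification.
type revisionedSet struct {
	mutex    sync.RWMutex
	revision int64
	ids      map[string]struct{}
}

func newRevisionedSet() *revisionedSet {
	return &revisionedSet{ids: map[string]struct{}{}}
}

// Get returns a cloned copy of the set together with the revision at which
// the clone was taken.
func (s *revisionedSet) Get() (map[string]struct{}, int64) {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	clone := make(map[string]struct{}, len(s.ids))
	for id := range s.ids {
		clone[id] = struct{}{}
	}
	return clone, s.revision
}

// Revision returns the current revision without copying the set.
func (s *revisionedSet) Revision() int64 {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.revision
}

// Insert adds an entry and bumps the revision, which invalidates all
// snapshots taken earlier.
func (s *revisionedSet) Insert(id string) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.revision++
	s.ids[id] = struct{}{}
}

func main() {
	s := newRevisionedSet()
	s.Insert("driver/pool/device-0")

	snapshot, revision := s.Get()
	// ... build some derived state from the snapshot ...
	if revision == s.Revision() {
		fmt.Println("snapshot still valid, devices:", len(snapshot))
	} else {
		fmt.Println("concurrent modification, retry")
	}
}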

View File

@@ -18,6 +18,7 @@ package dynamicresources
import (
"context"
"errors"
"fmt"
"sync"
@@ -201,10 +202,29 @@ func (c *claimTracker) List() ([]*resourceapi.ResourceClaim, error) {
return result, nil
}
// errClaimTrackerConcurrentModification is returned if ListAllAllocatedDevices
// or GatherAllocatedState needs to be retried.
//
// There is a rare race when a claim is initially in-flight:
// - allocated is created from cache (claim not there)
// - someone removes the claim from the in-flight claims and adds it to the cache
// - we start checking in-flight claims (claim not there anymore)
// => claim ignored
//
// A proper fix would be to rewrite the assume cache, allocatedDevices,
// and the in-flight map so that they are under a single lock. But that's
// a pretty big change and prevents reusing the assume cache. So instead
// we check for changes in the set of allocated devices and keep trying
// until we get an attempt with no concurrent changes.
//
// A claim being first in the cache and then only in-flight cannot happen,
// so we don't need to re-check the in-flight claims.
var errClaimTrackerConcurrentModification = errors.New("conflicting concurrent modification")
func (c *claimTracker) ListAllAllocatedDevices() (sets.Set[structured.DeviceID], error) {
// Start with a fresh set that matches the current known state of the
// world according to the informers.
allocated := c.allocatedDevices.Get()
allocated, revision := c.allocatedDevices.Get()
// Whatever is in flight also has to be checked.
c.inFlightAllocations.Range(func(key, value any) bool {
@@ -215,8 +235,13 @@ func (c *claimTracker) ListAllAllocatedDevices() (sets.Set[structured.DeviceID],
})
return true
})
// There's no reason to return an error in this implementation, but the error might be helpful for other implementations.
return allocated, nil
if revision == c.allocatedDevices.Revision() {
// Our current result is valid, nothing changed in the meantime.
return allocated, nil
}
return nil, errClaimTrackerConcurrentModification
}
func (c *claimTracker) AssumeClaimAfterAPICall(claim *resourceapi.ResourceClaim) error {
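
The hunk above implements the optimistic read described in the comment: take a revisioned snapshot, merge in the in-flight allocations that live outside the snapshot's lock, and accept the result only if the revision has not moved. A hedged sketch of the same idea follows, reusing the revisionedSet type from the sketch above, with a plain sync.Map standing in for the in-flight map; it additionally needs the errors package.

// errConcurrentModification plays the role of errClaimTrackerConcurrentModification.
var errConcurrentModification = errors.New("conflicting concurrent modification")

// listAllAllocated merges a revisioned snapshot with entries from a second,
// independently locked source (the sync.Map stands in for the in-flight
// allocations) and accepts the result only if no modification happened in
// the meantime.
func listAllAllocated(devices *revisionedSet, inFlight *sync.Map) (map[string]struct{}, error) {
	allocated, revision := devices.Get()

	// Whatever is in flight also has to be considered.
	inFlight.Range(func(_, value any) bool {
		if id, ok := value.(string); ok {
			allocated[id] = struct{}{}
		}
		return true
	})

	if revision == devices.Revision() {
		// Nothing changed between taking the snapshot and merging the
		// in-flight entries; the combined result is consistent.
		return allocated, nil
	}
	return nil, errConcurrentModification
}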

View File

@@ -23,6 +23,7 @@ import (
"slices"
"strings"
"sync"
"time"
"github.com/google/go-cmp/cmp" //nolint:depguard
@@ -34,6 +35,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
@@ -448,9 +450,25 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
// Claims (and thus their devices) are treated as "allocated" if they are in the assume cache
// or their allocation is currently in-flight. This does not change
// during filtering, so we can determine that once.
allAllocatedDevices, err := pl.draManager.ResourceClaims().ListAllAllocatedDevices()
//
// This might have to be retried in the unlikely case that some concurrent modification made
// the result invalid.
var allAllocatedDevices sets.Set[structured.DeviceID]
err = wait.PollUntilContextTimeout(ctx, time.Microsecond, 5*time.Second, true /* immediate */, func(context.Context) (bool, error) {
ad, err := pl.draManager.ResourceClaims().ListAllAllocatedDevices()
if err != nil {
if errors.Is(err, errClaimTrackerConcurrentModification) {
logger.V(6).Info("Conflicting modification during ListAllAllocatedDevices, trying again")
return false, nil
}
return false, err
}
// Done.
allAllocatedDevices = ad
return true, nil
})
if err != nil {
return nil, statusError(logger, err)
return nil, statusError(logger, fmt.Errorf("gather allocation state: %w", err))
}
slices, err := pl.draManager.ResourceSlices().ListWithDeviceTaintRules()
if err != nil {
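
For reference, the retry around the conflict error follows the standard wait.PollUntilContextTimeout pattern. Below is a sketch under the same assumptions as the earlier sketches: listAllAllocated and errConcurrentModification are the illustrative helpers defined above, not the plugin's API, and the imports needed are context, errors, fmt, time, sync, and k8s.io/apimachinery/pkg/util/wait.

// listWithRetry keeps calling listAllAllocated until it gets a result that
// was not invalidated by a concurrent modification, or until the timeout
// expires. Any other error aborts the poll immediately.
func listWithRetry(ctx context.Context, devices *revisionedSet, inFlight *sync.Map) (map[string]struct{}, error) {
	var result map[string]struct{}
	err := wait.PollUntilContextTimeout(ctx, time.Microsecond, 5*time.Second, true /* immediate */, func(context.Context) (bool, error) {
		allocated, err := listAllAllocated(devices, inFlight)
		if err != nil {
			if errors.Is(err, errConcurrentModification) {
				// Conflict, try again.
				return false, nil
			}
			return false, err
		}
		result = allocated
		return true, nil
	})
	if err != nil {
		return nil, fmt.Errorf("gather allocation state: %w", err)
	}
	return result, nil
}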

View File

@@ -124,6 +124,9 @@ type AssumeCache struct {
// Synchronizes updates to all fields below.
rwMutex sync.RWMutex
// cond is used by emitEvents.
cond *sync.Cond
// All registered event handlers.
eventHandlers []cache.ResourceEventHandler
handlerRegistration cache.ResourceEventHandlerRegistration
@@ -149,6 +152,9 @@ type AssumeCache struct {
// of events would no longer be guaranteed.
eventQueue queue.FIFO[func()]
// emittingEvents is true while one emitEvents call is actively emitting events.
emittingEvents bool
// describes the object stored
description string
@@ -195,6 +201,7 @@ func NewAssumeCache(logger klog.Logger, informer Informer, description, indexNam
indexFunc: indexFunc,
indexName: indexName,
}
c.cond = sync.NewCond(&c.rwMutex)
indexers := cache.Indexers{}
if indexName != "" && indexFunc != nil {
indexers[indexName] = c.objInfoIndexFunc
@@ -507,8 +514,31 @@ func (c *AssumeCache) AddEventHandler(handler cache.ResourceEventHandler) cache.
}
// emitEvents delivers all pending events that are in the queue, in the order
// in which they were stored there (FIFO).
// in which they were stored there (FIFO). Only one goroutine at a time is
// delivering events, to ensure correct order.
func (c *AssumeCache) emitEvents() {
c.rwMutex.Lock()
for c.emittingEvents {
// Wait for the active caller of emitEvents to finish.
// When it is done, it may or may not have drained
// the events pushed by our caller.
// We'll check below ourselves.
c.cond.Wait()
}
c.emittingEvents = true
c.rwMutex.Unlock()
defer func() {
c.rwMutex.Lock()
c.emittingEvents = false
// Hand over the baton to one other goroutine, if there is one.
// We don't need to wake up more than one because only one of
// them would be able to grab the "emittingEvents" responsibility.
c.cond.Signal()
c.rwMutex.Unlock()
}()
// When we get here, this instance of emitEvents is the active one.
for {
c.rwMutex.Lock()
deliver, ok := c.eventQueue.Pop()
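
The hunk is truncated here, but the single-emitter handshake it adds can be summarized in a standalone sketch: a boolean guarded by the mutex marks the goroutine that is currently delivering events, and a sync.Cond passes that role to exactly one waiter once the active emitter drains the queue. The eventEmitter type below is illustrative, not the AssumeCache API.

package main

import (
	"fmt"
	"sync"
)

// eventEmitter queues event callbacks and delivers them in FIFO order, with
// at most one goroutine doing the delivery at any time.
type eventEmitter struct {
	mutex    sync.Mutex
	cond     *sync.Cond
	emitting bool
	queue    []func()
}

func newEventEmitter() *eventEmitter {
	e := &eventEmitter{}
	e.cond = sync.NewCond(&e.mutex)
	return e
}

// push enqueues one event callback; the caller is expected to call emit afterwards.
func (e *eventEmitter) push(deliver func()) {
	e.mutex.Lock()
	defer e.mutex.Unlock()
	e.queue = append(e.queue, deliver)
}

// emit drains the queue. Concurrent callers wait until the active emitter is
// done; whoever takes over re-checks the queue, so no pushed event is lost.
func (e *eventEmitter) emit() {
	e.mutex.Lock()
	for e.emitting {
		// The active emitter may or may not drain the events pushed by
		// our caller, so we check the queue again below ourselves.
		e.cond.Wait()
	}
	e.emitting = true
	e.mutex.Unlock()

	defer func() {
		e.mutex.Lock()
		e.emitting = false
		// Wake exactly one waiter; only one can take over anyway.
		e.cond.Signal()
		e.mutex.Unlock()
	}()

	for {
		e.mutex.Lock()
		if len(e.queue) == 0 {
			e.mutex.Unlock()
			return
		}
		deliver := e.queue[0]
		e.queue = e.queue[1:]
		e.mutex.Unlock()

		// Deliver outside the lock so that handlers may push new events.
		deliver()
	}
}

func main() {
	e := newEventEmitter()
	e.push(func() { fmt.Println("event delivered") })
	e.emit()
}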