diff --git a/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go b/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go
index 24d6f8c97c3..21d192ffc89 100644
--- a/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go
+++ b/pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go
@@ -85,11 +85,18 @@ func foreachAllocatedDevice(claim *resourceapi.ResourceClaim,
 // This is cheaper than repeatedly calling List, making strings unique, and building the set
 // each time PreFilter is called.
 //
+// To simplify detecting concurrent changes, each modification bumps a revision counter,
+// similar to ResourceVersion in the apiserver. Get and Capacities include the
+// current value in their result. A caller can then compare against the current value
+// to determine whether some prior results are still up-to-date, without having to get
+// and compare them.
+//
 // All methods are thread-safe. Get returns a cloned set.
 type allocatedDevices struct {
 	logger klog.Logger
 
 	mutex      sync.RWMutex
+	revision   int64
 	ids        sets.Set[structured.DeviceID]
 	shareIDs   sets.Set[structured.SharedDeviceID]
 	capacities structured.ConsumedCapacityCollection
@@ -106,18 +113,25 @@ func newAllocatedDevices(logger klog.Logger) *allocatedDevices {
 	}
 }
 
-func (a *allocatedDevices) Get() sets.Set[structured.DeviceID] {
+func (a *allocatedDevices) Get() (sets.Set[structured.DeviceID], int64) {
 	a.mutex.RLock()
 	defer a.mutex.RUnlock()
 
-	return a.ids.Clone()
+	return a.ids.Clone(), a.revision
 }
 
-func (a *allocatedDevices) Capacities() structured.ConsumedCapacityCollection {
+func (a *allocatedDevices) Capacities() (structured.ConsumedCapacityCollection, int64) {
 	a.mutex.RLock()
 	defer a.mutex.RUnlock()
 
-	return a.capacities.Clone()
+	return a.capacities.Clone(), a.revision
+}
+
+func (a *allocatedDevices) Revision() int64 {
+	a.mutex.RLock()
+	defer a.mutex.RUnlock()
+
+	return a.revision
 }
 
 func (a *allocatedDevices) handlers() cache.ResourceEventHandler {
@@ -200,8 +214,13 @@ func (a *allocatedDevices) addDevices(claim *resourceapi.ResourceClaim) {
 		},
 	)
 
+	if len(deviceIDs) == 0 && len(shareIDs) == 0 && len(deviceCapacities) == 0 {
+		return
+	}
+
 	a.mutex.Lock()
 	defer a.mutex.Unlock()
+	a.revision++
 	for _, deviceID := range deviceIDs {
 		a.ids.Insert(deviceID)
 	}
@@ -241,8 +260,14 @@ func (a *allocatedDevices) removeDevices(claim *resourceapi.ResourceClaim) {
 		a.logger.V(6).Info("Observed consumed capacity release", "device id", capacity.DeviceID, "consumed capacity", capacity.ConsumedCapacity, "claim", klog.KObj(claim))
 		deviceCapacities = append(deviceCapacities, capacity)
 	})
+
+	if len(deviceIDs) == 0 && len(shareIDs) == 0 && len(deviceCapacities) == 0 {
+		return
+	}
+
 	a.mutex.Lock()
 	defer a.mutex.Unlock()
+	a.revision++
 	for _, deviceID := range deviceIDs {
 		a.ids.Delete(deviceID)
 	}
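The revision counter added above enables an optimistic read pattern: take a snapshot together with the revision, do further work without holding the lock, then compare against the current revision to decide whether the snapshot is still valid. A minimal standalone sketch of that pattern, using a hypothetical `revisionedSet` type rather than the real `allocatedDevices`:

```go
package main

import (
	"fmt"
	"sync"
)

// revisionedSet is a toy stand-in for allocatedDevices: every mutation
// bumps revision, and Get returns a copy of the data together with the
// revision it was taken at.
type revisionedSet struct {
	mutex    sync.RWMutex
	revision int64
	items    map[string]bool
}

func (s *revisionedSet) Get() (map[string]bool, int64) {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	clone := make(map[string]bool, len(s.items))
	for k, v := range s.items {
		clone[k] = v
	}
	return clone, s.revision
}

func (s *revisionedSet) Revision() int64 {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.revision
}

func (s *revisionedSet) Insert(item string) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.revision++
	s.items[item] = true
}

func main() {
	s := &revisionedSet{items: map[string]bool{}}
	s.Insert("device-a")

	// Reader: snapshot plus revision, then work without holding the lock.
	snapshot, rev := s.Get()
	_ = snapshot // ... merge in other state, build derived results ...

	// A concurrent writer invalidates the snapshot.
	s.Insert("device-b")

	// Cheap validity check: no need to fetch and compare the data itself.
	if rev == s.Revision() {
		fmt.Println("snapshot still valid")
	} else {
		fmt.Println("concurrent modification, retry")
	}
}
```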
diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go b/pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go
index 42b54c08e1f..2be14d10ec6 100644
--- a/pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go
+++ b/pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go
@@ -18,6 +18,7 @@ package dynamicresources
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"sync"
 
@@ -218,10 +219,29 @@ func (c *claimTracker) List() ([]*resourceapi.ResourceClaim, error) {
 	return result, nil
 }
 
+// errClaimTrackerConcurrentModification gets returned if ListAllAllocatedDevices
+// or GatherAllocatedState needs to be retried.
+//
+// There is a rare race when a claim is initially in-flight:
+// - allocated is created from cache (claim not there)
+// - someone removes from the in-flight claims and adds to the cache
+// - we start checking in-flight claims (claim not there anymore)
+// => claim ignored
+//
+// A proper fix would be to rewrite the assume cache, allocatedDevices,
+// and the in-flight map so that they are under a single lock. But that's
+// a pretty big change and prevents reusing the assume cache. So instead
+// we check for changes in the set of allocated devices and keep trying
+// until we get an attempt with no concurrent changes.
+//
+// A claim that is first in the cache and then only in-flight cannot happen,
+// so we don't need to re-check the in-flight claims.
+var errClaimTrackerConcurrentModification = errors.New("conflicting concurrent modification")
+
 func (c *claimTracker) ListAllAllocatedDevices() (sets.Set[structured.DeviceID], error) {
 	// Start with a fresh set that matches the current known state of the
 	// world according to the informers.
-	allocated := c.allocatedDevices.Get()
+	allocated, revision := c.allocatedDevices.Get()
 
 	// Whatever is in flight also has to be checked.
 	c.inFlightAllocations.Range(func(key, value any) bool {
@@ -232,16 +252,26 @@ func (c *claimTracker) ListAllAllocatedDevices() (sets.Set[structured.DeviceID],
 		}, false, func(structured.SharedDeviceID) {}, func(structured.DeviceConsumedCapacity) {})
 		return true
 	})
-	// There's no reason to return an error in this implementation, but the error might be helpful for other implementations.
-	return allocated, nil
+
+	if revision == c.allocatedDevices.Revision() {
+		// Our current result is valid, nothing changed in the meantime.
+		return allocated, nil
+	}
+
+	return nil, errClaimTrackerConcurrentModification
 }
 
 func (c *claimTracker) GatherAllocatedState() (*structured.AllocatedState, error) {
 	// Start with a fresh set that matches the current known state of the
 	// world according to the informers.
-	allocated := c.allocatedDevices.Get()
+	allocated, revision1 := c.allocatedDevices.Get()
 	allocatedSharedDeviceIDs := sets.New[structured.SharedDeviceID]()
-	aggregatedCapacity := c.allocatedDevices.Capacities()
+	aggregatedCapacity, revision2 := c.allocatedDevices.Capacities()
+
+	if revision1 != revision2 {
+		// Already not consistent. Try again.
+		return nil, errClaimTrackerConcurrentModification
+	}
 
 	enabledConsumableCapacity := utilfeature.DefaultFeatureGate.Enabled(features.DRAConsumableCapacity)
 
@@ -263,12 +293,16 @@ func (c *claimTracker) GatherAllocatedState() (*structured.AllocatedState, error
 		return true
 	})
 
-	// There's no reason to return an error in this implementation, but the error might be helpful for other implementations.
-	return &structured.AllocatedState{
-		AllocatedDevices:         allocated,
-		AllocatedSharedDeviceIDs: allocatedSharedDeviceIDs,
-		AggregatedCapacity:       aggregatedCapacity,
-	}, nil
+	if revision1 == c.allocatedDevices.Revision() {
+		// Our current result is valid, nothing changed in the meantime.
+		return &structured.AllocatedState{
+			AllocatedDevices:         allocated,
+			AllocatedSharedDeviceIDs: allocatedSharedDeviceIDs,
+			AggregatedCapacity:       aggregatedCapacity,
+		}, nil
+	}
+
+	return nil, errClaimTrackerConcurrentModification
 }
 
 func (c *claimTracker) AssumeClaimAfterAPICall(claim *resourceapi.ResourceClaim) error {
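With this change, ListAllAllocatedDevices and GatherAllocatedState either return a consistent result or the sentinel error errClaimTrackerConcurrentModification, which callers detect with errors.Is and treat as "take a new snapshot". A minimal standalone sketch of that sentinel-error contract, with a hypothetical `snapshot` function standing in for the tracker methods:

```go
package main

import (
	"errors"
	"fmt"
)

// Toy sentinel error, mirroring errClaimTrackerConcurrentModification:
// it only signals "retry", it is never surfaced as a real failure.
var errConcurrentModification = errors.New("conflicting concurrent modification")

// snapshot pretends to read shared state; the first call observes a
// concurrent change and asks the caller to retry.
func snapshot(attempt int) (string, error) {
	if attempt == 0 {
		return "", errConcurrentModification
	}
	return "consistent view", nil
}

func main() {
	for attempt := 0; ; attempt++ {
		view, err := snapshot(attempt)
		if errors.Is(err, errConcurrentModification) {
			continue // state changed while reading, take a new snapshot
		}
		if err != nil {
			panic(err) // any other error is a real failure
		}
		fmt.Println(view)
		break
	}
}
```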
diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
index b92a3e7b057..75551dc9805 100644
--- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
+++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
@@ -518,25 +518,45 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state fwk.CycleState,
 	// Claims (and thus their devices) are treated as "allocated" if they are in the assume cache
 	// or currently their allocation is in-flight. This does not change
 	// during filtering, so we can determine that once.
+	//
+	// This might have to be retried in the unlikely case that some concurrent modification made
+	// the result invalid.
 	var allocatedState *structured.AllocatedState
-	if pl.fts.EnableDRAConsumableCapacity {
-		allocatedState, err = pl.draManager.ResourceClaims().GatherAllocatedState()
-		if err != nil {
-			return nil, statusError(logger, err)
-		}
-		if allocatedState == nil {
-			return nil, statusError(logger, errors.New("nil allocated state"))
-		}
-	} else {
-		allocatedDevices, err := pl.draManager.ResourceClaims().ListAllAllocatedDevices()
-		if err != nil {
-			return nil, statusError(logger, err)
-		}
-		allocatedState = &structured.AllocatedState{
-			AllocatedDevices:         allocatedDevices,
-			AllocatedSharedDeviceIDs: sets.New[structured.SharedDeviceID](),
-			AggregatedCapacity:       structured.NewConsumedCapacityCollection(),
+	err = wait.PollUntilContextTimeout(ctx, time.Microsecond, 5*time.Second, true /* immediate */, func(context.Context) (bool, error) {
+		if pl.fts.EnableDRAConsumableCapacity {
+			allocatedState, err = pl.draManager.ResourceClaims().GatherAllocatedState()
+			if err != nil {
+				if errors.Is(err, errClaimTrackerConcurrentModification) {
+					logger.V(6).Info("Conflicting modification during GatherAllocatedState, trying again")
+					return false, nil
+				}
+				return false, err
+			}
+			if allocatedState == nil {
+				return false, errors.New("nil allocated state")
+			}
+			// Done.
+			return true, nil
+		} else {
+			allocatedDevices, err := pl.draManager.ResourceClaims().ListAllAllocatedDevices()
+			if err != nil {
+				if errors.Is(err, errClaimTrackerConcurrentModification) {
+					logger.V(6).Info("Conflicting modification during ListAllAllocatedDevices, trying again")
+					return false, nil
+				}
+				return false, err
+			}
+			allocatedState = &structured.AllocatedState{
+				AllocatedDevices:         allocatedDevices,
+				AllocatedSharedDeviceIDs: sets.New[structured.SharedDeviceID](),
+				AggregatedCapacity:       structured.NewConsumedCapacityCollection(),
+			}
+			// Done.
+			return true, nil
 		}
+	})
+	if err != nil {
+		return nil, statusError(logger, fmt.Errorf("gather allocation state: %w", err))
 	}
 	slices, err := pl.draManager.ResourceSlices().ListWithDeviceTaintRules()
 	if err != nil {
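The retry loop above leans on the semantics of wait.PollUntilContextTimeout: the condition function returns (false, nil) to ask for another attempt, (true, nil) when done, and a non-nil error to abort immediately; with immediate set to true the first attempt runs right away and the 5-second timeout bounds the worst case. Mapping errClaimTrackerConcurrentModification to (false, nil) is what turns a conflict into a retry, while any other error is surfaced at once and wrapped by statusError. A minimal standalone sketch of those semantics, with a hypothetical attempt counter in place of the scheduler logic:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	attempts := 0
	// Same shape as the PreFilter loop: immediate first attempt, tiny
	// interval, bounded by a timeout.
	err := wait.PollUntilContextTimeout(context.Background(), time.Microsecond, 5*time.Second, true /* immediate */, func(context.Context) (bool, error) {
		attempts++
		if attempts < 3 {
			// Pretend we observed a concurrent modification: retry.
			return false, nil
		}
		// Consistent snapshot obtained: stop polling.
		return true, nil
	})
	fmt.Println("attempts:", attempts, "err:", err) // attempts: 3 err: <nil>
}
```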