scheduler/dynamicresources: extract obtaining and tracking in-memory modifications of DRA objects

All logic related to obtaining DRA objects and tracking in-memory modifications
to ResourceClaims is extracted into DefaultDRAManager, which
implements framework.SharedDRAManager.

This is intended to be a no-op in terms of the DRA plugin behavior.
Author: Kuba Tużnik
Date:   2024-10-07 16:43:27 +02:00
Parent: 87cd496a29
Commit: 8d489425aa
6 changed files with 285 additions and 148 deletions
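
The manager is consumed through three narrow accessors. The following is a minimal sketch of the interface shape as inferred from the code in this commit; the authoritative definitions live in k8s.io/kubernetes/pkg/scheduler/framework and may differ in detail.

// Sketch only: interface shape inferred from this commit, not copied from the
// scheduler framework package.
package framework

import (
	resourceapi "k8s.io/api/resource/v1alpha3"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/dynamic-resource-allocation/structured"
)

type SharedDRAManager interface {
	ResourceClaims() ResourceClaimTracker
	ResourceSlices() ResourceSliceLister
	DeviceClasses() DeviceClassLister
}

type ResourceSliceLister interface {
	List() ([]*resourceapi.ResourceSlice, error)
}

type DeviceClassLister interface {
	List() ([]*resourceapi.DeviceClass, error)
	Get(className string) (*resourceapi.DeviceClass, error)
}

type ResourceClaimTracker interface {
	Get(namespace, claimName string) (*resourceapi.ResourceClaim, error)
	List() ([]*resourceapi.ResourceClaim, error)
	ListAllAllocatedDevices() (sets.Set[structured.DeviceID], error)
	ClaimHasPendingAllocation(claimUID types.UID) bool
	SignalClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error
	RemoveClaimPendingAllocation(claimUID types.UID) (deleted bool)
	AssumeClaimAfterAPICall(claim *resourceapi.ResourceClaim) error
	AssumedClaimRestore(namespace, claimName string)
}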

View File

@@ -0,0 +1,226 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamicresources
import (
"context"
"fmt"
"sync"
resourceapi "k8s.io/api/resource/v1alpha3"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
resourcelisters "k8s.io/client-go/listers/resource/v1alpha3"
"k8s.io/dynamic-resource-allocation/structured"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)
var _ framework.SharedDRAManager = &DefaultDRAManager{}
// DefaultDRAManager is the default implementation of SharedDRAManager. It obtains the DRA objects
// from API informers, and uses an AssumeCache and a map of in-flight allocations in order
// to avoid race conditions when modifying ResourceClaims.
type DefaultDRAManager struct {
resourceClaimTracker *claimTracker
resourceSliceLister *resourceSliceLister
deviceClassLister *deviceClassLister
}
func NewDRAManager(ctx context.Context, claimsCache *assumecache.AssumeCache, informerFactory informers.SharedInformerFactory) *DefaultDRAManager {
logger := klog.FromContext(ctx)
manager := &DefaultDRAManager{
resourceClaimTracker: &claimTracker{
cache: claimsCache,
inFlightAllocations: &sync.Map{},
allocatedDevices: newAllocatedDevices(logger),
logger: logger,
},
resourceSliceLister: &resourceSliceLister{sliceLister: informerFactory.Resource().V1alpha3().ResourceSlices().Lister()},
deviceClassLister: &deviceClassLister{classLister: informerFactory.Resource().V1alpha3().DeviceClasses().Lister()},
}
// Reacting to events is more efficient than iterating over the list
// repeatedly in PreFilter.
manager.resourceClaimTracker.cache.AddEventHandler(manager.resourceClaimTracker.allocatedDevices.handlers())
return manager
}
func (s *DefaultDRAManager) ResourceClaims() framework.ResourceClaimTracker {
return s.resourceClaimTracker
}
func (s *DefaultDRAManager) ResourceSlices() framework.ResourceSliceLister {
return s.resourceSliceLister
}
func (s *DefaultDRAManager) DeviceClasses() framework.DeviceClassLister {
return s.deviceClassLister
}
var _ framework.ResourceSliceLister = &resourceSliceLister{}
type resourceSliceLister struct {
sliceLister resourcelisters.ResourceSliceLister
}
func (l *resourceSliceLister) List() ([]*resourceapi.ResourceSlice, error) {
return l.sliceLister.List(labels.Everything())
}
var _ framework.DeviceClassLister = &deviceClassLister{}
type deviceClassLister struct {
classLister resourcelisters.DeviceClassLister
}
func (l *deviceClassLister) Get(className string) (*resourceapi.DeviceClass, error) {
return l.classLister.Get(className)
}
func (l *deviceClassLister) List() ([]*resourceapi.DeviceClass, error) {
return l.classLister.List(labels.Everything())
}
var _ framework.ResourceClaimTracker = &claimTracker{}
type claimTracker struct {
// cache enables temporarily storing a newer claim object
// while the scheduler has allocated it and the corresponding object
// update from the apiserver has not been processed by the claim
// informer callbacks. Claims get added here in PreBind and removed by
// the informer callback (based on the "newer than" comparison in the
// assume cache).
//
// It uses cache.MetaNamespaceKeyFunc to generate object names, which
// therefore are "<namespace>/<name>".
//
// This is necessary to ensure that reconstructing the resource usage
// at the start of a pod scheduling cycle doesn't reuse the resources
// assigned to such a claim. Alternatively, claim allocation state
// could also get tracked across pod scheduling cycles, but that
// - adds complexity (need to carefully sync state with informer events
// for claims and ResourceSlices)
// - would make integration with cluster autoscaler harder because it would need
// to trigger informer callbacks.
cache *assumecache.AssumeCache
// inFlightAllocations is a map from claim UUIDs to claim objects for those claims
// for which allocation was triggered during a scheduling cycle and the
// corresponding claim status update call in PreBind has not been done
// yet. If another pod needs the claim, the pod is treated as "not
// schedulable yet". The cluster event for the claim status update will
// make it schedulable.
//
// This mechanism avoids the following problem:
// - Pod A triggers allocation for claim X.
// - Pod B shares access to that claim and gets scheduled because
// the claim is assumed to be allocated.
// - PreBind for pod B is called first, tries to update reservedFor and
// fails because the claim is not really allocated yet.
//
// We could avoid the ordering problem by allowing either pod A or pod B
// to set the allocation. But that is more complicated and leads to another
// problem:
// - Pod A and B get scheduled as above.
// - PreBind for pod A gets called first, then fails with a temporary API error.
// It removes the updated claim from the assume cache because of that.
// - PreBind for pod B gets called next and succeeds with adding the
// allocation and its own reservedFor entry.
// - The assume cache is now not reflecting that the claim is allocated,
// which could lead to reusing the same resource for some other claim.
//
// A sync.Map is used because in practice sharing of a claim between
// pods is expected to be rare compared to per-pod claim, so we end up
// hitting the "multiple goroutines read, write, and overwrite entries
// for disjoint sets of keys" case that sync.Map is optimized for.
inFlightAllocations *sync.Map
allocatedDevices *allocatedDevices
logger klog.Logger
}
func (c *claimTracker) ClaimHasPendingAllocation(claimUID types.UID) bool {
_, found := c.inFlightAllocations.Load(claimUID)
return found
}
func (c *claimTracker) SignalClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error {
c.inFlightAllocations.Store(claimUID, allocatedClaim)
// There's no reason to return an error in this implementation, but the error is helpful for other implementations.
// For example, implementations that have to deal with fake claims might want to return an error if the allocation
// is for an invalid claim.
return nil
}
func (c *claimTracker) RemoveClaimPendingAllocation(claimUID types.UID) (deleted bool) {
_, found := c.inFlightAllocations.LoadAndDelete(claimUID)
return found
}
func (c *claimTracker) Get(namespace, claimName string) (*resourceapi.ResourceClaim, error) {
obj, err := c.cache.Get(namespace + "/" + claimName)
if err != nil {
return nil, err
}
claim, ok := obj.(*resourceapi.ResourceClaim)
if !ok {
return nil, fmt.Errorf("unexpected object type %T for assumed object %s/%s", obj, namespace, claimName)
}
return claim, nil
}
func (c *claimTracker) List() ([]*resourceapi.ResourceClaim, error) {
var result []*resourceapi.ResourceClaim
// Probably not worth adding an index for?
objs := c.cache.List(nil)
for _, obj := range objs {
claim, ok := obj.(*resourceapi.ResourceClaim)
if ok {
result = append(result, claim)
}
}
return result, nil
}
func (c *claimTracker) ListAllAllocatedDevices() (sets.Set[structured.DeviceID], error) {
// Start with a fresh set that matches the current known state of the
// world according to the informers.
allocated := c.allocatedDevices.Get()
// Whatever is in flight also has to be checked.
c.inFlightAllocations.Range(func(key, value any) bool {
claim := value.(*resourceapi.ResourceClaim)
foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) {
c.logger.V(6).Info("Device is in flight for allocation", "device", deviceID, "claim", klog.KObj(claim))
allocated.Insert(deviceID)
})
return true
})
// There's no reason to return an error in this implementation, but the error might be helpful for other implementations.
return allocated, nil
}
func (c *claimTracker) AssumeClaimAfterAPICall(claim *resourceapi.ResourceClaim) error {
return c.cache.Assume(claim)
}
func (c *claimTracker) AssumedClaimRestore(namespace, claimName string) {
c.cache.Restore(namespace + "/" + claimName)
}
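
The plugin itself now receives the manager via fh.SharedDRAManager(), but components that construct it directly pass the existing ResourceClaim assume cache plus the shared informer factory to the constructor, as the test and scheduler_perf changes below do. A hedged wiring sketch; the helper name and variables are illustrative, not part of this commit:

// Illustrative wiring sketch (not part of this commit): build a DefaultDRAManager
// from a client, mirroring the test setup further down in this diff.
package example

import (
	"context"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/dynamicresources"
	"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)

func newDRAManagerForClient(ctx context.Context, client kubernetes.Interface) *dynamicresources.DefaultDRAManager {
	logger := klog.FromContext(ctx)
	informerFactory := informers.NewSharedInformerFactory(client, 0)
	claimInformer := informerFactory.Resource().V1alpha3().ResourceClaims().Informer()
	// The assume cache is what lets the manager temporarily prefer a newer,
	// locally updated claim object over the informer's copy.
	claimsCache := assumecache.NewAssumeCache(logger, claimInformer, "ResourceClaim", "", nil)
	return dynamicresources.NewDRAManager(ctx, claimsCache, informerFactory)
}

The informer factory still has to be started and synced before the listers return anything useful, as the scheduler_perf change below does with informerFactory.Start.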

View File

@@ -30,12 +30,10 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
resourcelisters "k8s.io/client-go/listers/resource/v1alpha3"
"k8s.io/client-go/util/retry"
"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
"k8s.io/dynamic-resource-allocation/cel"
@@ -46,7 +44,6 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)
const (
@@ -108,67 +105,10 @@ type DynamicResources struct {
enableAdminAccess bool
enableSchedulingQueueHint bool
fh framework.Handle
clientset kubernetes.Interface
classLister resourcelisters.DeviceClassLister
sliceLister resourcelisters.ResourceSliceLister
celCache *cel.Cache
allocatedDevices *allocatedDevices
// claimAssumeCache enables temporarily storing a newer claim object
// while the scheduler has allocated it and the corresponding object
// update from the apiserver has not been processed by the claim
// informer callbacks. Claims get added here in PreBind and removed by
// the informer callback (based on the "newer than" comparison in the
// assume cache).
//
// It uses cache.MetaNamespaceKeyFunc to generate object names, which
// therefore are "<namespace>/<name>".
//
// This is necessary to ensure that reconstructing the resource usage
// at the start of a pod scheduling cycle doesn't reuse the resources
// assigned to such a claim. Alternatively, claim allocation state
// could also get tracked across pod scheduling cycles, but that
// - adds complexity (need to carefully sync state with informer events
// for claims and ResourceSlices)
// - would make integration with cluster autoscaler harder because it would need
// to trigger informer callbacks.
//
// When implementing cluster autoscaler support, this assume cache or
// something like it (see https://github.com/kubernetes/kubernetes/pull/112202)
// might have to be managed by the cluster autoscaler.
claimAssumeCache *assumecache.AssumeCache
// inFlightAllocations is map from claim UUIDs to claim objects for those claims
// for which allocation was triggered during a scheduling cycle and the
// corresponding claim status update call in PreBind has not been done
// yet. If another pod needs the claim, the pod is treated as "not
// schedulable yet". The cluster event for the claim status update will
// make it schedulable.
//
// This mechanism avoids the following problem:
// - Pod A triggers allocation for claim X.
// - Pod B shares access to that claim and gets scheduled because
// the claim is assumed to be allocated.
// - PreBind for pod B is called first, tries to update reservedFor and
// fails because the claim is not really allocated yet.
//
// We could avoid the ordering problem by allowing either pod A or pod B
// to set the allocation. But that is more complicated and leads to another
// problem:
// - Pod A and B get scheduled as above.
// - PreBind for pod A gets called first, then fails with a temporary API error.
// It removes the updated claim from the assume cache because of that.
// - PreBind for pod B gets called next and succeeds with adding the
// allocation and its own reservedFor entry.
// - The assume cache is now not reflecting that the claim is allocated,
// which could lead to reusing the same resource for some other claim.
//
// A sync.Map is used because in practice sharing of a claim between
// pods is expected to be rare compared to per-pod claim, so we end up
// hitting the "multiple goroutines read, write, and overwrite entries
// for disjoint sets of keys" case that sync.Map is optimized for.
inFlightAllocations sync.Map
fh framework.Handle
clientset kubernetes.Interface
celCache *cel.Cache
draManager framework.SharedDRAManager
}
// New initializes a new plugin and returns it.
@@ -178,30 +118,20 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
return &DynamicResources{}, nil
}
logger := klog.FromContext(ctx)
pl := &DynamicResources{
enabled: true,
enableAdminAccess: fts.EnableDRAAdminAccess,
enableSchedulingQueueHint: fts.EnableSchedulingQueueHint,
fh: fh,
clientset: fh.ClientSet(),
classLister: fh.SharedInformerFactory().Resource().V1alpha3().DeviceClasses().Lister(),
sliceLister: fh.SharedInformerFactory().Resource().V1alpha3().ResourceSlices().Lister(),
claimAssumeCache: fh.ResourceClaimCache(),
fh: fh,
clientset: fh.ClientSet(),
// This is a LRU cache for compiled CEL expressions. The most
// recent 10 of them get reused across different scheduling
// cycles.
celCache: cel.NewCache(10),
allocatedDevices: newAllocatedDevices(logger),
celCache: cel.NewCache(10),
draManager: fh.SharedDRAManager(),
}
// Reacting to events is more efficient than iterating over the list
// repeatedly in PreFilter.
pl.claimAssumeCache.AddEventHandler(pl.allocatedDevices.handlers())
return pl, nil
}
@@ -419,16 +349,11 @@ func (pl *DynamicResources) foreachPodResourceClaim(pod *v1.Pod, cb func(podReso
if claimName == nil {
continue
}
obj, err := pl.claimAssumeCache.Get(pod.Namespace + "/" + *claimName)
claim, err := pl.draManager.ResourceClaims().Get(pod.Namespace, *claimName)
if err != nil {
return err
}
claim, ok := obj.(*resourceapi.ResourceClaim)
if !ok {
return fmt.Errorf("unexpected object type %T for assumed object %s/%s", obj, pod.Namespace, *claimName)
}
if claim.DeletionTimestamp != nil {
return fmt.Errorf("resourceclaim %q is being deleted", claim.Name)
}
@@ -499,7 +424,7 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
// Allocation in flight? Better wait for that
// to finish, see inFlightAllocations
// documentation for details.
if _, found := pl.inFlightAllocations.Load(claim.UID); found {
if pl.draManager.ResourceClaims().ClaimHasPendingAllocation(claim.UID) {
return nil, statusUnschedulable(logger, fmt.Sprintf("resource claim %s is in the process of being allocated", klog.KObj(claim)))
}
@@ -516,7 +441,7 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
return nil, statusError(logger, fmt.Errorf("request %s: unsupported request type", request.Name))
}
_, err := pl.classLister.Get(request.DeviceClassName)
_, err := pl.draManager.DeviceClasses().Get(request.DeviceClassName)
if err != nil {
// If the class cannot be retrieved, allocation cannot proceed.
if apierrors.IsNotFound(err) {
@@ -546,12 +471,15 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
// Claims (and thus their devices) are treated as "allocated" if they are in the assume cache
// or currently their allocation is in-flight. This does not change
// during filtering, so we can determine that once.
allAllocatedDevices := pl.listAllAllocatedDevices(logger)
slices, err := pl.sliceLister.List(labels.Everything())
allAllocatedDevices, err := pl.draManager.ResourceClaims().ListAllAllocatedDevices()
if err != nil {
return nil, statusError(logger, err)
}
allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, allocateClaims, allAllocatedDevices, pl.classLister, slices, pl.celCache)
slices, err := pl.draManager.ResourceSlices().List()
if err != nil {
return nil, statusError(logger, err)
}
allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, allocateClaims, allAllocatedDevices, pl.draManager.DeviceClasses(), slices, pl.celCache)
if err != nil {
return nil, statusError(logger, err)
}
@@ -563,23 +491,6 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
return nil, nil
}
func (pl *DynamicResources) listAllAllocatedDevices(logger klog.Logger) sets.Set[structured.DeviceID] {
// Start with a fresh set that matches the current known state of the
// world according to the informers.
allocated := pl.allocatedDevices.Get()
// Whatever is in flight also has to be checked.
pl.inFlightAllocations.Range(func(key, value any) bool {
claim := value.(*resourceapi.ResourceClaim)
foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) {
logger.V(6).Info("Device is in flight for allocation", "device", deviceID, "claim", klog.KObj(claim))
allocated.Insert(deviceID)
})
return true
})
return allocated
}
// PreFilterExtensions returns prefilter extensions, pod add and remove.
func (pl *DynamicResources) PreFilterExtensions() framework.PreFilterExtensions {
return nil
@@ -792,7 +703,10 @@ func (pl *DynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
claim.Finalizers = append(claim.Finalizers, resourceapi.Finalizer)
}
claim.Status.Allocation = allocation
pl.inFlightAllocations.Store(claim.UID, claim)
err := pl.draManager.ResourceClaims().SignalClaimPendingAllocation(claim.UID, claim)
if err != nil {
return statusError(logger, fmt.Errorf("internal error, couldn't signal allocation for claim %s", claim.Name))
}
logger.V(5).Info("Reserved resource in allocation result", "claim", klog.KObj(claim), "allocation", klog.Format(allocation))
}
}
@@ -819,8 +733,8 @@ func (pl *DynamicResources) Unreserve(ctx context.Context, cs *framework.CycleSt
for index, claim := range state.claims {
// If allocation was in-flight, then it's not anymore and we need to revert the
// claim object in the assume cache to what it was before.
if _, found := pl.inFlightAllocations.LoadAndDelete(state.claims[index].UID); found {
pl.claimAssumeCache.Restore(claim.Namespace + "/" + claim.Name)
if deleted := pl.draManager.ResourceClaims().RemoveClaimPendingAllocation(state.claims[index].UID); deleted {
pl.draManager.ResourceClaims().AssumedClaimRestore(claim.Namespace, claim.Name)
}
if claim.Status.Allocation != nil &&
@@ -892,11 +806,11 @@ func (pl *DynamicResources) bindClaim(ctx context.Context, state *stateData, ind
if finalErr == nil {
// This can fail, but only for reasons that are okay (concurrent delete or update).
// Shouldn't happen in this case.
if err := pl.claimAssumeCache.Assume(claim); err != nil {
if err := pl.draManager.ResourceClaims().AssumeClaimAfterAPICall(claim); err != nil {
logger.V(5).Info("Claim not stored in assume cache", "err", finalErr)
}
}
pl.inFlightAllocations.Delete(claim.UID)
pl.draManager.ResourceClaims().RemoveClaimPendingAllocation(claim.UID)
}
}()
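
Taken together, the PreFilter, Reserve, Unreserve and PreBind hunks above replace direct access to inFlightAllocations and the assume cache with tracker calls. A condensed sketch of that lifecycle for a single claim, with error handling and cycle state omitted (hypothetical code, not part of this commit):

// Hypothetical condensation of the plugin's tracker usage shown in the hunks
// above; simplified, no cycle state, not part of this commit.
package example

import (
	"context"
	"fmt"

	resourceapi "k8s.io/api/resource/v1alpha3"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

func scheduleClaimSketch(ctx context.Context, tracker framework.ResourceClaimTracker, claim *resourceapi.ResourceClaim) error {
	logger := klog.FromContext(ctx)

	// PreFilter: if another pod's allocation of this claim is still in flight,
	// the pod has to wait for the corresponding claim status update.
	if tracker.ClaimHasPendingAllocation(claim.UID) {
		return fmt.Errorf("resource claim %s is in the process of being allocated", klog.KObj(claim))
	}

	// Reserve: record the in-memory allocation so that concurrent scheduling
	// cycles treat the claim's devices as taken.
	if err := tracker.SignalClaimPendingAllocation(claim.UID, claim); err != nil {
		return err
	}

	// PreBind, after the claim status update succeeded against the API server:
	// keep the newer object until the informer catches up, then clear the
	// in-flight marker. (Unreserve instead calls RemoveClaimPendingAllocation
	// followed by AssumedClaimRestore.)
	if err := tracker.AssumeClaimAfterAPICall(claim); err != nil {
		logger.V(5).Info("Claim not stored in assume cache", "err", err)
	}
	tracker.RemoveClaimPendingAllocation(claim.UID)
	return nil
}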

View File

@@ -916,13 +916,13 @@ func TestPlugin(t *testing.T) {
}
type testContext struct {
ctx context.Context
client *fake.Clientset
informerFactory informers.SharedInformerFactory
claimAssumeCache *assumecache.AssumeCache
p *DynamicResources
nodeInfos []*framework.NodeInfo
state *framework.CycleState
ctx context.Context
client *fake.Clientset
informerFactory informers.SharedInformerFactory
draManager *DefaultDRAManager
p *DynamicResources
nodeInfos []*framework.NodeInfo
state *framework.CycleState
}
func (tc *testContext) verify(t *testing.T, expected result, initialObjects []metav1.Object, result interface{}, status *framework.Status) {
@@ -984,14 +984,11 @@ func (tc *testContext) listAll(t *testing.T) (objects []metav1.Object) {
}
func (tc *testContext) listAssumedClaims() []metav1.Object {
if tc.p.claimAssumeCache == nil {
return nil
}
var assumedClaims []metav1.Object
for _, obj := range tc.p.claimAssumeCache.List(nil) {
for _, obj := range tc.draManager.resourceClaimTracker.cache.List(nil) {
claim := obj.(*resourceapi.ResourceClaim)
obj, _ := tc.p.claimAssumeCache.Get(claim.Namespace + "/" + claim.Name)
apiObj, _ := tc.p.claimAssumeCache.GetAPIObj(claim.Namespace + "/" + claim.Name)
obj, _ := tc.draManager.resourceClaimTracker.cache.Get(claim.Namespace + "/" + claim.Name)
apiObj, _ := tc.draManager.resourceClaimTracker.cache.GetAPIObj(claim.Namespace + "/" + claim.Name)
if obj != apiObj {
assumedClaims = append(assumedClaims, claim)
}
@@ -1002,7 +999,7 @@ func (tc *testContext) listAssumedClaims() []metav1.Object {
func (tc *testContext) listInFlightClaims() []metav1.Object {
var inFlightClaims []metav1.Object
tc.p.inFlightAllocations.Range(func(key, value any) bool {
tc.draManager.resourceClaimTracker.inFlightAllocations.Range(func(key, value any) bool {
inFlightClaims = append(inFlightClaims, value.(*resourceapi.ResourceClaim))
return true
})
@@ -1072,11 +1069,11 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourceapi.ResourceClaim,
tc.client.PrependReactor("*", "*", reactor)
tc.informerFactory = informers.NewSharedInformerFactory(tc.client, 0)
tc.claimAssumeCache = assumecache.NewAssumeCache(tCtx.Logger(), tc.informerFactory.Resource().V1alpha3().ResourceClaims().Informer(), "resource claim", "", nil)
tc.draManager = NewDRAManager(tCtx, assumecache.NewAssumeCache(tCtx.Logger(), tc.informerFactory.Resource().V1alpha3().ResourceClaims().Informer(), "resource claim", "", nil), tc.informerFactory)
opts := []runtime.Option{
runtime.WithClientSet(tc.client),
runtime.WithInformerFactory(tc.informerFactory),
runtime.WithResourceClaimCache(tc.claimAssumeCache),
runtime.WithSharedDRAManager(tc.draManager),
}
fh, err := runtime.NewFramework(tCtx, nil, nil, opts...)
if err != nil {
@@ -1290,7 +1287,7 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
}
claim = storedClaim
} else {
cachedClaim, err := testCtx.claimAssumeCache.Get(claimKey)
cachedClaim, err := testCtx.draManager.resourceClaimTracker.cache.Get(claimKey)
if err != nil {
t.Fatalf("retrieve old claim: expected no error, got: %v", err)
}
@@ -1308,7 +1305,7 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) {
// Eventually the assume cache will have it, too.
require.EventuallyWithT(t, func(t *assert.CollectT) {
cachedClaim, err := testCtx.claimAssumeCache.Get(claimKey)
cachedClaim, err := testCtx.draManager.resourceClaimTracker.cache.Get(claimKey)
require.NoError(t, err, "retrieve claim")
if cachedClaim.(*resourceapi.ResourceClaim).ResourceVersion != claim.ResourceVersion {
t.Errorf("cached claim not updated yet")

View File

@@ -26,13 +26,19 @@ import (
v1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1alpha3"
"k8s.io/apimachinery/pkg/util/sets"
resourcelisters "k8s.io/client-go/listers/resource/v1alpha3"
draapi "k8s.io/dynamic-resource-allocation/api"
"k8s.io/dynamic-resource-allocation/cel"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
)
type deviceClassLister interface {
// List returns a list of all DeviceClasses.
List() ([]*resourceapi.DeviceClass, error)
// Get returns the DeviceClass with the given className.
Get(className string) (*resourceapi.DeviceClass, error)
}
// Allocator calculates how to allocate a set of unallocated claims which use
// structured parameters.
//
@@ -43,7 +49,7 @@ type Allocator struct {
adminAccessEnabled bool
claimsToAllocate []*resourceapi.ResourceClaim
allocatedDevices sets.Set[DeviceID]
classLister resourcelisters.DeviceClassLister
classLister deviceClassLister
slices []*resourceapi.ResourceSlice
celCache *cel.Cache
}
@@ -56,7 +62,7 @@ func NewAllocator(ctx context.Context,
adminAccessEnabled bool,
claimsToAllocate []*resourceapi.ResourceClaim,
allocatedDevices sets.Set[DeviceID],
classLister resourcelisters.DeviceClassLister,
classLister deviceClassLister,
slices []*resourceapi.ResourceSlice,
celCache *cel.Cache,
) (*Allocator, error) {
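
Narrowing classLister from the generated resourcelisters.DeviceClassLister to the small local deviceClassLister interface is what lets the scheduler pass draManager.DeviceClasses() straight into NewAllocator (see the plugin and scheduler_perf hunks), and it also makes informer-free callers possible. A hypothetical in-memory implementation, purely illustrative and not part of this commit:

// Hypothetical in-memory deviceClassLister for callers or tests that don't run
// informers; illustrative only, not part of this commit.
package example

import (
	"fmt"

	resourceapi "k8s.io/api/resource/v1alpha3"
)

type mapClassLister map[string]*resourceapi.DeviceClass

func (m mapClassLister) List() ([]*resourceapi.DeviceClass, error) {
	classes := make([]*resourceapi.DeviceClass, 0, len(m))
	for _, class := range m {
		classes = append(classes, class)
	}
	return classes, nil
}

func (m mapClassLister) Get(className string) (*resourceapi.DeviceClass, error) {
	class, ok := m[className]
	if !ok {
		// A real implementation would likely return an apierrors.NewNotFound error
		// so that callers can distinguish "missing" from other failures.
		return nil, fmt.Errorf("DeviceClass %q not found", className)
	}
	return class, nil
}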

View File

@@ -17,7 +17,6 @@ limitations under the License.
package structured
import (
"errors"
"flag"
"fmt"
"slices"
@@ -33,7 +32,6 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
@@ -1402,10 +1400,7 @@ type informerLister[T any] struct {
err error
}
func (l informerLister[T]) List(selector labels.Selector) (ret []*T, err error) {
if selector.String() != labels.Everything().String() {
return nil, errors.New("labels selector not implemented")
}
func (l informerLister[T]) List() (ret []*T, err error) {
return l.objs, l.err
}

View File

@@ -37,6 +37,7 @@ import (
"k8s.io/dynamic-resource-allocation/cel"
"k8s.io/dynamic-resource-allocation/structured"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/dynamicresources"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/ptr"
@@ -275,10 +276,8 @@ func (op *allocResourceClaimsOp) run(tCtx ktesting.TContext) {
// Track cluster state.
informerFactory := informers.NewSharedInformerFactory(tCtx.Client(), 0)
claimInformer := informerFactory.Resource().V1alpha3().ResourceClaims().Informer()
classLister := informerFactory.Resource().V1alpha3().DeviceClasses().Lister()
sliceLister := informerFactory.Resource().V1alpha3().ResourceSlices().Lister()
nodeLister := informerFactory.Core().V1().Nodes().Lister()
claimCache := assumecache.NewAssumeCache(tCtx.Logger(), claimInformer, "ResourceClaim", "", nil)
draManager := dynamicresources.NewDRAManager(tCtx, assumecache.NewAssumeCache(tCtx.Logger(), claimInformer, "ResourceClaim", "", nil), informerFactory)
informerFactory.Start(tCtx.Done())
defer func() {
tCtx.Cancel("allocResourceClaimsOp.run is shutting down")
@@ -297,7 +296,7 @@ func (op *allocResourceClaimsOp) run(tCtx ktesting.TContext) {
// The set of nodes is assumed to be fixed at this point.
nodes, err := nodeLister.List(labels.Everything())
tCtx.ExpectNoError(err, "list nodes")
slices, err := sliceLister.List(labels.Everything())
slices, err := draManager.ResourceSlices().List()
tCtx.ExpectNoError(err, "list slices")
// Allocate one claim at a time, picking nodes randomly. Each
@@ -310,10 +309,10 @@ claims:
continue
}
objs := claimCache.List(nil)
claims, err := draManager.ResourceClaims().List()
tCtx.ExpectNoError(err, "list claims")
allocatedDevices := sets.New[structured.DeviceID]()
for _, obj := range objs {
claim := obj.(*resourceapi.ResourceClaim)
for _, claim := range claims {
if claim.Status.Allocation == nil {
continue
}
@@ -322,7 +321,7 @@ claims:
}
}
allocator, err := structured.NewAllocator(tCtx, utilfeature.DefaultFeatureGate.Enabled(features.DRAAdminAccess), []*resourceapi.ResourceClaim{claim}, allocatedDevices, classLister, slices, celCache)
allocator, err := structured.NewAllocator(tCtx, utilfeature.DefaultFeatureGate.Enabled(features.DRAAdminAccess), []*resourceapi.ResourceClaim{claim}, allocatedDevices, draManager.DeviceClasses(), slices, celCache)
tCtx.ExpectNoError(err, "create allocator")
rand.Shuffle(len(nodes), func(i, j int) {
@@ -336,10 +335,10 @@ claims:
claim.Status.Allocation = &result[0]
claim, err := tCtx.Client().ResourceV1alpha3().ResourceClaims(claim.Namespace).UpdateStatus(tCtx, claim, metav1.UpdateOptions{})
tCtx.ExpectNoError(err, "update claim status with allocation")
tCtx.ExpectNoError(claimCache.Assume(claim), "assume claim")
tCtx.ExpectNoError(draManager.ResourceClaims().AssumeClaimAfterAPICall(claim), "assume claim")
continue claims
}
}
tCtx.Fatalf("Could not allocate claim %d out of %d", i, len(claims.Items))
tCtx.Fatalf("Could not allocate claim %d out of %d", i, len(claims))
}
}