diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go b/pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go new file mode 100644 index 00000000000..4ee01e731a4 --- /dev/null +++ b/pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go @@ -0,0 +1,226 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dynamicresources + +import ( + "context" + "fmt" + "sync" + + resourceapi "k8s.io/api/resource/v1alpha3" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/informers" + resourcelisters "k8s.io/client-go/listers/resource/v1alpha3" + "k8s.io/dynamic-resource-allocation/structured" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" +) + +var _ framework.SharedDRAManager = &DefaultDRAManager{} + +// DefaultDRAManager is the default implementation of SharedDRAManager. It obtains the DRA objects +// from API informers, and uses an AssumeCache and a map of in-flight allocations in order +// to avoid race conditions when modifying ResourceClaims. +type DefaultDRAManager struct { + resourceClaimTracker *claimTracker + resourceSliceLister *resourceSliceLister + deviceClassLister *deviceClassLister +} + +func NewDRAManager(ctx context.Context, claimsCache *assumecache.AssumeCache, informerFactory informers.SharedInformerFactory) *DefaultDRAManager { + logger := klog.FromContext(ctx) + manager := &DefaultDRAManager{ + resourceClaimTracker: &claimTracker{ + cache: claimsCache, + inFlightAllocations: &sync.Map{}, + allocatedDevices: newAllocatedDevices(logger), + logger: logger, + }, + resourceSliceLister: &resourceSliceLister{sliceLister: informerFactory.Resource().V1alpha3().ResourceSlices().Lister()}, + deviceClassLister: &deviceClassLister{classLister: informerFactory.Resource().V1alpha3().DeviceClasses().Lister()}, + } + + // Reacting to events is more efficient than iterating over the list + // repeatedly in PreFilter. 
+ manager.resourceClaimTracker.cache.AddEventHandler(manager.resourceClaimTracker.allocatedDevices.handlers()) + + return manager +} + +func (s *DefaultDRAManager) ResourceClaims() framework.ResourceClaimTracker { + return s.resourceClaimTracker +} + +func (s *DefaultDRAManager) ResourceSlices() framework.ResourceSliceLister { + return s.resourceSliceLister +} + +func (s *DefaultDRAManager) DeviceClasses() framework.DeviceClassLister { + return s.deviceClassLister +} + +var _ framework.ResourceSliceLister = &resourceSliceLister{} + +type resourceSliceLister struct { + sliceLister resourcelisters.ResourceSliceLister +} + +func (l *resourceSliceLister) List() ([]*resourceapi.ResourceSlice, error) { + return l.sliceLister.List(labels.Everything()) +} + +var _ framework.DeviceClassLister = &deviceClassLister{} + +type deviceClassLister struct { + classLister resourcelisters.DeviceClassLister +} + +func (l *deviceClassLister) Get(className string) (*resourceapi.DeviceClass, error) { + return l.classLister.Get(className) +} + +func (l *deviceClassLister) List() ([]*resourceapi.DeviceClass, error) { + return l.classLister.List(labels.Everything()) +} + +var _ framework.ResourceClaimTracker = &claimTracker{} + +type claimTracker struct { + // cache enables temporarily storing a newer claim object + // while the scheduler has allocated it and the corresponding object + // update from the apiserver has not been processed by the claim + // informer callbacks. Claims get added here in PreBind and removed by + // the informer callback (based on the "newer than" comparison in the + // assume cache). + // + // It uses cache.MetaNamespaceKeyFunc to generate object names, which + // therefore are "<namespace>/<name>". + // + // This is necessary to ensure that reconstructing the resource usage + // at the start of a pod scheduling cycle doesn't reuse the resources + // assigned to such a claim. Alternatively, claim allocation state + // could also get tracked across pod scheduling cycles, but that + // - adds complexity (need to carefully sync state with informer events + // for claims and ResourceSlices) + // - would make integration with cluster autoscaler harder because it would need + // to trigger informer callbacks. + cache *assumecache.AssumeCache + // inFlightAllocations is a map from claim UUIDs to claim objects for those claims + // for which allocation was triggered during a scheduling cycle and the + // corresponding claim status update call in PreBind has not been done + // yet. If another pod needs the claim, the pod is treated as "not + // schedulable yet". The cluster event for the claim status update will + // make it schedulable. + // + // This mechanism avoids the following problem: + // - Pod A triggers allocation for claim X. + // - Pod B shares access to that claim and gets scheduled because + // the claim is assumed to be allocated. + // - PreBind for pod B is called first, tries to update reservedFor and + // fails because the claim is not really allocated yet. + // + // We could avoid the ordering problem by allowing either pod A or pod B + // to set the allocation. But that is more complicated and leads to another + // problem: + // - Pod A and B get scheduled as above. + // - PreBind for pod A gets called first, then fails with a temporary API error. + // It removes the updated claim from the assume cache because of that. + // - PreBind for pod B gets called next and succeeds with adding the + // allocation and its own reservedFor entry.
+ // - The assume cache is now not reflecting that the claim is allocated, + // which could lead to reusing the same resource for some other claim. + // + // A sync.Map is used because in practice sharing of a claim between + // pods is expected to be rare compared to per-pod claim, so we end up + // hitting the "multiple goroutines read, write, and overwrite entries + // for disjoint sets of keys" case that sync.Map is optimized for. + inFlightAllocations *sync.Map + allocatedDevices *allocatedDevices + logger klog.Logger +} + +func (c *claimTracker) ClaimHasPendingAllocation(claimUID types.UID) bool { + _, found := c.inFlightAllocations.Load(claimUID) + return found +} + +func (c *claimTracker) SignalClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error { + c.inFlightAllocations.Store(claimUID, allocatedClaim) + // There's no reason to return an error in this implementation, but the error is helpful for other implementations. + // For example, implementations that have to deal with fake claims might want to return an error if the allocation + // is for an invalid claim. + return nil +} + +func (c *claimTracker) RemoveClaimPendingAllocation(claimUID types.UID) (deleted bool) { + _, found := c.inFlightAllocations.LoadAndDelete(claimUID) + return found +} + +func (c *claimTracker) Get(namespace, claimName string) (*resourceapi.ResourceClaim, error) { + obj, err := c.cache.Get(namespace + "/" + claimName) + if err != nil { + return nil, err + } + claim, ok := obj.(*resourceapi.ResourceClaim) + if !ok { + return nil, fmt.Errorf("unexpected object type %T for assumed object %s/%s", obj, namespace, claimName) + } + return claim, nil +} + +func (c *claimTracker) List() ([]*resourceapi.ResourceClaim, error) { + var result []*resourceapi.ResourceClaim + // Probably not worth adding an index for? + objs := c.cache.List(nil) + for _, obj := range objs { + claim, ok := obj.(*resourceapi.ResourceClaim) + if ok { + result = append(result, claim) + } + } + return result, nil +} + +func (c *claimTracker) ListAllAllocatedDevices() (sets.Set[structured.DeviceID], error) { + // Start with a fresh set that matches the current known state of the + // world according to the informers. + allocated := c.allocatedDevices.Get() + + // Whatever is in flight also has to be checked. + c.inFlightAllocations.Range(func(key, value any) bool { + claim := value.(*resourceapi.ResourceClaim) + foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) { + c.logger.V(6).Info("Device is in flight for allocation", "device", deviceID, "claim", klog.KObj(claim)) + allocated.Insert(deviceID) + }) + return true + }) + // There's no reason to return an error in this implementation, but the error might be helpful for other implementations. 
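// Illustrative sketch, not part of the patch: how the ResourceClaimTracker
// methods defined in this file are intended to be sequenced over a single
// scheduling attempt. It mirrors the Reserve/Unreserve/PreBind changes to the
// plugin further down in this diff; the function name, the logger parameter,
// and the apiUpdate callback are assumptions made for the example only.

package example

import (
	resourceapi "k8s.io/api/resource/v1alpha3"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// commitAllocation assumes claim already carries the allocation result computed
// by the allocator and that apiUpdate performs the ResourceClaim status update
// against the apiserver.
func commitAllocation(logger klog.Logger, tracker framework.ResourceClaimTracker, claim *resourceapi.ResourceClaim, apiUpdate func(*resourceapi.ResourceClaim) (*resourceapi.ResourceClaim, error)) error {
	// Reserve: mark the allocation as in flight so that other pods sharing
	// the claim are held back until the status update becomes visible.
	if err := tracker.SignalClaimPendingAllocation(claim.UID, claim); err != nil {
		return err
	}

	updated, err := apiUpdate(claim)
	if err != nil {
		// Unreserve: the allocation is no longer in flight and the assumed
		// state for this claim has to be rolled back.
		tracker.RemoveClaimPendingAllocation(claim.UID)
		tracker.AssumedClaimRestore(claim.Namespace, claim.Name)
		return err
	}

	// PreBind succeeded: keep the newer object visible through the assume
	// cache until the informer catches up, then drop the in-flight marker.
	if err := tracker.AssumeClaimAfterAPICall(updated); err != nil {
		// Not fatal: this can only fail for acceptable reasons such as a
		// concurrent delete or update.
		logger.V(5).Info("Claim not stored in assume cache", "err", err)
	}
	tracker.RemoveClaimPendingAllocation(claim.UID)
	return nil
}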
+ return allocated, nil +} + +func (c *claimTracker) AssumeClaimAfterAPICall(claim *resourceapi.ResourceClaim) error { + return c.cache.Assume(claim) +} + +func (c *claimTracker) AssumedClaimRestore(namespace, claimName string) { + c.cache.Restore(namespace + "/" + claimName) +} diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 0427e6deb08..898ad69b635 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -30,12 +30,10 @@ import ( apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes" - resourcelisters "k8s.io/client-go/listers/resource/v1alpha3" "k8s.io/client-go/util/retry" "k8s.io/component-helpers/scheduling/corev1/nodeaffinity" "k8s.io/dynamic-resource-allocation/cel" @@ -46,7 +44,6 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" schedutil "k8s.io/kubernetes/pkg/scheduler/util" - "k8s.io/kubernetes/pkg/scheduler/util/assumecache" ) const ( @@ -108,67 +105,10 @@ type DynamicResources struct { enableAdminAccess bool enableSchedulingQueueHint bool - fh framework.Handle - clientset kubernetes.Interface - classLister resourcelisters.DeviceClassLister - sliceLister resourcelisters.ResourceSliceLister - celCache *cel.Cache - allocatedDevices *allocatedDevices - - // claimAssumeCache enables temporarily storing a newer claim object - // while the scheduler has allocated it and the corresponding object - // update from the apiserver has not been processed by the claim - // informer callbacks. Claims get added here in PreBind and removed by - // the informer callback (based on the "newer than" comparison in the - // assume cache). - // - // It uses cache.MetaNamespaceKeyFunc to generate object names, which - // therefore are "<namespace>/<name>". - // - // This is necessary to ensure that reconstructing the resource usage - // at the start of a pod scheduling cycle doesn't reuse the resources - // assigned to such a claim. Alternatively, claim allocation state - // could also get tracked across pod scheduling cycles, but that - // - adds complexity (need to carefully sync state with informer events - // for claims and ResourceSlices) - // - would make integration with cluster autoscaler harder because it would need - // to trigger informer callbacks. - // - // When implementing cluster autoscaler support, this assume cache or - // something like it (see https://github.com/kubernetes/kubernetes/pull/112202) - // might have to be managed by the cluster autoscaler. - claimAssumeCache *assumecache.AssumeCache - - // inFlightAllocations is map from claim UUIDs to claim objects for those claims - // for which allocation was triggered during a scheduling cycle and the - // corresponding claim status update call in PreBind has not been done - // yet. If another pod needs the claim, the pod is treated as "not - // schedulable yet". The cluster event for the claim status update will - // make it schedulable. - // - // This mechanism avoids the following problem: - // - Pod A triggers allocation for claim X.
- // - Pod B shares access to that claim and gets scheduled because - // the claim is assumed to be allocated. - // - PreBind for pod B is called first, tries to update reservedFor and - // fails because the claim is not really allocated yet. - // - // We could avoid the ordering problem by allowing either pod A or pod B - // to set the allocation. But that is more complicated and leads to another - // problem: - // - Pod A and B get scheduled as above. - // - PreBind for pod A gets called first, then fails with a temporary API error. - // It removes the updated claim from the assume cache because of that. - // - PreBind for pod B gets called next and succeeds with adding the - // allocation and its own reservedFor entry. - // - The assume cache is now not reflecting that the claim is allocated, - // which could lead to reusing the same resource for some other claim. - // - // A sync.Map is used because in practice sharing of a claim between - // pods is expected to be rare compared to per-pod claim, so we end up - // hitting the "multiple goroutines read, write, and overwrite entries - // for disjoint sets of keys" case that sync.Map is optimized for. - inFlightAllocations sync.Map + fh framework.Handle + clientset kubernetes.Interface + celCache *cel.Cache + draManager framework.SharedDRAManager } // New initializes a new plugin and returns it. @@ -178,30 +118,20 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe return &DynamicResources{}, nil } - logger := klog.FromContext(ctx) pl := &DynamicResources{ enabled: true, enableAdminAccess: fts.EnableDRAAdminAccess, enableSchedulingQueueHint: fts.EnableSchedulingQueueHint, - fh: fh, - clientset: fh.ClientSet(), - classLister: fh.SharedInformerFactory().Resource().V1alpha3().DeviceClasses().Lister(), - sliceLister: fh.SharedInformerFactory().Resource().V1alpha3().ResourceSlices().Lister(), - claimAssumeCache: fh.ResourceClaimCache(), - + fh: fh, + clientset: fh.ClientSet(), // This is a LRU cache for compiled CEL expressions. The most // recent 10 of them get reused across different scheduling // cycles. - celCache: cel.NewCache(10), - - allocatedDevices: newAllocatedDevices(logger), + celCache: cel.NewCache(10), + draManager: fh.SharedDRAManager(), } - // Reacting to events is more efficient than iterating over the list - // repeatedly in PreFilter. - pl.claimAssumeCache.AddEventHandler(pl.allocatedDevices.handlers()) - return pl, nil } @@ -419,16 +349,11 @@ func (pl *DynamicResources) foreachPodResourceClaim(pod *v1.Pod, cb func(podReso if claimName == nil { continue } - obj, err := pl.claimAssumeCache.Get(pod.Namespace + "/" + *claimName) + claim, err := pl.draManager.ResourceClaims().Get(pod.Namespace, *claimName) if err != nil { return err } - claim, ok := obj.(*resourceapi.ResourceClaim) - if !ok { - return fmt.Errorf("unexpected object type %T for assumed object %s/%s", obj, pod.Namespace, *claimName) - } - if claim.DeletionTimestamp != nil { return fmt.Errorf("resourceclaim %q is being deleted", claim.Name) } @@ -499,7 +424,7 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl // Allocation in flight? Better wait for that // to finish, see inFlightAllocations // documentation for details. 
- if _, found := pl.inFlightAllocations.Load(claim.UID); found { + if pl.draManager.ResourceClaims().ClaimHasPendingAllocation(claim.UID) { return nil, statusUnschedulable(logger, fmt.Sprintf("resource claim %s is in the process of being allocated", klog.KObj(claim))) } @@ -516,7 +441,7 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl return nil, statusError(logger, fmt.Errorf("request %s: unsupported request type", request.Name)) } - _, err := pl.classLister.Get(request.DeviceClassName) + _, err := pl.draManager.DeviceClasses().Get(request.DeviceClassName) if err != nil { // If the class cannot be retrieved, allocation cannot proceed. if apierrors.IsNotFound(err) { @@ -546,12 +471,15 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl // Claims (and thus their devices) are treated as "allocated" if they are in the assume cache // or currently their allocation is in-flight. This does not change // during filtering, so we can determine that once. - allAllocatedDevices := pl.listAllAllocatedDevices(logger) - slices, err := pl.sliceLister.List(labels.Everything()) + allAllocatedDevices, err := pl.draManager.ResourceClaims().ListAllAllocatedDevices() if err != nil { return nil, statusError(logger, err) } - allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, allocateClaims, allAllocatedDevices, pl.classLister, slices, pl.celCache) + slices, err := pl.draManager.ResourceSlices().List() + if err != nil { + return nil, statusError(logger, err) + } + allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, allocateClaims, allAllocatedDevices, pl.draManager.DeviceClasses(), slices, pl.celCache) if err != nil { return nil, statusError(logger, err) } @@ -563,23 +491,6 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl return nil, nil } -func (pl *DynamicResources) listAllAllocatedDevices(logger klog.Logger) sets.Set[structured.DeviceID] { - // Start with a fresh set that matches the current known state of the - // world according to the informers. - allocated := pl.allocatedDevices.Get() - - // Whatever is in flight also has to be checked. - pl.inFlightAllocations.Range(func(key, value any) bool { - claim := value.(*resourceapi.ResourceClaim) - foreachAllocatedDevice(claim, func(deviceID structured.DeviceID) { - logger.V(6).Info("Device is in flight for allocation", "device", deviceID, "claim", klog.KObj(claim)) - allocated.Insert(deviceID) - }) - return true - }) - return allocated -} - // PreFilterExtensions returns prefilter extensions, pod add and remove. 
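// Illustrative sketch, not part of the patch: the PreFilter code above relies
// on ListAllAllocatedDevices reporting the devices of in-flight allocations
// before the claim status update is visible through the informer. A unit-test
// style check of that invariant could look roughly like this; the helper name
// and the assumption that allocatedClaim.Status.Allocation is already populated
// are made up for the example.

package example

import (
	"testing"

	resourceapi "k8s.io/api/resource/v1alpha3"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

func verifyInFlightDevicesVisible(t *testing.T, tracker framework.ResourceClaimTracker, allocatedClaim *resourceapi.ResourceClaim) {
	t.Helper()

	// Signal the allocation as in flight, as Reserve does.
	if err := tracker.SignalClaimPendingAllocation(allocatedClaim.UID, allocatedClaim); err != nil {
		t.Fatalf("signal pending allocation: %v", err)
	}
	defer tracker.RemoveClaimPendingAllocation(allocatedClaim.UID)

	// The devices of the in-flight claim must already count as allocated,
	// even though the informer has not seen the status update yet.
	allocated, err := tracker.ListAllAllocatedDevices()
	if err != nil {
		t.Fatalf("list allocated devices: %v", err)
	}
	if allocated.Len() == 0 {
		t.Error("expected the devices of the in-flight claim to be reported as allocated")
	}
}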
func (pl *DynamicResources) PreFilterExtensions() framework.PreFilterExtensions { return nil @@ -792,7 +703,10 @@ func (pl *DynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat claim.Finalizers = append(claim.Finalizers, resourceapi.Finalizer) } claim.Status.Allocation = allocation - pl.inFlightAllocations.Store(claim.UID, claim) + err := pl.draManager.ResourceClaims().SignalClaimPendingAllocation(claim.UID, claim) + if err != nil { + return statusError(logger, fmt.Errorf("internal error, couldn't signal allocation for claim %s", claim.Name)) + } logger.V(5).Info("Reserved resource in allocation result", "claim", klog.KObj(claim), "allocation", klog.Format(allocation)) } } @@ -819,8 +733,8 @@ func (pl *DynamicResources) Unreserve(ctx context.Context, cs *framework.CycleSt for index, claim := range state.claims { // If allocation was in-flight, then it's not anymore and we need to revert the // claim object in the assume cache to what it was before. - if _, found := pl.inFlightAllocations.LoadAndDelete(state.claims[index].UID); found { - pl.claimAssumeCache.Restore(claim.Namespace + "/" + claim.Name) + if deleted := pl.draManager.ResourceClaims().RemoveClaimPendingAllocation(state.claims[index].UID); deleted { + pl.draManager.ResourceClaims().AssumedClaimRestore(claim.Namespace, claim.Name) } if claim.Status.Allocation != nil && @@ -892,11 +806,11 @@ func (pl *DynamicResources) bindClaim(ctx context.Context, state *stateData, ind if finalErr == nil { // This can fail, but only for reasons that are okay (concurrent delete or update). // Shouldn't happen in this case. - if err := pl.claimAssumeCache.Assume(claim); err != nil { + if err := pl.draManager.ResourceClaims().AssumeClaimAfterAPICall(claim); err != nil { logger.V(5).Info("Claim not stored in assume cache", "err", finalErr) } } - pl.inFlightAllocations.Delete(claim.UID) + pl.draManager.ResourceClaims().RemoveClaimPendingAllocation(claim.UID) } }() diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go index 108d33436df..b4e5c9fe77f 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go @@ -916,13 +916,13 @@ func TestPlugin(t *testing.T) { } type testContext struct { - ctx context.Context - client *fake.Clientset - informerFactory informers.SharedInformerFactory - claimAssumeCache *assumecache.AssumeCache - p *DynamicResources - nodeInfos []*framework.NodeInfo - state *framework.CycleState + ctx context.Context + client *fake.Clientset + informerFactory informers.SharedInformerFactory + draManager *DefaultDRAManager + p *DynamicResources + nodeInfos []*framework.NodeInfo + state *framework.CycleState } func (tc *testContext) verify(t *testing.T, expected result, initialObjects []metav1.Object, result interface{}, status *framework.Status) { @@ -984,14 +984,11 @@ func (tc *testContext) listAll(t *testing.T) (objects []metav1.Object) { } func (tc *testContext) listAssumedClaims() []metav1.Object { - if tc.p.claimAssumeCache == nil { - return nil - } var assumedClaims []metav1.Object - for _, obj := range tc.p.claimAssumeCache.List(nil) { + for _, obj := range tc.draManager.resourceClaimTracker.cache.List(nil) { claim := obj.(*resourceapi.ResourceClaim) - obj, _ := tc.p.claimAssumeCache.Get(claim.Namespace + "/" + claim.Name) - apiObj, _ := tc.p.claimAssumeCache.GetAPIObj(claim.Namespace + "/" + 
claim.Name) + obj, _ := tc.draManager.resourceClaimTracker.cache.Get(claim.Namespace + "/" + claim.Name) + apiObj, _ := tc.draManager.resourceClaimTracker.cache.GetAPIObj(claim.Namespace + "/" + claim.Name) if obj != apiObj { assumedClaims = append(assumedClaims, claim) } @@ -1002,7 +999,7 @@ func (tc *testContext) listAssumedClaims() []metav1.Object { func (tc *testContext) listInFlightClaims() []metav1.Object { var inFlightClaims []metav1.Object - tc.p.inFlightAllocations.Range(func(key, value any) bool { + tc.draManager.resourceClaimTracker.inFlightAllocations.Range(func(key, value any) bool { inFlightClaims = append(inFlightClaims, value.(*resourceapi.ResourceClaim)) return true }) @@ -1072,11 +1069,11 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourceapi.ResourceClaim, tc.client.PrependReactor("*", "*", reactor) tc.informerFactory = informers.NewSharedInformerFactory(tc.client, 0) - tc.claimAssumeCache = assumecache.NewAssumeCache(tCtx.Logger(), tc.informerFactory.Resource().V1alpha3().ResourceClaims().Informer(), "resource claim", "", nil) + tc.draManager = NewDRAManager(tCtx, assumecache.NewAssumeCache(tCtx.Logger(), tc.informerFactory.Resource().V1alpha3().ResourceClaims().Informer(), "resource claim", "", nil), tc.informerFactory) opts := []runtime.Option{ runtime.WithClientSet(tc.client), runtime.WithInformerFactory(tc.informerFactory), - runtime.WithResourceClaimCache(tc.claimAssumeCache), + runtime.WithSharedDRAManager(tc.draManager), } fh, err := runtime.NewFramework(tCtx, nil, nil, opts...) if err != nil { @@ -1290,7 +1287,7 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) { } claim = storedClaim } else { - cachedClaim, err := testCtx.claimAssumeCache.Get(claimKey) + cachedClaim, err := testCtx.draManager.resourceClaimTracker.cache.Get(claimKey) if err != nil { t.Fatalf("retrieve old claim: expected no error, got: %v", err) } @@ -1308,7 +1305,7 @@ func Test_isSchedulableAfterClaimChange(t *testing.T) { // Eventually the assume cache will have it, too. require.EventuallyWithT(t, func(t *assert.CollectT) { - cachedClaim, err := testCtx.claimAssumeCache.Get(claimKey) + cachedClaim, err := testCtx.draManager.resourceClaimTracker.cache.Get(claimKey) require.NoError(t, err, "retrieve claim") if cachedClaim.(*resourceapi.ResourceClaim).ResourceVersion != claim.ResourceVersion { t.Errorf("cached claim not updated yet") diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go index f4c17487672..86d24b65daf 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go @@ -26,13 +26,19 @@ import ( v1 "k8s.io/api/core/v1" resourceapi "k8s.io/api/resource/v1alpha3" "k8s.io/apimachinery/pkg/util/sets" - resourcelisters "k8s.io/client-go/listers/resource/v1alpha3" draapi "k8s.io/dynamic-resource-allocation/api" "k8s.io/dynamic-resource-allocation/cel" "k8s.io/klog/v2" "k8s.io/utils/ptr" ) +type deviceClassLister interface { + // List returns a list of all DeviceClasses. + List() ([]*resourceapi.DeviceClass, error) + // Get returns the DeviceClass with the given className. + Get(className string) (*resourceapi.DeviceClass, error) +} + // Allocator calculates how to allocate a set of unallocated claims which use // structured parameters. 
// @@ -43,7 +49,7 @@ type Allocator struct { adminAccessEnabled bool claimsToAllocate []*resourceapi.ResourceClaim allocatedDevices sets.Set[DeviceID] - classLister resourcelisters.DeviceClassLister + classLister deviceClassLister slices []*resourceapi.ResourceSlice celCache *cel.Cache } @@ -56,7 +62,7 @@ func NewAllocator(ctx context.Context, adminAccessEnabled bool, claimsToAllocate []*resourceapi.ResourceClaim, allocatedDevices sets.Set[DeviceID], - classLister resourcelisters.DeviceClassLister, + classLister deviceClassLister, slices []*resourceapi.ResourceSlice, celCache *cel.Cache, ) (*Allocator, error) { diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go index 8684108cdb2..b990005679b 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go @@ -17,7 +17,6 @@ limitations under the License. package structured import ( - "errors" "flag" "fmt" "slices" @@ -33,7 +32,6 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" @@ -1402,10 +1400,7 @@ type informerLister[T any] struct { err error } -func (l informerLister[T]) List(selector labels.Selector) (ret []*T, err error) { - if selector.String() != labels.Everything().String() { - return nil, errors.New("labels selector not implemented") - } +func (l informerLister[T]) List() (ret []*T, err error) { return l.objs, l.err } diff --git a/test/integration/scheduler_perf/dra.go b/test/integration/scheduler_perf/dra.go index a4cb1cfe905..b44a5315124 100644 --- a/test/integration/scheduler_perf/dra.go +++ b/test/integration/scheduler_perf/dra.go @@ -37,6 +37,7 @@ import ( "k8s.io/dynamic-resource-allocation/cel" "k8s.io/dynamic-resource-allocation/structured" "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/dynamicresources" "k8s.io/kubernetes/pkg/scheduler/util/assumecache" "k8s.io/kubernetes/test/utils/ktesting" "k8s.io/utils/ptr" @@ -275,10 +276,8 @@ func (op *allocResourceClaimsOp) run(tCtx ktesting.TContext) { // Track cluster state. informerFactory := informers.NewSharedInformerFactory(tCtx.Client(), 0) claimInformer := informerFactory.Resource().V1alpha3().ResourceClaims().Informer() - classLister := informerFactory.Resource().V1alpha3().DeviceClasses().Lister() - sliceLister := informerFactory.Resource().V1alpha3().ResourceSlices().Lister() nodeLister := informerFactory.Core().V1().Nodes().Lister() - claimCache := assumecache.NewAssumeCache(tCtx.Logger(), claimInformer, "ResourceClaim", "", nil) + draManager := dynamicresources.NewDRAManager(tCtx, assumecache.NewAssumeCache(tCtx.Logger(), claimInformer, "ResourceClaim", "", nil), informerFactory) informerFactory.Start(tCtx.Done()) defer func() { tCtx.Cancel("allocResourceClaimsOp.run is shutting down") @@ -297,7 +296,7 @@ func (op *allocResourceClaimsOp) run(tCtx ktesting.TContext) { // The set of nodes is assumed to be fixed at this point. 
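// Illustrative sketch, not part of the patch: with the narrowed deviceClassLister
// contract above, structured.NewAllocator no longer requires an informer-backed
// DeviceClassLister; any value with matching List/Get methods works, as the
// test's informerLister already demonstrates. The static lister below is an
// assumption-only example of what that enables.

package example

import (
	resourceapi "k8s.io/api/resource/v1alpha3"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
)

// staticClassLister serves DeviceClasses from a fixed slice, for example in
// tests or offline simulation.
type staticClassLister struct {
	classes []*resourceapi.DeviceClass
}

func (l staticClassLister) List() ([]*resourceapi.DeviceClass, error) {
	return l.classes, nil
}

func (l staticClassLister) Get(className string) (*resourceapi.DeviceClass, error) {
	for _, class := range l.classes {
		if class.Name == className {
			return class, nil
		}
	}
	return nil, apierrors.NewNotFound(resourceapi.Resource("deviceclasses"), className)
}

// A value of this type can be passed directly as the classLister argument of
// structured.NewAllocator in place of a lister wired to an informer.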
nodes, err := nodeLister.List(labels.Everything()) tCtx.ExpectNoError(err, "list nodes") - slices, err := sliceLister.List(labels.Everything()) + slices, err := draManager.ResourceSlices().List() tCtx.ExpectNoError(err, "list slices") // Allocate one claim at a time, picking nodes randomly. Each @@ -310,10 +309,10 @@ claims: continue } - objs := claimCache.List(nil) + claims, err := draManager.ResourceClaims().List() + tCtx.ExpectNoError(err, "list claims") allocatedDevices := sets.New[structured.DeviceID]() - for _, obj := range objs { - claim := obj.(*resourceapi.ResourceClaim) + for _, claim := range claims { if claim.Status.Allocation == nil { continue } @@ -322,7 +321,7 @@ claims: } } - allocator, err := structured.NewAllocator(tCtx, utilfeature.DefaultFeatureGate.Enabled(features.DRAAdminAccess), []*resourceapi.ResourceClaim{claim}, allocatedDevices, classLister, slices, celCache) + allocator, err := structured.NewAllocator(tCtx, utilfeature.DefaultFeatureGate.Enabled(features.DRAAdminAccess), []*resourceapi.ResourceClaim{claim}, allocatedDevices, draManager.DeviceClasses(), slices, celCache) tCtx.ExpectNoError(err, "create allocator") rand.Shuffle(len(nodes), func(i, j int) { @@ -336,10 +335,10 @@ claims: claim.Status.Allocation = &result[0] claim, err := tCtx.Client().ResourceV1alpha3().ResourceClaims(claim.Namespace).UpdateStatus(tCtx, claim, metav1.UpdateOptions{}) tCtx.ExpectNoError(err, "update claim status with allocation") - tCtx.ExpectNoError(claimCache.Assume(claim), "assume claim") + tCtx.ExpectNoError(draManager.ResourceClaims().AssumeClaimAfterAPICall(claim), "assume claim") continue claims } } - tCtx.Fatalf("Could not allocate claim %d out of %d", i, len(claims.Items)) + tCtx.Fatalf("Could not allocate claim %d out of %d", i, len(claims)) } }
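// Illustrative sketch, not part of the patch: the scheduler_perf change above
// shows how a DefaultDRAManager can be constructed outside the scheduler
// framework. Wrapped up as a helper, that wiring could look roughly like the
// following; the function name and the zero resync period are assumptions for
// the example, not something the patch prescribes.

package example

import (
	"context"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/dynamicresources"
	"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)

func newStandaloneDRAManager(ctx context.Context, client kubernetes.Interface) (*dynamicresources.DefaultDRAManager, informers.SharedInformerFactory) {
	informerFactory := informers.NewSharedInformerFactory(client, 0)
	claimInformer := informerFactory.Resource().V1alpha3().ResourceClaims().Informer()

	// The assume cache is still created by the caller, exactly as in the
	// scheduler and in the perf test above; NewDRAManager only wraps it.
	claimCache := assumecache.NewAssumeCache(klog.FromContext(ctx), claimInformer, "ResourceClaim", "", nil)
	draManager := dynamicresources.NewDRAManager(ctx, claimCache, informerFactory)

	// ResourceSlices() and DeviceClasses() are backed by informers, so they
	// only return data once the factory has been started and synced.
	informerFactory.Start(ctx.Done())
	informerFactory.WaitForCacheSync(ctx.Done())
	return draManager, informerFactory
}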