From 87cd496a2936d78337243c58474e759dfc3cd55c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Kuba=20Tu=C5=BCnik?=
Date: Tue, 3 Sep 2024 14:08:03 +0200
Subject: [PATCH] scheduler/framework: introduce pluggable SharedDRAManager

SharedDRAManager will be used by the DRA plugin to obtain DRA objects,
and to track modifications to them in-memory. The current DRA plugin
behavior will be the default implementation of SharedDRAManager.

Plugging a different implementation will allow Cluster Autoscaler to
provide a simulated state of DRA objects to the DRA plugin when making
scheduling simulations, as well as obtain the modifications to DRA
objects from the plugin.
---
 pkg/scheduler/framework/interface.go         |  8 +--
 pkg/scheduler/framework/listers.go           | 69 ++++++++++++++++++++
 pkg/scheduler/framework/runtime/framework.go | 29 ++++----
 pkg/scheduler/scheduler.go                   |  5 +-
 4 files changed, 90 insertions(+), 21 deletions(-)

diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go
index 26746cf033a..285973e249f 100644
--- a/pkg/scheduler/framework/interface.go
+++ b/pkg/scheduler/framework/interface.go
@@ -39,7 +39,6 @@ import (
 	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/scheduler/apis/config"
 	"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
-	"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
 )

 // NodeScoreList declares a list of nodes and their scores.
@@ -820,10 +819,9 @@ type Handle interface {

 	SharedInformerFactory() informers.SharedInformerFactory

-	// ResourceClaimCache returns an assume cache of ResourceClaim objects
-	// which gets populated by the shared informer factory and the dynamic resources
-	// plugin.
-	ResourceClaimCache() *assumecache.AssumeCache
+	// SharedDRAManager can be used to obtain DRA objects, and track modifications to them in-memory - mainly by the DRA plugin.
+	// A non-default implementation can be plugged into the framework to simulate the state of DRA objects.
+	SharedDRAManager() SharedDRAManager

 	// RunFilterPluginsWithNominatedPods runs the set of configured filter plugins for nominated pod on the given node.
 	RunFilterPluginsWithNominatedPods(ctx context.Context, state *CycleState, pod *v1.Pod, info *NodeInfo) *Status
diff --git a/pkg/scheduler/framework/listers.go b/pkg/scheduler/framework/listers.go
index 4701bc522d7..4c22e697a09 100644
--- a/pkg/scheduler/framework/listers.go
+++ b/pkg/scheduler/framework/listers.go
@@ -16,6 +16,13 @@ limitations under the License.

 package framework

+import (
+	resourceapi "k8s.io/api/resource/v1alpha3"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/dynamic-resource-allocation/structured"
+)
+
 // NodeInfoLister interface represents anything that can list/get NodeInfo objects from node name.
 type NodeInfoLister interface {
 	// List returns the list of NodeInfos.
@@ -40,3 +47,65 @@ type SharedLister interface {
 	NodeInfos() NodeInfoLister
 	StorageInfos() StorageInfoLister
 }
+
+// ResourceSliceLister can be used to obtain ResourceSlices.
+type ResourceSliceLister interface {
+	// List returns a list of all ResourceSlices.
+	List() ([]*resourceapi.ResourceSlice, error)
+}
+
+// DeviceClassLister can be used to obtain DeviceClasses.
+type DeviceClassLister interface {
+	// List returns a list of all DeviceClasses.
+	List() ([]*resourceapi.DeviceClass, error)
+	// Get returns the DeviceClass with the given className.
+	Get(className string) (*resourceapi.DeviceClass, error)
+}
+
+// ResourceClaimTracker can be used to obtain ResourceClaims, and track changes to ResourceClaims in-memory.
+//
+// If the claims are meant to be allocated in the API during the binding phase (when used by scheduler), the tracker helps avoid
+// race conditions between scheduling and binding phases (as well as between the binding phase and the informer cache update).
+//
+// If the binding phase is not run (e.g. when used by Cluster Autoscaler which only runs the scheduling phase, and simulates binding in-memory),
+// the tracker allows the framework user to obtain the claim allocations produced by the DRA plugin, and persist them outside of the API (e.g. in-memory).
+type ResourceClaimTracker interface {
+	// List lists ResourceClaims. The result is guaranteed to immediately include any changes made via AssumeClaimAfterAPICall(),
+	// and SignalClaimPendingAllocation().
+	List() ([]*resourceapi.ResourceClaim, error)
+	// Get works like List(), but for a single claim.
+	Get(namespace, claimName string) (*resourceapi.ResourceClaim, error)
+	// ListAllAllocatedDevices lists all allocated Devices from allocated ResourceClaims. The result is guaranteed to immediately include
+	// any changes made via AssumeClaimAfterAPICall(), and SignalClaimPendingAllocation().
+	ListAllAllocatedDevices() (sets.Set[structured.DeviceID], error)
+
+	// SignalClaimPendingAllocation signals to the tracker that the given ResourceClaim will be allocated via an API call in the
+	// binding phase. This change is immediately reflected in the result of List() and the other accessors.
+	SignalClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error
+	// ClaimHasPendingAllocation answers whether a given claim has a pending allocation during the binding phase. It can be used to avoid
+	// race conditions in subsequent scheduling phases.
+	ClaimHasPendingAllocation(claimUID types.UID) bool
+	// RemoveClaimPendingAllocation removes the pending allocation for the given ResourceClaim from the tracker if any was signaled via
+	// SignalClaimPendingAllocation(). Returns whether there was a pending allocation to remove. List() and the other accessors immediately
+	// stop reflecting the pending allocation in the results.
+	RemoveClaimPendingAllocation(claimUID types.UID) (deleted bool)
+
+	// AssumeClaimAfterAPICall signals to the tracker that an API call modifying the given ResourceClaim was made in the binding phase, and the
+	// changes should be reflected in informers very soon. This change is immediately reflected in the result of List() and the other accessors.
+	// This mechanism can be used to avoid race conditions between the informer update and subsequent scheduling phases.
+	AssumeClaimAfterAPICall(claim *resourceapi.ResourceClaim) error
+	// AssumedClaimRestore signals to the tracker that something went wrong with the API call modifying the given ResourceClaim, and
+	// the changes won't be reflected in informers after all. List() and the other accessors immediately stop reflecting the assumed change,
+	// and go back to the informer version.
+	AssumedClaimRestore(namespace, claimName string)
+}
+
+// SharedDRAManager can be used to obtain DRA objects, and track modifications to them in-memory - mainly by the DRA plugin.
+// The plugin's default implementation obtains the objects from the API. A different implementation can be
+// plugged into the framework in order to simulate the state of DRA objects. For example, Cluster Autoscaler
+// can use this to provide the correct DRA object state to the DRA plugin when simulating scheduling changes in-memory.
+type SharedDRAManager interface {
+	ResourceClaims() ResourceClaimTracker
+	ResourceSlices() ResourceSliceLister
+	DeviceClasses() DeviceClassLister
+}
diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go
index 08fee832fa0..cbe3bcee3d6 100644
--- a/pkg/scheduler/framework/runtime/framework.go
+++ b/pkg/scheduler/framework/runtime/framework.go
@@ -39,7 +39,6 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 	"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
 	"k8s.io/kubernetes/pkg/scheduler/metrics"
-	"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
 	"k8s.io/kubernetes/pkg/util/slice"
 )

@@ -72,12 +71,12 @@ type frameworkImpl struct {
 	// pluginsMap contains all plugins, by name.
 	pluginsMap map[string]framework.Plugin

-	clientSet          clientset.Interface
-	kubeConfig         *restclient.Config
-	eventRecorder      events.EventRecorder
-	informerFactory    informers.SharedInformerFactory
-	resourceClaimCache *assumecache.AssumeCache
-	logger             klog.Logger
+	clientSet        clientset.Interface
+	kubeConfig       *restclient.Config
+	eventRecorder    events.EventRecorder
+	informerFactory  informers.SharedInformerFactory
+	sharedDRAManager framework.SharedDRAManager
+	logger           klog.Logger

 	metricsRecorder *metrics.MetricAsyncRecorder
 	profileName     string
@@ -128,7 +127,7 @@ type frameworkOptions struct {
 	kubeConfig           *restclient.Config
 	eventRecorder        events.EventRecorder
 	informerFactory      informers.SharedInformerFactory
-	resourceClaimCache   *assumecache.AssumeCache
+	sharedDRAManager     framework.SharedDRAManager
 	snapshotSharedLister framework.SharedLister
 	metricsRecorder      *metrics.MetricAsyncRecorder
 	podNominator         framework.PodNominator
@@ -180,10 +179,10 @@ func WithInformerFactory(informerFactory informers.SharedInformerFactory) Option
 	}
 }

-// WithResourceClaimCache sets the resource claim cache for the scheduling frameworkImpl.
-func WithResourceClaimCache(resourceClaimCache *assumecache.AssumeCache) Option {
+// WithSharedDRAManager sets SharedDRAManager for the framework.
+func WithSharedDRAManager(sharedDRAManager framework.SharedDRAManager) Option {
 	return func(o *frameworkOptions) {
-		o.resourceClaimCache = resourceClaimCache
+		o.sharedDRAManager = sharedDRAManager
 	}
 }

@@ -267,7 +266,6 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler
 	if options.logger != nil {
 		logger = *options.logger
 	}
-
 	f := &frameworkImpl{
 		registry:             r,
 		snapshotSharedLister: options.snapshotSharedLister,
@@ -277,7 +275,7 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler
 		kubeConfig:           options.kubeConfig,
 		eventRecorder:        options.eventRecorder,
 		informerFactory:      options.informerFactory,
-		resourceClaimCache:   options.resourceClaimCache,
+		sharedDRAManager:     options.sharedDRAManager,
 		metricsRecorder:      options.metricsRecorder,
 		extenders:            options.extenders,
 		PodNominator:         options.podNominator,
@@ -1617,8 +1615,9 @@ func (f *frameworkImpl) SharedInformerFactory() informers.SharedInformerFactory
 	return f.informerFactory
 }

-func (f *frameworkImpl) ResourceClaimCache() *assumecache.AssumeCache {
-	return f.resourceClaimCache
+// SharedDRAManager returns the SharedDRAManager of the framework.
+func (f *frameworkImpl) SharedDRAManager() framework.SharedDRAManager {
+	return f.sharedDRAManager
 }

 func (f *frameworkImpl) pluginsNeeded(plugins *config.Plugins) sets.Set[string] {
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index d66f35d132b..ab00d259ff5 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -44,6 +44,7 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 	"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
 	frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/dynamicresources"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
 	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 	"k8s.io/kubernetes/pkg/scheduler/metrics"
@@ -296,9 +297,11 @@ func New(ctx context.Context,
 	waitingPods := frameworkruntime.NewWaitingPodsMap()

 	var resourceClaimCache *assumecache.AssumeCache
+	var draManager framework.SharedDRAManager
 	if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
 		resourceClaimInformer := informerFactory.Resource().V1alpha3().ResourceClaims().Informer()
 		resourceClaimCache = assumecache.NewAssumeCache(logger, resourceClaimInformer, "ResourceClaim", "", nil)
+		draManager = dynamicresources.NewDRAManager(ctx, resourceClaimCache, informerFactory)
 	}

 	profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory,
@@ -306,7 +309,7 @@ func New(ctx context.Context,
 		frameworkruntime.WithClientSet(client),
 		frameworkruntime.WithKubeConfig(options.kubeConfig),
 		frameworkruntime.WithInformerFactory(informerFactory),
-		frameworkruntime.WithResourceClaimCache(resourceClaimCache),
+		frameworkruntime.WithSharedDRAManager(draManager),
 		frameworkruntime.WithSnapshotSharedLister(snapshot),
 		frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
 		frameworkruntime.WithParallelism(int(options.parallelism)),
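
Illustrative note (not part of the patch): the ResourceClaimTracker contract added in listers.go is built around a specific call order in the binding phase. The sketch below shows that order under stated assumptions - the helper name bindClaimSketch and the apiUpdate callback are hypothetical stand-ins for whatever API call the binding phase actually performs; only the tracker methods themselves come from this patch.

package example

import (
	"context"

	resourceapi "k8s.io/api/resource/v1alpha3"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// bindClaimSketch is a hypothetical helper showing the intended sequence of
// ResourceClaimTracker calls around the binding-phase API write.
func bindClaimSketch(ctx context.Context, tracker framework.ResourceClaimTracker, allocatedClaim *resourceapi.ResourceClaim,
	apiUpdate func(context.Context, *resourceapi.ResourceClaim) (*resourceapi.ResourceClaim, error)) error {
	// 1. Signal the upcoming allocation so that concurrent scheduling cycles already
	//    see the claim as allocated via List()/Get()/ListAllAllocatedDevices().
	if err := tracker.SignalClaimPendingAllocation(allocatedClaim.UID, allocatedClaim); err != nil {
		return err
	}
	// 2. Persist the allocation through the API (stand-in callback).
	written, err := apiUpdate(ctx, allocatedClaim)
	if err != nil {
		// The allocation will never reach the informers, so stop advertising it.
		tracker.RemoveClaimPendingAllocation(allocatedClaim.UID)
		return err
	}
	// 3. Keep serving the written object until the informer cache catches up.
	if err := tracker.AssumeClaimAfterAPICall(written); err != nil {
		// Fall back to the informer version if the object cannot be assumed.
		tracker.AssumedClaimRestore(written.Namespace, written.Name)
	}
	// 4. The pending-allocation marker is no longer needed after the write.
	tracker.RemoveClaimPendingAllocation(allocatedClaim.UID)
	return nil
}

A Cluster Autoscaler-style user that never runs the binding phase would skip the API write entirely and instead read the allocations back out of the tracker to persist them in its own in-memory snapshot, as described in the ResourceClaimTracker comment.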
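
A second illustrative note: plugging a non-default SharedDRAManager into the framework. Only framework.SharedDRAManager, its three sub-interfaces, and frameworkruntime.WithSharedDRAManager come from this patch; the simulatedDRAManager type and its fields are hypothetical placeholders for the snapshot-backed implementations a consumer like Cluster Autoscaler would provide.

package example

import (
	"k8s.io/kubernetes/pkg/scheduler/framework"
	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
)

// simulatedDRAManager serves DRA objects from an in-memory simulation instead of
// the API-backed default set up in scheduler.New().
type simulatedDRAManager struct {
	claims  framework.ResourceClaimTracker
	slices  framework.ResourceSliceLister
	classes framework.DeviceClassLister
}

// Compile-time check that the three accessors below are all the framework needs.
var _ framework.SharedDRAManager = &simulatedDRAManager{}

func (m *simulatedDRAManager) ResourceClaims() framework.ResourceClaimTracker { return m.claims }
func (m *simulatedDRAManager) ResourceSlices() framework.ResourceSliceLister  { return m.slices }
func (m *simulatedDRAManager) DeviceClasses() framework.DeviceClassLister     { return m.classes }

// draManagerOption wraps the manager in the framework option added by this patch;
// it would be passed alongside the other options when the framework profiles are built.
func draManagerOption(m framework.SharedDRAManager) frameworkruntime.Option {
	return frameworkruntime.WithSharedDRAManager(m)
}

With such an option in place, the DRA plugin transparently reads ResourceSlices, DeviceClasses and ResourceClaims from the simulated state, and the simulation can read the plugin's in-memory claim modifications back through the same manager.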