scheduler/framework: introduce pluggable SharedDRAManager

SharedDRAManager will be used by the DRA plugin to obtain DRA
objects, and to track modifications to them in-memory. The current
DRA plugin behavior will be the default implementation of
SharedDRAManager.

Plugging in a different implementation will allow Cluster Autoscaler
to provide a simulated state of DRA objects to the DRA plugin when
running scheduling simulations, as well as to obtain the
modifications that the plugin makes to DRA objects.
Kuba Tużnik 2024-09-03 14:08:03 +02:00
parent 2bb886ce2a
commit 87cd496a29
4 changed files with 90 additions and 21 deletions

View File

@@ -39,7 +39,6 @@ import (
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)
// NodeScoreList declares a list of nodes and their scores.
@@ -820,10 +819,9 @@ type Handle interface {
SharedInformerFactory() informers.SharedInformerFactory
// ResourceClaimCache returns an assume cache of ResourceClaim objects
// which gets populated by the shared informer factory and the dynamic resources
// plugin.
ResourceClaimCache() *assumecache.AssumeCache
// SharedDRAManager can be used to obtain DRA objects, and track modifications to them in-memory - mainly by the DRA plugin.
// A non-default implementation can be plugged into the framework to simulate the state of DRA objects.
SharedDRAManager() SharedDRAManager
// RunFilterPluginsWithNominatedPods runs the set of configured filter plugins for nominated pod on the given node.
RunFilterPluginsWithNominatedPods(ctx context.Context, state *CycleState, pod *v1.Pod, info *NodeInfo) *Status

View File

@@ -16,6 +16,13 @@ limitations under the License.
package framework
import (
resourceapi "k8s.io/api/resource/v1alpha3"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/dynamic-resource-allocation/structured"
)
// NodeInfoLister interface represents anything that can list/get NodeInfo objects from node name.
type NodeInfoLister interface {
// List returns the list of NodeInfos.
@@ -40,3 +47,65 @@ type SharedLister interface {
NodeInfos() NodeInfoLister
StorageInfos() StorageInfoLister
}
// ResourceSliceLister can be used to obtain ResourceSlices.
type ResourceSliceLister interface {
// List returns a list of all ResourceSlices.
List() ([]*resourceapi.ResourceSlice, error)
}
// DeviceClassLister can be used to obtain DeviceClasses.
type DeviceClassLister interface {
// List returns a list of all DeviceClasses.
List() ([]*resourceapi.DeviceClass, error)
// Get returns the DeviceClass with the given className.
Get(className string) (*resourceapi.DeviceClass, error)
}
// ResourceClaimTracker can be used to obtain ResourceClaims, and track changes to ResourceClaims in-memory.
//
// If the claims are meant to be allocated in the API during the binding phase (when used by scheduler), the tracker helps avoid
// race conditions between scheduling and binding phases (as well as between the binding phase and the informer cache update).
//
// If the binding phase is not run (e.g. when used by Cluster Autoscaler which only runs the scheduling phase, and simulates binding in-memory),
// the tracker allows the framework user to obtain the claim allocations produced by the DRA plugin, and persist them outside of the API (e.g. in-memory).
type ResourceClaimTracker interface {
// List lists ResourceClaims. The result is guaranteed to immediately include any changes made via AssumeClaimAfterAPICall(),
// and SignalClaimPendingAllocation().
List() ([]*resourceapi.ResourceClaim, error)
// Get works like List(), but for a single claim.
Get(namespace, claimName string) (*resourceapi.ResourceClaim, error)
// ListAllAllocatedDevices lists all allocated Devices from allocated ResourceClaims. The result is guaranteed to immediately include
// any changes made via AssumeClaimAfterAPICall(), and SignalClaimPendingAllocation().
ListAllAllocatedDevices() (sets.Set[structured.DeviceID], error)
// SignalClaimPendingAllocation signals to the tracker that the given ResourceClaim will be allocated via an API call in the
// binding phase. This change is immediately reflected in the result of List() and the other accessors.
SignalClaimPendingAllocation(claimUID types.UID, allocatedClaim *resourceapi.ResourceClaim) error
// ClaimHasPendingAllocation answers whether a given claim has a pending allocation during the binding phase. It can be used to avoid
// race conditions in subsequent scheduling phases.
ClaimHasPendingAllocation(claimUID types.UID) bool
// RemoveClaimPendingAllocation removes the pending allocation for the given ResourceClaim from the tracker if any was signaled via
// SignalClaimPendingAllocation(). Returns whether there was a pending allocation to remove. List() and the other accessors immediately
// stop reflecting the pending allocation in the results.
RemoveClaimPendingAllocation(claimUID types.UID) (deleted bool)
// AssumeClaimAfterAPICall signals to the tracker that an API call modifying the given ResourceClaim was made in the binding phase, and the
// changes should be reflected in informers very soon. This change is immediately reflected in the result of List() and the other accessors.
// This mechanism can be used to avoid race conditions between the informer update and subsequent scheduling phases.
AssumeClaimAfterAPICall(claim *resourceapi.ResourceClaim) error
// AssumedClaimRestore signals to the tracker that something went wrong with the API call modifying the given ResourceClaim, and
// the changes won't be reflected in informers after all. List() and the other accessors immediately stop reflecting the assumed change,
// and go back to the informer version.
AssumedClaimRestore(namespace, claimName string)
}
// SharedDRAManager can be used to obtain DRA objects, and track modifications to them in-memory - mainly by the DRA plugin.
// The plugin's default implementation obtains the objects from the API. A different implementation can be
// plugged into the framework in order to simulate the state of DRA objects. For example, Cluster Autoscaler
// can use this to provide the correct DRA object state to the DRA plugin when simulating scheduling changes in-memory.
type SharedDRAManager interface {
ResourceClaims() ResourceClaimTracker
ResourceSlices() ResourceSliceLister
DeviceClasses() DeviceClassLister
}
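
To make the intended tracker lifecycle concrete, here is a minimal sketch of a binding-phase flow driving ResourceClaimTracker. It only illustrates the call order implied by the comments above; the bindClaim helper, its parameters, and the compressed error handling are not part of this commit, and the real DRA plugin spreads similar steps across its scheduling and binding phases.

package example

import (
	"context"

	resourceapi "k8s.io/api/resource/v1alpha3"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// bindClaim persists an in-memory allocation decision in the API server while
// keeping the tracker consistent, so that concurrent scheduling cycles and a
// lagging informer cache never observe the claim as unallocated.
func bindClaim(ctx context.Context, cs clientset.Interface, draManager framework.SharedDRAManager, allocatedClaim *resourceapi.ResourceClaim) error {
	tracker := draManager.ResourceClaims()

	// Announce the upcoming allocation; List() and Get() reflect it immediately.
	if err := tracker.SignalClaimPendingAllocation(allocatedClaim.UID, allocatedClaim); err != nil {
		return err
	}

	// Write the allocation to the API server.
	written, err := cs.ResourceV1alpha3().ResourceClaims(allocatedClaim.Namespace).UpdateStatus(ctx, allocatedClaim, metav1.UpdateOptions{})
	if err != nil {
		// The API call failed, so the pending allocation must not linger.
		tracker.RemoveClaimPendingAllocation(allocatedClaim.UID)
		return err
	}

	// Bridge the gap until the informer cache catches up with the API call.
	if err := tracker.AssumeClaimAfterAPICall(written); err != nil {
		tracker.AssumedClaimRestore(written.Namespace, written.Name)
	}
	tracker.RemoveClaimPendingAllocation(allocatedClaim.UID)
	return nil
}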

View File

@@ -39,7 +39,6 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
"k8s.io/kubernetes/pkg/util/slice"
)
@@ -72,12 +71,12 @@ type frameworkImpl struct {
// pluginsMap contains all plugins, by name.
pluginsMap map[string]framework.Plugin
clientSet clientset.Interface
kubeConfig *restclient.Config
eventRecorder events.EventRecorder
informerFactory informers.SharedInformerFactory
resourceClaimCache *assumecache.AssumeCache
logger klog.Logger
clientSet clientset.Interface
kubeConfig *restclient.Config
eventRecorder events.EventRecorder
informerFactory informers.SharedInformerFactory
sharedDRAManager framework.SharedDRAManager
logger klog.Logger
metricsRecorder *metrics.MetricAsyncRecorder
profileName string
@@ -128,7 +127,7 @@ type frameworkOptions struct {
kubeConfig *restclient.Config
eventRecorder events.EventRecorder
informerFactory informers.SharedInformerFactory
resourceClaimCache *assumecache.AssumeCache
sharedDRAManager framework.SharedDRAManager
snapshotSharedLister framework.SharedLister
metricsRecorder *metrics.MetricAsyncRecorder
podNominator framework.PodNominator
@@ -180,10 +179,10 @@ func WithInformerFactory(informerFactory informers.SharedInformerFactory) Option
}
}
// WithResourceClaimCache sets the resource claim cache for the scheduling frameworkImpl.
func WithResourceClaimCache(resourceClaimCache *assumecache.AssumeCache) Option {
// WithSharedDRAManager sets SharedDRAManager for the framework.
func WithSharedDRAManager(sharedDRAManager framework.SharedDRAManager) Option {
return func(o *frameworkOptions) {
o.resourceClaimCache = resourceClaimCache
o.sharedDRAManager = sharedDRAManager
}
}
@@ -267,7 +266,6 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler
if options.logger != nil {
logger = *options.logger
}
f := &frameworkImpl{
registry: r,
snapshotSharedLister: options.snapshotSharedLister,
@@ -277,7 +275,7 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler
kubeConfig: options.kubeConfig,
eventRecorder: options.eventRecorder,
informerFactory: options.informerFactory,
resourceClaimCache: options.resourceClaimCache,
sharedDRAManager: options.sharedDRAManager,
metricsRecorder: options.metricsRecorder,
extenders: options.extenders,
PodNominator: options.podNominator,
@@ -1617,8 +1615,9 @@ func (f *frameworkImpl) SharedInformerFactory() informers.SharedInformerFactory
return f.informerFactory
}
func (f *frameworkImpl) ResourceClaimCache() *assumecache.AssumeCache {
return f.resourceClaimCache
// SharedDRAManager returns the SharedDRAManager of the framework.
func (f *frameworkImpl) SharedDRAManager() framework.SharedDRAManager {
return f.sharedDRAManager
}
func (f *frameworkImpl) pluginsNeeded(plugins *config.Plugins) sets.Set[string] {
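
For illustration, a minimal sketch of how a non-default manager could be plugged in through the new option. The newSimulationFramework helper and its parameters are hypothetical and only show where WithSharedDRAManager fits among the existing framework options; simulatedDRAManager can be any implementation of framework.SharedDRAManager, such as one backed by Cluster Autoscaler's in-memory snapshot.

package example

import (
	"context"

	"k8s.io/client-go/informers"
	"k8s.io/kubernetes/pkg/scheduler/apis/config"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
)

// newSimulationFramework builds a framework whose DRA plugin reads
// ResourceClaims, ResourceSlices and DeviceClasses from the supplied manager
// instead of the removed assume cache.
func newSimulationFramework(ctx context.Context, registry frameworkruntime.Registry, profile *config.KubeSchedulerProfile,
	informerFactory informers.SharedInformerFactory, snapshot framework.SharedLister,
	simulatedDRAManager framework.SharedDRAManager) (framework.Framework, error) {
	return frameworkruntime.NewFramework(ctx, registry, profile,
		frameworkruntime.WithInformerFactory(informerFactory),
		frameworkruntime.WithSnapshotSharedLister(snapshot),
		// The only DRA-specific wiring; everything else stays unchanged.
		frameworkruntime.WithSharedDRAManager(simulatedDRAManager),
	)
}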

View File

@@ -44,6 +44,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/dynamicresources"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
"k8s.io/kubernetes/pkg/scheduler/metrics"
@@ -296,9 +297,11 @@ func New(ctx context.Context,
waitingPods := frameworkruntime.NewWaitingPodsMap()
var resourceClaimCache *assumecache.AssumeCache
var draManager framework.SharedDRAManager
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
resourceClaimInformer := informerFactory.Resource().V1alpha3().ResourceClaims().Informer()
resourceClaimCache = assumecache.NewAssumeCache(logger, resourceClaimInformer, "ResourceClaim", "", nil)
draManager = dynamicresources.NewDRAManager(ctx, resourceClaimCache, informerFactory)
}
profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory,
@@ -306,7 +309,7 @@ func New(ctx context.Context,
frameworkruntime.WithClientSet(client),
frameworkruntime.WithKubeConfig(options.kubeConfig),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithResourceClaimCache(resourceClaimCache),
frameworkruntime.WithSharedDRAManager(draManager),
frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
frameworkruntime.WithParallelism(int(options.parallelism)),
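
Finally, the commit message's second use case, obtaining the plugin's DRA modifications back from the manager, could look roughly like the sketch below; collectAllocatedClaims is a hypothetical helper and not part of this change.

package example

import (
	resourceapi "k8s.io/api/resource/v1alpha3"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// collectAllocatedClaims returns every claim that the DRA plugin has allocated
// in-memory, so a framework user that skips the binding phase can persist the
// results in its own simulated cluster state instead of the API server.
func collectAllocatedClaims(draManager framework.SharedDRAManager) ([]*resourceapi.ResourceClaim, error) {
	claims, err := draManager.ResourceClaims().List()
	if err != nil {
		return nil, err
	}
	var allocated []*resourceapi.ResourceClaim
	for _, claim := range claims {
		// List() already reflects allocations signaled via
		// SignalClaimPendingAllocation() and AssumeClaimAfterAPICall(),
		// so no API round-trip is needed here.
		if claim.Status.Allocation != nil {
			allocated = append(allocated, claim)
		}
	}
	return allocated, nil
}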