diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go
index b2a0f331f53..44311f5b247 100644
--- a/pkg/scheduler/eventhandlers.go
+++ b/pkg/scheduler/eventhandlers.go
@@ -43,6 +43,7 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
 	"k8s.io/kubernetes/pkg/scheduler/internal/queue"
 	"k8s.io/kubernetes/pkg/scheduler/profile"
+	"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
 )
 
 func (sched *Scheduler) onStorageClassAdd(obj interface{}) {
@@ -288,6 +289,7 @@ func addAllEventHandlers(
 	sched *Scheduler,
 	informerFactory informers.SharedInformerFactory,
 	dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
+	resourceClaimCache *assumecache.AssumeCache,
 	gvkMap map[framework.GVK]framework.ActionType,
 ) error {
 	var (
@@ -456,11 +458,9 @@ func addAllEventHandlers(
 			}
 		case framework.ResourceClaim:
 			if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
-				if handlerRegistration, err = informerFactory.Resource().V1alpha2().ResourceClaims().Informer().AddEventHandler(
+				handlerRegistration = resourceClaimCache.AddEventHandler(
 					buildEvtResHandler(at, framework.ResourceClaim, "ResourceClaim"),
-				); err != nil {
-					return err
-				}
+				)
 				handlers = append(handlers, handlerRegistration)
 			}
 		case framework.ResourceClass:
diff --git a/pkg/scheduler/eventhandlers_test.go b/pkg/scheduler/eventhandlers_test.go
index f0254df4095..a99146cf567 100644
--- a/pkg/scheduler/eventhandlers_test.go
+++ b/pkg/scheduler/eventhandlers_test.go
@@ -26,9 +26,12 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	batchv1 "k8s.io/api/batch/v1"
 	v1 "k8s.io/api/core/v1"
+	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
 	storagev1 "k8s.io/api/storage/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
+	featuregatetesting "k8s.io/component-base/featuregate/testing"
 	"k8s.io/klog/v2/ktesting"
 
 	"k8s.io/apimachinery/pkg/runtime"
@@ -38,6 +41,7 @@ import (
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes/fake"
 
+	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename"
@@ -46,6 +50,7 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/internal/cache"
 	"k8s.io/kubernetes/pkg/scheduler/internal/queue"
 	st "k8s.io/kubernetes/pkg/scheduler/testing"
+	"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
 )
 
 func TestNodeAllocatableChanged(t *testing.T) {
@@ -362,6 +367,7 @@ func TestAddAllEventHandlers(t *testing.T) {
 	tests := []struct {
 		name                   string
 		gvkMap                 map[framework.GVK]framework.ActionType
+		enableDRA              bool
 		expectStaticInformers  map[reflect.Type]bool
 		expectDynamicInformers map[schema.GroupVersionResource]bool
 	}{
@@ -375,6 +381,44 @@ func TestAddAllEventHandlers(t *testing.T) {
 			},
 			expectDynamicInformers: map[schema.GroupVersionResource]bool{},
 		},
+		{
+			name: "DRA events disabled",
+			gvkMap: map[framework.GVK]framework.ActionType{
+				framework.PodSchedulingContext:    framework.Add,
+				framework.ResourceClaim:           framework.Add,
+				framework.ResourceClass:           framework.Add,
+				framework.ResourceClaimParameters: framework.Add,
+				framework.ResourceClassParameters: framework.Add,
+			},
+			expectStaticInformers: map[reflect.Type]bool{
+				reflect.TypeOf(&v1.Pod{}):       true,
+				reflect.TypeOf(&v1.Node{}):      true,
+				reflect.TypeOf(&v1.Namespace{}): true,
+			},
+			expectDynamicInformers: map[schema.GroupVersionResource]bool{},
+		},
+		{
+			name: "DRA events enabled",
+			gvkMap: map[framework.GVK]framework.ActionType{
+				framework.PodSchedulingContext:    framework.Add,
+				framework.ResourceClaim:           framework.Add,
+				framework.ResourceClass:           framework.Add,
+				framework.ResourceClaimParameters: framework.Add,
+				framework.ResourceClassParameters: framework.Add,
+			},
+			enableDRA: true,
+			expectStaticInformers: map[reflect.Type]bool{
+				reflect.TypeOf(&v1.Pod{}):       true,
+				reflect.TypeOf(&v1.Node{}):      true,
+				reflect.TypeOf(&v1.Namespace{}): true,
+				reflect.TypeOf(&resourcev1alpha2.PodSchedulingContext{}):   true,
+				reflect.TypeOf(&resourcev1alpha2.ResourceClaim{}):          true,
+				reflect.TypeOf(&resourcev1alpha2.ResourceClaimParameters{}): true,
+				reflect.TypeOf(&resourcev1alpha2.ResourceClass{}):           true,
+				reflect.TypeOf(&resourcev1alpha2.ResourceClassParameters{}): true,
+			},
+			expectDynamicInformers: map[schema.GroupVersionResource]bool{},
+		},
 		{
 			name: "add GVKs handlers defined in framework dynamically",
 			gvkMap: map[framework.GVK]framework.ActionType{
@@ -433,6 +477,7 @@ func TestAddAllEventHandlers(t *testing.T) {
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
+			featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DynamicResourceAllocation, tt.enableDRA)
 			logger, ctx := ktesting.NewTestContext(t)
 			ctx, cancel := context.WithCancel(ctx)
 			defer cancel()
@@ -447,8 +492,13 @@ func TestAddAllEventHandlers(t *testing.T) {
 
 			dynclient := dyfake.NewSimpleDynamicClient(scheme)
 			dynInformerFactory := dynamicinformer.NewDynamicSharedInformerFactory(dynclient, 0)
+			var resourceClaimCache *assumecache.AssumeCache
+			if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
+				resourceClaimInformer := informerFactory.Resource().V1alpha2().ResourceClaims().Informer()
+				resourceClaimCache = assumecache.NewAssumeCache(logger, resourceClaimInformer, "ResourceClaim", "", nil)
+			}
 
-			if err := addAllEventHandlers(&testSched, informerFactory, dynInformerFactory, tt.gvkMap); err != nil {
+			if err := addAllEventHandlers(&testSched, informerFactory, dynInformerFactory, resourceClaimCache, tt.gvkMap); err != nil {
 				t.Fatalf("Add event handlers failed, error = %v", err)
 			}
 
diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go
index 5fd8bd86fcf..ff117be8095 100644
--- a/pkg/scheduler/framework/interface.go
+++ b/pkg/scheduler/framework/interface.go
@@ -38,6 +38,7 @@ import (
 	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/scheduler/apis/config"
 	"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
+	"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
 )
 
 // NodeScoreList declares a list of nodes and their scores.
@@ -701,6 +702,11 @@ type Handle interface {
 
 	SharedInformerFactory() informers.SharedInformerFactory
 
+	// ResourceClaimCache returns an assume cache of ResourceClaim objects
+	// which gets populated by the shared informer factory and the dynamic resources
+	// plugin.
+	ResourceClaimCache() *assumecache.AssumeCache
+
 	// RunFilterPluginsWithNominatedPods runs the set of configured filter plugins for nominated pod on the given node.
 	RunFilterPluginsWithNominatedPods(ctx context.Context, state *CycleState, pod *v1.Pod, info *NodeInfo) *Status
diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
index f54cf0d2bea..715f3f9b1c6 100644
--- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
+++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
@@ -343,7 +343,6 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
 		return &dynamicResources{}, nil
 	}
 
-	logger := klog.FromContext(ctx)
 	pl := &dynamicResources{
 		enabled: true,
 		fh:      fh,
@@ -355,7 +354,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
 		classParametersLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClassParameters().Lister(),
 		resourceSliceLister:   fh.SharedInformerFactory().Resource().V1alpha2().ResourceSlices().Lister(),
 		claimNameLookup:       resourceclaim.NewNameLookup(fh.ClientSet()),
-		claimAssumeCache:      assumecache.NewAssumeCache(logger, fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Informer(), "claim", "", nil),
+		claimAssumeCache:      fh.ResourceClaimCache(),
 	}
 
 	return pl, nil
@@ -597,21 +596,6 @@ func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, po
 		//
 		// TODO (https://github.com/kubernetes/kubernetes/issues/123697):
 		// check that the pending claims depend on structured parameters (depends on refactoring foreachPodResourceClaim, see other TODO).
-		//
-		// There is a small race here:
-		// - The dynamicresources plugin allocates claim A and updates the assume cache.
-		// - A second pod gets marked as unschedulable based on that assume cache.
-		// - Before the informer cache here catches up, the pod runs, terminates and
-		//   the claim gets deallocated without ever sending the claim status with
-		//   allocation to the scheduler.
-		// - The comparison below is for a *very* old claim with no allocation and the
-		//   new claim where the allocation is already removed again, so no
-		//   RemovedClaimAllocation event gets emitted.
-		//
-		// This is extremely unlikely and thus a fix is not needed for alpha in Kubernetes 1.30.
-		// TODO (https://github.com/kubernetes/kubernetes/issues/123698): The solution is to somehow integrate the assume cache
-		// into the event mechanism. This can be tackled together with adding autoscaler
-		// support, which also needs to do something with the assume cache.
 		logger.V(6).Info("claim with structured parameters got deallocated", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
 		return framework.Queue, nil
 	}
diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go
index 025a3478383..90f08a1aa7c 100644
--- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go
+++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go
@@ -44,6 +44,7 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
 	"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 	st "k8s.io/kubernetes/pkg/scheduler/testing"
+	"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
 	"k8s.io/kubernetes/test/utils/ktesting"
 	"k8s.io/utils/ptr"
 )
@@ -1319,10 +1320,11 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceCl
 	tc.client.PrependReactor("list", "resourceclassparameters", createListReactor(tc.client.Tracker(), "ResourceClassParameters"))
 
 	tc.informerFactory = informers.NewSharedInformerFactory(tc.client, 0)
-
+	assumeCache := assumecache.NewAssumeCache(tCtx.Logger(), tc.informerFactory.Resource().V1alpha2().ResourceClaims().Informer(), "resource claim", "", nil)
 	opts := []runtime.Option{
 		runtime.WithClientSet(tc.client),
 		runtime.WithInformerFactory(tc.informerFactory),
+		runtime.WithResourceClaimCache(assumeCache),
 	}
 	fh, err := runtime.NewFramework(tCtx, nil, nil, opts...)
 	if err != nil {
diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go
index 44a28e3c991..257f0e20439 100644
--- a/pkg/scheduler/framework/runtime/framework.go
+++ b/pkg/scheduler/framework/runtime/framework.go
@@ -39,6 +39,7 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 	"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
 	"k8s.io/kubernetes/pkg/scheduler/metrics"
+	"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
 	"k8s.io/kubernetes/pkg/util/slice"
 )
 
@@ -71,11 +72,12 @@ type frameworkImpl struct {
 	// pluginsMap contains all plugins, by name.
 	pluginsMap map[string]framework.Plugin
 
-	clientSet       clientset.Interface
-	kubeConfig      *restclient.Config
-	eventRecorder   events.EventRecorder
-	informerFactory informers.SharedInformerFactory
-	logger          klog.Logger
+	clientSet          clientset.Interface
+	kubeConfig         *restclient.Config
+	eventRecorder      events.EventRecorder
+	informerFactory    informers.SharedInformerFactory
+	resourceClaimCache *assumecache.AssumeCache
+	logger             klog.Logger
 
 	metricsRecorder *metrics.MetricAsyncRecorder
 	profileName     string
@@ -126,6 +128,7 @@ type frameworkOptions struct {
 	kubeConfig           *restclient.Config
 	eventRecorder        events.EventRecorder
 	informerFactory      informers.SharedInformerFactory
+	resourceClaimCache   *assumecache.AssumeCache
 	snapshotSharedLister framework.SharedLister
 	metricsRecorder      *metrics.MetricAsyncRecorder
 	podNominator         framework.PodNominator
@@ -176,6 +179,13 @@ func WithInformerFactory(informerFactory informers.SharedInformerFactory) Option
 	}
 }
 
+// WithResourceClaimCache sets the resource claim cache for the scheduling frameworkImpl.
+func WithResourceClaimCache(resourceClaimCache *assumecache.AssumeCache) Option {
+	return func(o *frameworkOptions) {
+		o.resourceClaimCache = resourceClaimCache
+	}
+}
+
 // WithSnapshotSharedLister sets the SharedLister of the snapshot.
 func WithSnapshotSharedLister(snapshotSharedLister framework.SharedLister) Option {
 	return func(o *frameworkOptions) {
@@ -259,6 +269,7 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler
 		kubeConfig:           options.kubeConfig,
 		eventRecorder:        options.eventRecorder,
 		informerFactory:      options.informerFactory,
+		resourceClaimCache:   options.resourceClaimCache,
 		metricsRecorder:      options.metricsRecorder,
 		extenders:            options.extenders,
 		PodNominator:         options.podNominator,
@@ -1598,6 +1609,10 @@ func (f *frameworkImpl) SharedInformerFactory() informers.SharedInformerFactory
 	return f.informerFactory
 }
 
+func (f *frameworkImpl) ResourceClaimCache() *assumecache.AssumeCache {
+	return f.resourceClaimCache
+}
+
 func (f *frameworkImpl) pluginsNeeded(plugins *config.Plugins) sets.Set[string] {
 	pgSet := sets.Set[string]{}
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index df007aa82fc..291830c642e 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -48,6 +48,7 @@ import (
 	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
 	"k8s.io/kubernetes/pkg/scheduler/metrics"
 	"k8s.io/kubernetes/pkg/scheduler/profile"
+	"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
 )
 
 const (
@@ -293,11 +294,18 @@ func New(ctx context.Context,
 	snapshot := internalcache.NewEmptySnapshot()
 	metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopEverything)
 
+	var resourceClaimCache *assumecache.AssumeCache
+	if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
+		resourceClaimInformer := informerFactory.Resource().V1alpha2().ResourceClaims().Informer()
+		resourceClaimCache = assumecache.NewAssumeCache(logger, resourceClaimInformer, "ResourceClaim", "", nil)
+	}
+
 	profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory,
 		frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
 		frameworkruntime.WithClientSet(client),
 		frameworkruntime.WithKubeConfig(options.kubeConfig),
 		frameworkruntime.WithInformerFactory(informerFactory),
+		frameworkruntime.WithResourceClaimCache(resourceClaimCache),
 		frameworkruntime.WithSnapshotSharedLister(snapshot),
 		frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
 		frameworkruntime.WithParallelism(int(options.parallelism)),
@@ -356,7 +364,7 @@ func New(ctx context.Context,
 	sched.NextPod = podQueue.Pop
 	sched.applyDefaultHandlers()
 
-	if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(queueingHintsPerProfile)); err != nil {
+	if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, resourceClaimCache, unionedGVKs(queueingHintsPerProfile)); err != nil {
 		return nil, fmt.Errorf("adding event handlers: %w", err)
 	}
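For reviewers, a minimal sketch (not part of the patch) of how the pieces above fit together: the scheduler builds one shared AssumeCache from the ResourceClaim informer, injects it into each framework via WithResourceClaimCache, and both the dynamicresources plugin (fh.ResourceClaimCache()) and addAllEventHandlers then operate on the same instance. The fake clientset, the package name, and the helper name newFrameworkWithSharedClaimCache are assumptions used only for illustration; the wiring mirrors scheduler.New and the test setup shown in the diff.

package example // illustrative sketch only

import (
	"context"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
	"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)

func newFrameworkWithSharedClaimCache(ctx context.Context) (framework.Framework, error) {
	logger := klog.FromContext(ctx)
	client := fake.NewSimpleClientset() // assumption: fake client stands in for the real one
	informerFactory := informers.NewSharedInformerFactory(client, 0)

	// One cache per scheduler, fed by the ResourceClaim informer (as in scheduler.New).
	resourceClaimCache := assumecache.NewAssumeCache(
		logger,
		informerFactory.Resource().V1alpha2().ResourceClaims().Informer(),
		"ResourceClaim", "", nil)

	// The framework exposes the cache via Handle.ResourceClaimCache(), so the
	// dynamicresources plugin and the event handlers share a single instance.
	return frameworkruntime.NewFramework(ctx, nil, nil,
		frameworkruntime.WithClientSet(client),
		frameworkruntime.WithInformerFactory(informerFactory),
		frameworkruntime.WithResourceClaimCache(resourceClaimCache),
	)
}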