scheduler: central ResourceClaim assume cache

This makes it possible to connect the ResourceClaim event handler to the assume
cache, which addresses a theoretical race condition.

It may also be useful for implementing autoscaler support, because the
autoscaler can now modify the content of the cache.
Patrick Ohly 2024-04-01 15:46:48 +02:00
parent dea16757ef
commit 9a6f3b9388
7 changed files with 94 additions and 29 deletions
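
The change, condensed: scheduler startup now creates a single ResourceClaim assume cache and hands it both to the framework (so plugins can reach it) and to the event handler registration. Below is a minimal, self-contained sketch of that flow using a fake client; the handler body, the use of klog.Background, and the final sleep are illustrative choices rather than code from this commit, and the assumecache package lives under k8s.io/kubernetes, so it is only importable from within that repository.

package main

import (
	"context"
	"fmt"
	"time"

	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)

func main() {
	logger := klog.Background()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// One shared informer factory, as in scheduler.New.
	client := fake.NewSimpleClientset()
	informerFactory := informers.NewSharedInformerFactory(client, 0)

	// The central ResourceClaim assume cache, created once (only when the
	// DynamicResourceAllocation feature gate is enabled) and shared with both
	// the event handlers and the dynamicresources plugin.
	claimInformer := informerFactory.Resource().V1alpha2().ResourceClaims().Informer()
	claimCache := assumecache.NewAssumeCache(logger, claimInformer, "ResourceClaim", "", nil)

	// Event handlers attach to the cache instead of the informer; note the
	// single return value, unlike SharedIndexInformer.AddEventHandler.
	_ = claimCache.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			claim := obj.(*resourcev1alpha2.ResourceClaim)
			fmt.Println("observed claim", claim.Namespace+"/"+claim.Name)
		},
	})

	informerFactory.Start(ctx.Done())
	informerFactory.WaitForCacheSync(ctx.Done())

	// A claim created through the (fake) API server flows through the informer,
	// into the assume cache, and out to the registered handler.
	_, _ = client.ResourceV1alpha2().ResourceClaims("default").Create(ctx,
		&resourcev1alpha2.ResourceClaim{ObjectMeta: metav1.ObjectMeta{Name: "claim-a", Namespace: "default"}},
		metav1.CreateOptions{})
	time.Sleep(time.Second) // crude wait for asynchronous delivery in this demo
}

With this in place, addAllEventHandlers takes the cache as an extra parameter instead of reaching for the informer, as the first file below shows.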

View File

@@ -43,6 +43,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
"k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/profile"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)
func (sched *Scheduler) onStorageClassAdd(obj interface{}) {
@@ -288,6 +289,7 @@ func addAllEventHandlers(
sched *Scheduler,
informerFactory informers.SharedInformerFactory,
dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
resourceClaimCache *assumecache.AssumeCache,
gvkMap map[framework.GVK]framework.ActionType,
) error {
var (
@@ -456,11 +458,9 @@ func addAllEventHandlers(
}
case framework.ResourceClaim:
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
-if handlerRegistration, err = informerFactory.Resource().V1alpha2().ResourceClaims().Informer().AddEventHandler(
+handlerRegistration = resourceClaimCache.AddEventHandler(
buildEvtResHandler(at, framework.ResourceClaim, "ResourceClaim"),
-); err != nil {
-return err
-}
+)
handlers = append(handlers, handlerRegistration)
}
case framework.ResourceClass:

View File

@@ -26,9 +26,12 @@ import (
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2/ktesting"
"k8s.io/apimachinery/pkg/runtime"
@@ -38,6 +41,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename"
@@ -46,6 +50,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
"k8s.io/kubernetes/pkg/scheduler/internal/queue"
st "k8s.io/kubernetes/pkg/scheduler/testing"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)
func TestNodeAllocatableChanged(t *testing.T) {
@@ -362,6 +367,7 @@ func TestAddAllEventHandlers(t *testing.T) {
tests := []struct {
name string
gvkMap map[framework.GVK]framework.ActionType
enableDRA bool
expectStaticInformers map[reflect.Type]bool
expectDynamicInformers map[schema.GroupVersionResource]bool
}{
@@ -375,6 +381,44 @@ func TestAddAllEventHandlers(t *testing.T) {
},
expectDynamicInformers: map[schema.GroupVersionResource]bool{},
},
{
name: "DRA events disabled",
gvkMap: map[framework.GVK]framework.ActionType{
framework.PodSchedulingContext: framework.Add,
framework.ResourceClaim: framework.Add,
framework.ResourceClass: framework.Add,
framework.ResourceClaimParameters: framework.Add,
framework.ResourceClassParameters: framework.Add,
},
expectStaticInformers: map[reflect.Type]bool{
reflect.TypeOf(&v1.Pod{}): true,
reflect.TypeOf(&v1.Node{}): true,
reflect.TypeOf(&v1.Namespace{}): true,
},
expectDynamicInformers: map[schema.GroupVersionResource]bool{},
},
{
name: "DRA events enabled",
gvkMap: map[framework.GVK]framework.ActionType{
framework.PodSchedulingContext: framework.Add,
framework.ResourceClaim: framework.Add,
framework.ResourceClass: framework.Add,
framework.ResourceClaimParameters: framework.Add,
framework.ResourceClassParameters: framework.Add,
},
enableDRA: true,
expectStaticInformers: map[reflect.Type]bool{
reflect.TypeOf(&v1.Pod{}): true,
reflect.TypeOf(&v1.Node{}): true,
reflect.TypeOf(&v1.Namespace{}): true,
reflect.TypeOf(&resourcev1alpha2.PodSchedulingContext{}): true,
reflect.TypeOf(&resourcev1alpha2.ResourceClaim{}): true,
reflect.TypeOf(&resourcev1alpha2.ResourceClaimParameters{}): true,
reflect.TypeOf(&resourcev1alpha2.ResourceClass{}): true,
reflect.TypeOf(&resourcev1alpha2.ResourceClassParameters{}): true,
},
expectDynamicInformers: map[schema.GroupVersionResource]bool{},
},
{
name: "add GVKs handlers defined in framework dynamically",
gvkMap: map[framework.GVK]framework.ActionType{
@@ -433,6 +477,7 @@ func TestAddAllEventHandlers(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DynamicResourceAllocation, tt.enableDRA)
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -447,8 +492,13 @@ func TestAddAllEventHandlers(t *testing.T) {
dynclient := dyfake.NewSimpleDynamicClient(scheme)
dynInformerFactory := dynamicinformer.NewDynamicSharedInformerFactory(dynclient, 0)
var resourceClaimCache *assumecache.AssumeCache
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
resourceClaimInformer := informerFactory.Resource().V1alpha2().ResourceClaims().Informer()
resourceClaimCache = assumecache.NewAssumeCache(logger, resourceClaimInformer, "ResourceClaim", "", nil)
}
-if err := addAllEventHandlers(&testSched, informerFactory, dynInformerFactory, tt.gvkMap); err != nil {
+if err := addAllEventHandlers(&testSched, informerFactory, dynInformerFactory, resourceClaimCache, tt.gvkMap); err != nil {
t.Fatalf("Add event handlers failed, error = %v", err)
}

View File

@@ -38,6 +38,7 @@ import (
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)
// NodeScoreList declares a list of nodes and their scores.
@@ -701,6 +702,11 @@ type Handle interface {
SharedInformerFactory() informers.SharedInformerFactory
// ResourceClaimCache returns an assume cache of ResourceClaim objects
// which gets populated by the shared informer factory and the dynamic resources
// plugin.
ResourceClaimCache() *assumecache.AssumeCache
// RunFilterPluginsWithNominatedPods runs the set of configured filter plugins for nominated pod on the given node.
RunFilterPluginsWithNominatedPods(ctx context.Context, state *CycleState, pod *v1.Pod, info *NodeInfo) *Status
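
For plugin authors, the new Handle accessor means a plugin no longer constructs its own cache; it asks the framework for the shared one, exactly as the dynamicresources plugin does in the next file. A small sketch of that pattern follows; the plugin name, the lookup helper, and the "namespace/name" key format of Get are assumptions on my part (the latter based on the assume cache previously used by volume binding), not something this diff establishes.

package claimaware

import (
	"context"
	"fmt"

	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)

// claimAwarePlugin reads ResourceClaims through the scheduler-wide assume cache.
type claimAwarePlugin struct {
	claimCache *assumecache.AssumeCache
}

var _ framework.Plugin = &claimAwarePlugin{}

// New follows the framework's plugin factory signature.
func New(_ context.Context, _ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
	return &claimAwarePlugin{
		// Shared cache; nil when DynamicResourceAllocation is disabled, so a
		// real plugin would check its feature gate first (as dynamicresources does).
		claimCache: fh.ResourceClaimCache(),
	}, nil
}

func (p *claimAwarePlugin) Name() string { return "ClaimAwareExample" }

// lookup returns the cache's view of a claim, which may be an assumed
// (in-flight) version rather than what the informer has delivered so far.
func (p *claimAwarePlugin) lookup(namespace, name string) (*resourcev1alpha2.ResourceClaim, error) {
	obj, err := p.claimCache.Get(namespace + "/" + name) // assumed key format
	if err != nil {
		return nil, err
	}
	claim, ok := obj.(*resourcev1alpha2.ResourceClaim)
	if !ok {
		return nil, fmt.Errorf("unexpected object of type %T in assume cache", obj)
	}
	return claim, nil
}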

View File

@@ -343,7 +343,6 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
return &dynamicResources{}, nil
}
-logger := klog.FromContext(ctx)
pl := &dynamicResources{
enabled: true,
fh: fh,
@@ -355,7 +354,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
classParametersLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClassParameters().Lister(),
resourceSliceLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceSlices().Lister(),
claimNameLookup: resourceclaim.NewNameLookup(fh.ClientSet()),
-claimAssumeCache: assumecache.NewAssumeCache(logger, fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Informer(), "claim", "", nil),
+claimAssumeCache: fh.ResourceClaimCache(),
}
return pl, nil
@@ -597,21 +596,6 @@ func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, po
//
// TODO (https://github.com/kubernetes/kubernetes/issues/123697):
// check that the pending claims depend on structured parameters (depends on refactoring foreachPodResourceClaim, see other TODO).
-//
-// There is a small race here:
-// - The dynamicresources plugin allocates claim A and updates the assume cache.
-// - A second pod gets marked as unschedulable based on that assume cache.
-// - Before the informer cache here catches up, the pod runs, terminates and
-// the claim gets deallocated without ever sending the claim status with
-// allocation to the scheduler.
-// - The comparison below is for a *very* old claim with no allocation and the
-// new claim where the allocation is already removed again, so no
-// RemovedClaimAllocation event gets emitted.
-//
-// This is extremely unlikely and thus a fix is not needed for alpha in Kubernetes 1.30.
-// TODO (https://github.com/kubernetes/kubernetes/issues/123698): The solution is to somehow integrate the assume cache
-// into the event mechanism. This can be tackled together with adding autoscaler
-// support, which also needs to do something with the assume cache.
logger.V(6).Info("claim with structured parameters got deallocated", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
return framework.Queue, nil
}
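
The comment deleted above describes the race the commit message calls theoretical: an informer-registered handler could end up comparing a stale, unallocated claim with one whose allocation had already been removed again, so the waiting pod was never requeued. With the handler registered on the central assume cache, the allocation that the plugin records in the cache is part of the state the event path works from. The hypothetical helper below illustrates the plugin side of that interaction; it mirrors how dynamicresources records an allocation once it has decided on one, but the function and the exact Assume semantics shown are assumptions, not code from this diff.

import (
	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)

// recordAllocation is a hypothetical helper: it stores an allocation decision
// in the shared cache so that any reader of the cache, including the event
// handlers registered via AddEventHandler, works from the allocated version
// rather than the informer's stale copy.
func recordAllocation(claimCache *assumecache.AssumeCache, claim *resourcev1alpha2.ResourceClaim, result *resourcev1alpha2.AllocationResult) *framework.Status {
	allocated := claim.DeepCopy()
	allocated.Status.Allocation = result
	// Assume replaces the cached object until the informer delivers a newer
	// version; the signature used here is an assumption based on the
	// scheduler's existing assume cache.
	if err := claimCache.Assume(allocated); err != nil {
		return framework.AsStatus(err)
	}
	return nil
}

This is also the hook the commit message has in mind for the autoscaler: a component that holds a reference to the same cache can inject its own view of a claim's allocation without an API server round trip.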

View File

@@ -44,6 +44,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
st "k8s.io/kubernetes/pkg/scheduler/testing"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/ptr"
)
@@ -1319,10 +1320,11 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceCl
tc.client.PrependReactor("list", "resourceclassparameters", createListReactor(tc.client.Tracker(), "ResourceClassParameters"))
tc.informerFactory = informers.NewSharedInformerFactory(tc.client, 0)
assumeCache := assumecache.NewAssumeCache(tCtx.Logger(), tc.informerFactory.Resource().V1alpha2().ResourceClaims().Informer(), "resource claim", "", nil)
opts := []runtime.Option{
runtime.WithClientSet(tc.client),
runtime.WithInformerFactory(tc.informerFactory),
runtime.WithResourceClaimCache(assumeCache),
}
fh, err := runtime.NewFramework(tCtx, nil, nil, opts...)
if err != nil {

View File

@@ -39,6 +39,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
"k8s.io/kubernetes/pkg/util/slice"
)
@@ -71,11 +72,12 @@ type frameworkImpl struct {
// pluginsMap contains all plugins, by name.
pluginsMap map[string]framework.Plugin
-clientSet clientset.Interface
-kubeConfig *restclient.Config
-eventRecorder events.EventRecorder
-informerFactory informers.SharedInformerFactory
-logger klog.Logger
+clientSet clientset.Interface
+kubeConfig *restclient.Config
+eventRecorder events.EventRecorder
+informerFactory informers.SharedInformerFactory
+resourceClaimCache *assumecache.AssumeCache
+logger klog.Logger
metricsRecorder *metrics.MetricAsyncRecorder
profileName string
@@ -126,6 +128,7 @@ type frameworkOptions struct {
kubeConfig *restclient.Config
eventRecorder events.EventRecorder
informerFactory informers.SharedInformerFactory
resourceClaimCache *assumecache.AssumeCache
snapshotSharedLister framework.SharedLister
metricsRecorder *metrics.MetricAsyncRecorder
podNominator framework.PodNominator
@@ -176,6 +179,13 @@ func WithInformerFactory(informerFactory informers.SharedInformerFactory) Option
}
}
// WithResourceClaimCache sets the resource claim cache for the scheduling frameworkImpl.
func WithResourceClaimCache(resourceClaimCache *assumecache.AssumeCache) Option {
return func(o *frameworkOptions) {
o.resourceClaimCache = resourceClaimCache
}
}
// WithSnapshotSharedLister sets the SharedLister of the snapshot.
func WithSnapshotSharedLister(snapshotSharedLister framework.SharedLister) Option {
return func(o *frameworkOptions) {
@@ -259,6 +269,7 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler
kubeConfig: options.kubeConfig,
eventRecorder: options.eventRecorder,
informerFactory: options.informerFactory,
resourceClaimCache: options.resourceClaimCache,
metricsRecorder: options.metricsRecorder,
extenders: options.extenders,
PodNominator: options.podNominator,
@@ -1598,6 +1609,10 @@ func (f *frameworkImpl) SharedInformerFactory() informers.SharedInformerFactory
return f.informerFactory
}
func (f *frameworkImpl) ResourceClaimCache() *assumecache.AssumeCache {
return f.resourceClaimCache
}
func (f *frameworkImpl) pluginsNeeded(plugins *config.Plugins) sets.Set[string] {
pgSet := sets.Set[string]{}

View File

@ -48,6 +48,7 @@ import (
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/profile"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)
const (
@@ -293,11 +294,18 @@ func New(ctx context.Context,
snapshot := internalcache.NewEmptySnapshot()
metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopEverything)
var resourceClaimCache *assumecache.AssumeCache
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
resourceClaimInformer := informerFactory.Resource().V1alpha2().ResourceClaims().Informer()
resourceClaimCache = assumecache.NewAssumeCache(logger, resourceClaimInformer, "ResourceClaim", "", nil)
}
profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory,
frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
frameworkruntime.WithClientSet(client),
frameworkruntime.WithKubeConfig(options.kubeConfig),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithResourceClaimCache(resourceClaimCache),
frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
frameworkruntime.WithParallelism(int(options.parallelism)),
@@ -356,7 +364,7 @@ func New(ctx context.Context,
sched.NextPod = podQueue.Pop
sched.applyDefaultHandlers()
-if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(queueingHintsPerProfile)); err != nil {
+if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, resourceClaimCache, unionedGVKs(queueingHintsPerProfile)); err != nil {
return nil, fmt.Errorf("adding event handlers: %w", err)
}