diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index a3d17cb8f12..95a40b8d957 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -32,7 +32,6 @@ import ( apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" @@ -40,6 +39,7 @@ import ( resourcev1alpha2apply "k8s.io/client-go/applyconfigurations/resource/v1alpha2" "k8s.io/client-go/kubernetes" resourcev1alpha2listers "k8s.io/client-go/listers/resource/v1alpha2" + "k8s.io/client-go/tools/cache" "k8s.io/component-helpers/scheduling/corev1/nodeaffinity" "k8s.io/dynamic-resource-allocation/resourceclaim" "k8s.io/klog/v2" @@ -56,6 +56,10 @@ const ( Name = names.DynamicResources stateKey framework.StateKey = Name + + // generatedFromIndex is the lookup name for the index function + // which indexes by other resource which generated the parameters object. + generatedFromIndex = "generated-from-index" ) // The state is initialized in PreFilter phase. Because we save the pointer in @@ -280,6 +284,13 @@ type dynamicResources struct { resourceSliceLister resourcev1alpha2listers.ResourceSliceLister claimNameLookup *resourceclaim.Lookup + // claimParametersIndexer has the common claimParametersGeneratedFrom indexer installed to + // limit iteration over claimParameters to those of interest. + claimParametersIndexer cache.Indexer + // classParametersIndexer has the common classParametersGeneratedFrom indexer installed to + // limit iteration over classParameters to those of interest. + classParametersIndexer cache.Indexer + // claimAssumeCache enables temporarily storing a newer claim object // while the scheduler has allocated it and the corresponding object // update from the apiserver has not been processed by the claim @@ -352,15 +363,58 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe classLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClasses().Lister(), podSchedulingContextLister: fh.SharedInformerFactory().Resource().V1alpha2().PodSchedulingContexts().Lister(), claimParametersLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaimParameters().Lister(), + claimParametersIndexer: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaimParameters().Informer().GetIndexer(), classParametersLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClassParameters().Lister(), + classParametersIndexer: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClassParameters().Informer().GetIndexer(), resourceSliceLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceSlices().Lister(), claimNameLookup: resourceclaim.NewNameLookup(fh.ClientSet()), claimAssumeCache: assumecache.NewAssumeCache(logger, fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Informer(), "claim", "", nil), } + if err := pl.claimParametersIndexer.AddIndexers(cache.Indexers{generatedFromIndex: claimParametersGeneratedFromIndexFunc}); err != nil { + return nil, fmt.Errorf("add claim parameters cache indexer: %w", err) + } + if err := pl.classParametersIndexer.AddIndexers(cache.Indexers{generatedFromIndex: classParametersGeneratedFromIndexFunc}); err != nil { + return nil, fmt.Errorf("add class parameters cache indexer: %w", err) + } + return pl, nil } +func claimParametersReferenceKeyFunc(ref *resourcev1alpha2.ResourceClaimParametersReference) string { + return ref.APIGroup + "/" + ref.Kind + "/" + ref.Name +} + +// claimParametersGeneratedFromIndexFunc is an index function that returns other resource keys +// (= apiGroup/kind/name) for ResourceClaimParametersReference in a given claim parameters. +func claimParametersGeneratedFromIndexFunc(obj interface{}) ([]string, error) { + parameters, ok := obj.(*resourcev1alpha2.ResourceClaimParameters) + if !ok { + return nil, nil + } + if parameters.GeneratedFrom == nil { + return nil, nil + } + return []string{claimParametersReferenceKeyFunc(parameters.GeneratedFrom)}, nil +} + +func classParametersReferenceKeyFunc(ref *resourcev1alpha2.ResourceClassParametersReference) string { + return ref.APIGroup + "/" + ref.Kind + "/" + ref.Namespace + "/" + ref.Name +} + +// classParametersGeneratedFromIndexFunc is an index function that returns other resource keys +// (= apiGroup/kind/namespace/name) for ResourceClassParametersReference in a given class parameters. +func classParametersGeneratedFromIndexFunc(obj interface{}) ([]string, error) { + parameters, ok := obj.(*resourcev1alpha2.ResourceClassParameters) + if !ok { + return nil, nil + } + if parameters.GeneratedFrom == nil { + return nil, nil + } + return []string{classParametersReferenceKeyFunc(parameters.GeneratedFrom)}, nil +} + var _ framework.PreEnqueuePlugin = &dynamicResources{} var _ framework.PreFilterPlugin = &dynamicResources{} var _ framework.FilterPlugin = &dynamicResources{} @@ -987,23 +1041,22 @@ func (pl *dynamicResources) lookupClassParameters(logger klog.Logger, class *res return parameters, nil } - // TODO (https://github.com/kubernetes/kubernetes/issues/123731): use an indexer - allParameters, err := pl.classParametersLister.ResourceClassParameters(class.Namespace).List(labels.Everything()) + objs, err := pl.classParametersIndexer.ByIndex(generatedFromIndex, classParametersReferenceKeyFunc(class.ParametersRef)) if err != nil { return nil, statusError(logger, fmt.Errorf("listing class parameters failed: %v", err)) } - for _, parameters := range allParameters { - if parameters.GeneratedFrom == nil { - continue - } - if parameters.GeneratedFrom.APIGroup == class.ParametersRef.APIGroup && - parameters.GeneratedFrom.Kind == class.ParametersRef.Kind && - parameters.GeneratedFrom.Name == class.ParametersRef.Name && - parameters.GeneratedFrom.Namespace == class.ParametersRef.Namespace { - return parameters, nil + switch len(objs) { + case 0: + return nil, statusUnschedulable(logger, fmt.Sprintf("generated class parameters for %s.%s %s not found", class.ParametersRef.Kind, class.ParametersRef.APIGroup, klog.KRef(class.ParametersRef.Namespace, class.ParametersRef.Name))) + case 1: + parameters, ok := objs[0].(*resourcev1alpha2.ResourceClassParameters) + if !ok { + return nil, statusError(logger, fmt.Errorf("unexpected object in class parameters index: %T", objs[0])) } + return parameters, nil + default: + return nil, statusError(logger, fmt.Errorf("multiple generated class parameters for %s.%s %s found: %s", class.ParametersRef.Kind, class.ParametersRef.APIGroup, klog.KRef(class.Namespace, class.ParametersRef.Name), klog.KObjSlice(objs))) } - return nil, statusUnschedulable(logger, fmt.Sprintf("generated class parameters for %s.%s %s not found", class.ParametersRef.Kind, class.ParametersRef.APIGroup, klog.KRef(class.Namespace, class.ParametersRef.Name))) } func (pl *dynamicResources) lookupClaimParameters(logger klog.Logger, class *resourcev1alpha2.ResourceClass, claim *resourcev1alpha2.ResourceClaim) (*resourcev1alpha2.ResourceClaimParameters, *framework.Status) { @@ -1045,22 +1098,22 @@ func (pl *dynamicResources) lookupClaimParameters(logger klog.Logger, class *res return parameters, nil } - // TODO (https://github.com/kubernetes/kubernetes/issues/123731): use an indexer - allParameters, err := pl.claimParametersLister.ResourceClaimParameters(claim.Namespace).List(labels.Everything()) + objs, err := pl.claimParametersIndexer.ByIndex(generatedFromIndex, claimParametersReferenceKeyFunc(claim.Spec.ParametersRef)) if err != nil { return nil, statusError(logger, fmt.Errorf("listing claim parameters failed: %v", err)) } - for _, parameters := range allParameters { - if parameters.GeneratedFrom == nil { - continue - } - if parameters.GeneratedFrom.APIGroup == claim.Spec.ParametersRef.APIGroup && - parameters.GeneratedFrom.Kind == claim.Spec.ParametersRef.Kind && - parameters.GeneratedFrom.Name == claim.Spec.ParametersRef.Name { - return parameters, nil + switch len(objs) { + case 0: + return nil, statusUnschedulable(logger, fmt.Sprintf("generated claim parameters for %s.%s %s not found", claim.Spec.ParametersRef.Kind, claim.Spec.ParametersRef.APIGroup, klog.KRef(claim.Namespace, claim.Spec.ParametersRef.Name))) + case 1: + parameters, ok := objs[0].(*resourcev1alpha2.ResourceClaimParameters) + if !ok { + return nil, statusError(logger, fmt.Errorf("unexpected object in claim parameters index: %T", objs[0])) } + return parameters, nil + default: + return nil, statusError(logger, fmt.Errorf("multiple generated claim parameters for %s.%s %s found: %s", claim.Spec.ParametersRef.Kind, claim.Spec.ParametersRef.APIGroup, klog.KRef(claim.Namespace, claim.Spec.ParametersRef.Name), klog.KObjSlice(objs))) } - return nil, statusUnschedulable(logger, fmt.Sprintf("generated claim parameters for %s.%s %s not found", claim.Spec.ParametersRef.Kind, claim.Spec.ParametersRef.APIGroup, klog.KRef(claim.Namespace, claim.Spec.ParametersRef.Name))) } // PreFilterExtensions returns prefilter extensions, pod add and remove. diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go index 025a3478383..566610e891d 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go @@ -659,7 +659,7 @@ func TestPlugin(t *testing.T) { pod: podWithClaimName, claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaimWithParams}, classes: []*resourcev1alpha2.ResourceClass{structuredResourceClassWithParams}, - objs: []apiruntime.Object{claimParameters /* classParameters, */, workerNodeSlice}, + objs: []apiruntime.Object{claimParameters, workerNodeSlice}, want: want{ prefilter: result{ status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `class parameters default/my-resource-class not found`), @@ -674,7 +674,7 @@ func TestPlugin(t *testing.T) { pod: podWithClaimName, claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaimWithParams}, classes: []*resourcev1alpha2.ResourceClass{structuredResourceClassWithParams}, - objs: []apiruntime.Object{ /* claimParameters, */ classParameters, workerNodeSlice}, + objs: []apiruntime.Object{classParameters, workerNodeSlice}, want: want{ prefilter: result{ status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `claim parameters default/my-pod-my-resource not found`), @@ -685,6 +685,66 @@ func TestPlugin(t *testing.T) { }, }, + "missing-translated-class-parameters": { + pod: podWithClaimName, + claims: []*resourcev1alpha2.ResourceClaim{claimWithCRD(pendingDelayedClaimWithParams)}, + classes: []*resourcev1alpha2.ResourceClass{classWithCRD(structuredResourceClassWithCRD)}, + objs: []apiruntime.Object{claimParameters, workerNodeSlice}, + want: want{ + prefilter: result{ + status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `generated class parameters for ResourceClassParameters.example.com default/my-resource-class not found`), + }, + postfilter: result{ + status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`), + }, + }, + }, + + "missing-translated-claim-parameters": { + pod: podWithClaimName, + claims: []*resourcev1alpha2.ResourceClaim{claimWithCRD(pendingDelayedClaimWithParams)}, + classes: []*resourcev1alpha2.ResourceClass{classWithCRD(structuredResourceClassWithCRD)}, + objs: []apiruntime.Object{classParameters, workerNodeSlice}, + want: want{ + prefilter: result{ + status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `generated claim parameters for ResourceClaimParameters.example.com default/my-pod-my-resource not found`), + }, + postfilter: result{ + status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`), + }, + }, + }, + + "too-many-translated-class-parameters": { + pod: podWithClaimName, + claims: []*resourcev1alpha2.ResourceClaim{claimWithCRD(pendingDelayedClaimWithParams)}, + classes: []*resourcev1alpha2.ResourceClass{classWithCRD(structuredResourceClassWithCRD)}, + objs: []apiruntime.Object{claimParameters, classParameters, st.FromClassParameters(classParameters).Name("other").Obj() /* too many */, workerNodeSlice}, + want: want{ + prefilter: result{ + status: framework.AsStatus(errors.New(`multiple generated class parameters for ResourceClassParameters.example.com my-resource-class found: [default/my-resource-class default/other]`)), + }, + postfilter: result{ + status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`), + }, + }, + }, + + "too-many-translated-claim-parameters": { + pod: podWithClaimName, + claims: []*resourcev1alpha2.ResourceClaim{claimWithCRD(pendingDelayedClaimWithParams)}, + classes: []*resourcev1alpha2.ResourceClass{classWithCRD(structuredResourceClassWithCRD)}, + objs: []apiruntime.Object{claimParameters, st.FromClaimParameters(claimParameters).Name("other").Obj() /* too many */, classParameters, workerNodeSlice}, + want: want{ + prefilter: result{ + status: framework.AsStatus(errors.New(`multiple generated claim parameters for ResourceClaimParameters.example.com default/my-pod-my-resource found: [default/my-pod-my-resource default/other]`)), + }, + postfilter: result{ + status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`), + }, + }, + }, + "claim-parameters-CEL-runtime-error": { pod: podWithClaimName, claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaimWithParams}, diff --git a/pkg/scheduler/testing/wrappers.go b/pkg/scheduler/testing/wrappers.go index 8e50fb212e0..3ddd79cb61d 100644 --- a/pkg/scheduler/testing/wrappers.go +++ b/pkg/scheduler/testing/wrappers.go @@ -1154,6 +1154,11 @@ func MakeClaimParameters() *ClaimParametersWrapper { return &ClaimParametersWrapper{} } +// FromClaimParameters creates a ResourceClaimParameters wrapper from an existing object. +func FromClaimParameters(other *resourcev1alpha2.ResourceClaimParameters) *ClaimParametersWrapper { + return &ClaimParametersWrapper{*other.DeepCopy()} +} + func (wrapper *ClaimParametersWrapper) Obj() *resourcev1alpha2.ResourceClaimParameters { return &wrapper.ResourceClaimParameters } @@ -1209,6 +1214,11 @@ func MakeClassParameters() *ClassParametersWrapper { return &ClassParametersWrapper{} } +// FromClassParameters creates a ResourceClassParameters wrapper from an existing object. +func FromClassParameters(other *resourcev1alpha2.ResourceClassParameters) *ClassParametersWrapper { + return &ClassParametersWrapper{*other.DeepCopy()} +} + func (wrapper *ClassParametersWrapper) Obj() *resourcev1alpha2.ResourceClassParameters { return &wrapper.ResourceClassParameters }