DRA: scheduler: index claim and class parameters to simplify lookup

This commit is contained in:
carlory 2024-04-29 18:36:23 +08:00
parent 5ceb99dc6b
commit 3072987fcc
3 changed files with 149 additions and 26 deletions

View File

@ -32,7 +32,6 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
@ -40,6 +39,7 @@ import (
resourcev1alpha2apply "k8s.io/client-go/applyconfigurations/resource/v1alpha2"
"k8s.io/client-go/kubernetes"
resourcev1alpha2listers "k8s.io/client-go/listers/resource/v1alpha2"
"k8s.io/client-go/tools/cache"
"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
"k8s.io/dynamic-resource-allocation/resourceclaim"
"k8s.io/klog/v2"
@ -56,6 +56,10 @@ const (
Name = names.DynamicResources
stateKey framework.StateKey = Name
// generatedFromIndex is the lookup name for the index function
// which indexes by other resource which generated the parameters object.
generatedFromIndex = "generated-from-index"
)
// The state is initialized in PreFilter phase. Because we save the pointer in
@ -280,6 +284,13 @@ type dynamicResources struct {
resourceSliceLister resourcev1alpha2listers.ResourceSliceLister
claimNameLookup *resourceclaim.Lookup
// claimParametersIndexer has the common claimParametersGeneratedFrom indexer installed to
// limit iteration over claimParameters to those of interest.
claimParametersIndexer cache.Indexer
// classParametersIndexer has the common classParametersGeneratedFrom indexer installed to
// limit iteration over classParameters to those of interest.
classParametersIndexer cache.Indexer
// claimAssumeCache enables temporarily storing a newer claim object
// while the scheduler has allocated it and the corresponding object
// update from the apiserver has not been processed by the claim
@ -352,15 +363,58 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
classLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClasses().Lister(),
podSchedulingContextLister: fh.SharedInformerFactory().Resource().V1alpha2().PodSchedulingContexts().Lister(),
claimParametersLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaimParameters().Lister(),
claimParametersIndexer: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaimParameters().Informer().GetIndexer(),
classParametersLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClassParameters().Lister(),
classParametersIndexer: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClassParameters().Informer().GetIndexer(),
resourceSliceLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceSlices().Lister(),
claimNameLookup: resourceclaim.NewNameLookup(fh.ClientSet()),
claimAssumeCache: assumecache.NewAssumeCache(logger, fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Informer(), "claim", "", nil),
}
if err := pl.claimParametersIndexer.AddIndexers(cache.Indexers{generatedFromIndex: claimParametersGeneratedFromIndexFunc}); err != nil {
return nil, fmt.Errorf("add claim parameters cache indexer: %w", err)
}
if err := pl.classParametersIndexer.AddIndexers(cache.Indexers{generatedFromIndex: classParametersGeneratedFromIndexFunc}); err != nil {
return nil, fmt.Errorf("add class parameters cache indexer: %w", err)
}
return pl, nil
}
func claimParametersReferenceKeyFunc(ref *resourcev1alpha2.ResourceClaimParametersReference) string {
return ref.APIGroup + "/" + ref.Kind + "/" + ref.Name
}
// claimParametersGeneratedFromIndexFunc is an index function that returns other resource keys
// (= apiGroup/kind/name) for ResourceClaimParametersReference in a given claim parameters.
func claimParametersGeneratedFromIndexFunc(obj interface{}) ([]string, error) {
parameters, ok := obj.(*resourcev1alpha2.ResourceClaimParameters)
if !ok {
return nil, nil
}
if parameters.GeneratedFrom == nil {
return nil, nil
}
return []string{claimParametersReferenceKeyFunc(parameters.GeneratedFrom)}, nil
}
func classParametersReferenceKeyFunc(ref *resourcev1alpha2.ResourceClassParametersReference) string {
return ref.APIGroup + "/" + ref.Kind + "/" + ref.Namespace + "/" + ref.Name
}
// classParametersGeneratedFromIndexFunc is an index function that returns other resource keys
// (= apiGroup/kind/namespace/name) for ResourceClassParametersReference in a given class parameters.
func classParametersGeneratedFromIndexFunc(obj interface{}) ([]string, error) {
parameters, ok := obj.(*resourcev1alpha2.ResourceClassParameters)
if !ok {
return nil, nil
}
if parameters.GeneratedFrom == nil {
return nil, nil
}
return []string{classParametersReferenceKeyFunc(parameters.GeneratedFrom)}, nil
}
var _ framework.PreEnqueuePlugin = &dynamicResources{}
var _ framework.PreFilterPlugin = &dynamicResources{}
var _ framework.FilterPlugin = &dynamicResources{}
@ -987,23 +1041,22 @@ func (pl *dynamicResources) lookupClassParameters(logger klog.Logger, class *res
return parameters, nil
}
// TODO (https://github.com/kubernetes/kubernetes/issues/123731): use an indexer
allParameters, err := pl.classParametersLister.ResourceClassParameters(class.Namespace).List(labels.Everything())
objs, err := pl.classParametersIndexer.ByIndex(generatedFromIndex, classParametersReferenceKeyFunc(class.ParametersRef))
if err != nil {
return nil, statusError(logger, fmt.Errorf("listing class parameters failed: %v", err))
}
for _, parameters := range allParameters {
if parameters.GeneratedFrom == nil {
continue
}
if parameters.GeneratedFrom.APIGroup == class.ParametersRef.APIGroup &&
parameters.GeneratedFrom.Kind == class.ParametersRef.Kind &&
parameters.GeneratedFrom.Name == class.ParametersRef.Name &&
parameters.GeneratedFrom.Namespace == class.ParametersRef.Namespace {
return parameters, nil
switch len(objs) {
case 0:
return nil, statusUnschedulable(logger, fmt.Sprintf("generated class parameters for %s.%s %s not found", class.ParametersRef.Kind, class.ParametersRef.APIGroup, klog.KRef(class.ParametersRef.Namespace, class.ParametersRef.Name)))
case 1:
parameters, ok := objs[0].(*resourcev1alpha2.ResourceClassParameters)
if !ok {
return nil, statusError(logger, fmt.Errorf("unexpected object in class parameters index: %T", objs[0]))
}
return parameters, nil
default:
return nil, statusError(logger, fmt.Errorf("multiple generated class parameters for %s.%s %s found: %s", class.ParametersRef.Kind, class.ParametersRef.APIGroup, klog.KRef(class.Namespace, class.ParametersRef.Name), klog.KObjSlice(objs)))
}
return nil, statusUnschedulable(logger, fmt.Sprintf("generated class parameters for %s.%s %s not found", class.ParametersRef.Kind, class.ParametersRef.APIGroup, klog.KRef(class.Namespace, class.ParametersRef.Name)))
}
func (pl *dynamicResources) lookupClaimParameters(logger klog.Logger, class *resourcev1alpha2.ResourceClass, claim *resourcev1alpha2.ResourceClaim) (*resourcev1alpha2.ResourceClaimParameters, *framework.Status) {
@ -1045,22 +1098,22 @@ func (pl *dynamicResources) lookupClaimParameters(logger klog.Logger, class *res
return parameters, nil
}
// TODO (https://github.com/kubernetes/kubernetes/issues/123731): use an indexer
allParameters, err := pl.claimParametersLister.ResourceClaimParameters(claim.Namespace).List(labels.Everything())
objs, err := pl.claimParametersIndexer.ByIndex(generatedFromIndex, claimParametersReferenceKeyFunc(claim.Spec.ParametersRef))
if err != nil {
return nil, statusError(logger, fmt.Errorf("listing claim parameters failed: %v", err))
}
for _, parameters := range allParameters {
if parameters.GeneratedFrom == nil {
continue
}
if parameters.GeneratedFrom.APIGroup == claim.Spec.ParametersRef.APIGroup &&
parameters.GeneratedFrom.Kind == claim.Spec.ParametersRef.Kind &&
parameters.GeneratedFrom.Name == claim.Spec.ParametersRef.Name {
return parameters, nil
switch len(objs) {
case 0:
return nil, statusUnschedulable(logger, fmt.Sprintf("generated claim parameters for %s.%s %s not found", claim.Spec.ParametersRef.Kind, claim.Spec.ParametersRef.APIGroup, klog.KRef(claim.Namespace, claim.Spec.ParametersRef.Name)))
case 1:
parameters, ok := objs[0].(*resourcev1alpha2.ResourceClaimParameters)
if !ok {
return nil, statusError(logger, fmt.Errorf("unexpected object in claim parameters index: %T", objs[0]))
}
return parameters, nil
default:
return nil, statusError(logger, fmt.Errorf("multiple generated claim parameters for %s.%s %s found: %s", claim.Spec.ParametersRef.Kind, claim.Spec.ParametersRef.APIGroup, klog.KRef(claim.Namespace, claim.Spec.ParametersRef.Name), klog.KObjSlice(objs)))
}
return nil, statusUnschedulable(logger, fmt.Sprintf("generated claim parameters for %s.%s %s not found", claim.Spec.ParametersRef.Kind, claim.Spec.ParametersRef.APIGroup, klog.KRef(claim.Namespace, claim.Spec.ParametersRef.Name)))
}
// PreFilterExtensions returns prefilter extensions, pod add and remove.

View File

@ -659,7 +659,7 @@ func TestPlugin(t *testing.T) {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaimWithParams},
classes: []*resourcev1alpha2.ResourceClass{structuredResourceClassWithParams},
objs: []apiruntime.Object{claimParameters /* classParameters, */, workerNodeSlice},
objs: []apiruntime.Object{claimParameters, workerNodeSlice},
want: want{
prefilter: result{
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `class parameters default/my-resource-class not found`),
@ -674,7 +674,7 @@ func TestPlugin(t *testing.T) {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaimWithParams},
classes: []*resourcev1alpha2.ResourceClass{structuredResourceClassWithParams},
objs: []apiruntime.Object{ /* claimParameters, */ classParameters, workerNodeSlice},
objs: []apiruntime.Object{classParameters, workerNodeSlice},
want: want{
prefilter: result{
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `claim parameters default/my-pod-my-resource not found`),
@ -685,6 +685,66 @@ func TestPlugin(t *testing.T) {
},
},
"missing-translated-class-parameters": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{claimWithCRD(pendingDelayedClaimWithParams)},
classes: []*resourcev1alpha2.ResourceClass{classWithCRD(structuredResourceClassWithCRD)},
objs: []apiruntime.Object{claimParameters, workerNodeSlice},
want: want{
prefilter: result{
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `generated class parameters for ResourceClassParameters.example.com default/my-resource-class not found`),
},
postfilter: result{
status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`),
},
},
},
"missing-translated-claim-parameters": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{claimWithCRD(pendingDelayedClaimWithParams)},
classes: []*resourcev1alpha2.ResourceClass{classWithCRD(structuredResourceClassWithCRD)},
objs: []apiruntime.Object{classParameters, workerNodeSlice},
want: want{
prefilter: result{
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `generated claim parameters for ResourceClaimParameters.example.com default/my-pod-my-resource not found`),
},
postfilter: result{
status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`),
},
},
},
"too-many-translated-class-parameters": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{claimWithCRD(pendingDelayedClaimWithParams)},
classes: []*resourcev1alpha2.ResourceClass{classWithCRD(structuredResourceClassWithCRD)},
objs: []apiruntime.Object{claimParameters, classParameters, st.FromClassParameters(classParameters).Name("other").Obj() /* too many */, workerNodeSlice},
want: want{
prefilter: result{
status: framework.AsStatus(errors.New(`multiple generated class parameters for ResourceClassParameters.example.com my-resource-class found: [default/my-resource-class default/other]`)),
},
postfilter: result{
status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`),
},
},
},
"too-many-translated-claim-parameters": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{claimWithCRD(pendingDelayedClaimWithParams)},
classes: []*resourcev1alpha2.ResourceClass{classWithCRD(structuredResourceClassWithCRD)},
objs: []apiruntime.Object{claimParameters, st.FromClaimParameters(claimParameters).Name("other").Obj() /* too many */, classParameters, workerNodeSlice},
want: want{
prefilter: result{
status: framework.AsStatus(errors.New(`multiple generated claim parameters for ResourceClaimParameters.example.com default/my-pod-my-resource found: [default/my-pod-my-resource default/other]`)),
},
postfilter: result{
status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`),
},
},
},
"claim-parameters-CEL-runtime-error": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaimWithParams},

View File

@ -1154,6 +1154,11 @@ func MakeClaimParameters() *ClaimParametersWrapper {
return &ClaimParametersWrapper{}
}
// FromClaimParameters creates a ResourceClaimParameters wrapper from an existing object.
func FromClaimParameters(other *resourcev1alpha2.ResourceClaimParameters) *ClaimParametersWrapper {
return &ClaimParametersWrapper{*other.DeepCopy()}
}
func (wrapper *ClaimParametersWrapper) Obj() *resourcev1alpha2.ResourceClaimParameters {
return &wrapper.ResourceClaimParameters
}
@ -1209,6 +1214,11 @@ func MakeClassParameters() *ClassParametersWrapper {
return &ClassParametersWrapper{}
}
// FromClassParameters creates a ResourceClassParameters wrapper from an existing object.
func FromClassParameters(other *resourcev1alpha2.ResourceClassParameters) *ClassParametersWrapper {
return &ClassParametersWrapper{*other.DeepCopy()}
}
func (wrapper *ClassParametersWrapper) Obj() *resourcev1alpha2.ResourceClassParameters {
return &wrapper.ResourceClassParameters
}