From 096e9489050e7bed0c33d00ca4b6d6031e44e247 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Fri, 16 Feb 2024 11:50:48 +0100 Subject: [PATCH] dra scheduler: support structured parameters When a claim uses structured parameters, as indicated by the resource class flag, the scheduler is responsible for allocating it. To do this it needs to gather information about available node resources by watching NodeResourceSlices and then match the in-tree claim parameters against those resources. --- pkg/scheduler/eventhandlers.go | 18 + .../dynamicresources/dynamicresources.go | 662 ++++++++++++++++-- .../dynamicresources/dynamicresources_test.go | 6 +- .../dynamicresources/structuredparameters.go | 134 ++++ pkg/scheduler/framework/types.go | 26 +- pkg/scheduler/scheduler_test.go | 30 +- .../authorizer/rbac/bootstrappolicy/policy.go | 4 +- .../resourceclaim/resourceclaim.go | 14 + 8 files changed, 799 insertions(+), 95 deletions(-) create mode 100644 pkg/scheduler/framework/plugins/dynamicresources/structuredparameters.go diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go index 5bf149ebf72..b2a0f331f53 100644 --- a/pkg/scheduler/eventhandlers.go +++ b/pkg/scheduler/eventhandlers.go @@ -472,6 +472,24 @@ func addAllEventHandlers( } handlers = append(handlers, handlerRegistration) } + case framework.ResourceClaimParameters: + if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { + if handlerRegistration, err = informerFactory.Resource().V1alpha2().ResourceClaimParameters().Informer().AddEventHandler( + buildEvtResHandler(at, framework.ResourceClaimParameters, "ResourceClaimParameters"), + ); err != nil { + return err + } + handlers = append(handlers, handlerRegistration) + } + case framework.ResourceClassParameters: + if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { + if handlerRegistration, err = informerFactory.Resource().V1alpha2().ResourceClassParameters().Informer().AddEventHandler( + buildEvtResHandler(at, framework.ResourceClassParameters, "ResourceClassParameters"), + ); err != nil { + return err + } + handlers = append(handlers, handlerRegistration) + } case framework.StorageClass: if at&framework.Add != 0 { if handlerRegistration, err = informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler( diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 6373ee16640..9cfa96e88fe 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -18,8 +18,10 @@ package dynamicresources import ( "context" + "encoding/json" "errors" "fmt" + "slices" "sort" "sync" @@ -30,6 +32,7 @@ import ( apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" @@ -43,6 +46,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding" schedutil "k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/utils/ptr" ) @@ -70,37 +74,52 @@ type stateData struct { // Empty if the Pod has no claims. claims []*resourcev1alpha2.ResourceClaim + // podSchedulingState keeps track of the PodSchedulingContext + // (if one exists) and the changes made to it. + podSchedulingState podSchedulingState + + // resourceModel contains the information about available and allocated resources when using + // structured parameters and the pod needs this information. + resources resources + + // mutex must be locked while accessing any of the fields below. + mutex sync.Mutex + // The indices of all claims that: // - are allocated - // - use delayed allocation + // - use delayed allocation or the builtin controller // - were not available on at least one node // // Set in parallel during Filter, so write access there must be // protected by the mutex. Used by PostFilter. unavailableClaims sets.Set[int] - // podSchedulingState keeps track of the PodSchedulingContext - // (if one exists) and the changes made to it. - podSchedulingState podSchedulingState - - mutex sync.Mutex - informationsForClaim []informationForClaim } +func (d *stateData) Clone() framework.StateData { + return d +} + type informationForClaim struct { // The availableOnNode node filter of the claim converted from the // v1 API to nodeaffinity.NodeSelector by PreFilter for repeated // evaluation in Filter. Nil for claim which don't have it. availableOnNode *nodeaffinity.NodeSelector + // The status of the claim got from the // schedulingCtx by PreFilter for repeated // evaluation in Filter. Nil for claim which don't have it. status *resourcev1alpha2.ResourceClaimSchedulingStatus -} -func (d *stateData) Clone() framework.StateData { - return d + // structuredParameters is true if the claim is handled via the builtin + // controller. + structuredParameters bool + controller *claimController + + // Set by Reserved, published by PreBind. + allocation *resourcev1alpha2.AllocationResult + allocationDriverName string } type podSchedulingState struct { @@ -256,23 +275,90 @@ type dynamicResources struct { claimLister resourcev1alpha2listers.ResourceClaimLister classLister resourcev1alpha2listers.ResourceClassLister podSchedulingContextLister resourcev1alpha2listers.PodSchedulingContextLister + claimParametersLister resourcev1alpha2listers.ResourceClaimParametersLister + classParametersLister resourcev1alpha2listers.ResourceClassParametersLister + nodeResourceSliceLister resourcev1alpha2listers.NodeResourceSliceLister + claimNameLookup *resourceclaim.Lookup + + // claimAssumeCache enables temporarily storing a newer claim object + // while the scheduler has allocated it and the corresponding object + // update from the apiserver has not been processed by the claim + // informer callbacks. Claims get added here in Reserve and removed by + // the informer callback (based on the "newer than" comparison in the + // assume cache) or when the API call in PreBind fails. + // + // It uses cache.MetaNamespaceKeyFunc to generate object names, which + // therefore are "/". + // + // This is necessary to ensure that reconstructing the resource usage + // at the start of a pod scheduling cycle doesn't reuse the resources + // assigned to such a claim. Alternatively, claim allocation state + // could also get tracked across pod scheduling cycles, but that + // - adds complexity (need to carefully sync state with informer events + // for claims and NodeResourceSlices) + // - would make integration with cluster autoscaler harder because it would need + // to trigger informer callbacks. + // + // When implementing cluster autoscaler support, this assume cache or + // something like it (see https://github.com/kubernetes/kubernetes/pull/112202) + // might have to be managed by the cluster autoscaler. + claimAssumeCache volumebinding.AssumeCache + + // inFlightAllocations is map from claim UUIDs to true for those claims + // for which allocation was triggered during a scheduling cycle and the + // corresponding claim status update call in PreBind has not been done + // yet. If another pod needs the claim, the pod is treated as "not + // schedulable yet". The cluster event for the claim status update will + // make it schedulable. + // + // This mechanism avoids the following problem: + // - Pod A triggers allocation for claim X. + // - Pod B shares access to that claim and gets scheduled because + // the claim is assumed to be allocated. + // - PreBind for pod B is called first, tries to update reservedFor and + // fails because the claim is not really allocated yet. + // + // We could avoid the ordering problem by allowing either pod A or pod B + // to set the allocation. But that is more complicated and leads to another + // problem: + // - Pod A and B get scheduled as above. + // - PreBind for pod A gets called first, then fails with a temporary API error. + // It removes the updated claim from the assume cache because of that. + // - PreBind for pod B gets called next and succeeds with adding the + // allocation and its own reservedFor entry. + // - The assume cache is now not reflecting that the claim is allocated, + // which could lead to reusing the same resource for some other claim. + // + // A sync.Map is used because in practice sharing of a claim between + // pods is expected to be rare compared to per-pod claim, so we end up + // hitting the "multiple goroutines read, write, and overwrite entries + // for disjoint sets of keys" case that sync.Map is optimized for. + inFlightAllocations sync.Map } // New initializes a new plugin and returns it. -func New(_ context.Context, plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) { +func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) { if !fts.EnableDynamicResourceAllocation { // Disabled, won't do anything. return &dynamicResources{}, nil } - return &dynamicResources{ + logger := klog.FromContext(ctx) + pl := &dynamicResources{ enabled: true, fh: fh, clientset: fh.ClientSet(), claimLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Lister(), classLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClasses().Lister(), podSchedulingContextLister: fh.SharedInformerFactory().Resource().V1alpha2().PodSchedulingContexts().Lister(), - }, nil + claimParametersLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaimParameters().Lister(), + classParametersLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClassParameters().Lister(), + nodeResourceSliceLister: fh.SharedInformerFactory().Resource().V1alpha2().NodeResourceSlices().Lister(), + claimNameLookup: resourceclaim.NewNameLookup(fh.ClientSet()), + claimAssumeCache: volumebinding.NewAssumeCache(logger, fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Informer(), "claim", "", nil), + } + + return pl, nil } var _ framework.PreEnqueuePlugin = &dynamicResources{} @@ -296,7 +382,13 @@ func (pl *dynamicResources) EventsToRegister() []framework.ClusterEventWithHint if !pl.enabled { return nil } + events := []framework.ClusterEventWithHint{ + // Changes for claim or class parameters creation may make pods + // schedulable which depend on claims using those parameters. + {Event: framework.ClusterEvent{Resource: framework.ResourceClaimParameters, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterClaimParametersChange}, + {Event: framework.ClusterEvent{Resource: framework.ResourceClassParameters, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterClassParametersChange}, + // Allocation is tracked in ResourceClaims, so any changes may make the pods schedulable. {Event: framework.ClusterEvent{Resource: framework.ResourceClaim, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterClaimChange}, // When a driver has provided additional information, a pod waiting for that information @@ -321,6 +413,149 @@ func (pl *dynamicResources) PreEnqueue(ctx context.Context, pod *v1.Pod) (status return nil } +// isSchedulableAfterClaimParametersChange is invoked for add and update claim parameters events reported by +// an informer. It checks whether that change made a previously unschedulable +// pod schedulable. It errs on the side of letting a pod scheduling attempt +// happen. The delete claim event will not invoke it, so newObj will never be nil. +func (pl *dynamicResources) isSchedulableAfterClaimParametersChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { + originalParameters, modifiedParameters, err := schedutil.As[*resourcev1alpha2.ResourceClaimParameters](oldObj, newObj) + if err != nil { + // Shouldn't happen. + return framework.Queue, fmt.Errorf("unexpected object in isSchedulableAfterClaimParametersChange: %w", err) + } + + usesParameters := false + if err := pl.foreachPodResourceClaim(pod, func(_ string, claim *resourcev1alpha2.ResourceClaim) { + ref := claim.Spec.ParametersRef + if ref == nil { + return + } + + // Using in-tree parameters directly? + if ref.APIGroup == resourcev1alpha2.SchemeGroupVersion.Group && + ref.Kind == "ResourceClaimParameters" { + if modifiedParameters.Name == ref.Name { + usesParameters = true + } + return + } + + // Need to look for translated parameters. + generatedFrom := modifiedParameters.GeneratedFrom + if generatedFrom == nil { + return + } + if generatedFrom.APIGroup == ref.APIGroup && + generatedFrom.Kind == ref.Kind && + generatedFrom.Name == ref.Name { + usesParameters = true + } + }); err != nil { + // This is not an unexpected error: we know that + // foreachPodResourceClaim only returns errors for "not + // schedulable". + logger.V(4).Info("pod is not schedulable", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedParameters), "reason", err.Error()) + return framework.QueueSkip, nil + } + + if !usesParameters { + // This were not the parameters the pod was waiting for. + logger.V(6).Info("unrelated claim parameters got modified", "pod", klog.KObj(pod), "claimParameters", klog.KObj(modifiedParameters)) + return framework.QueueSkip, nil + } + + if originalParameters == nil { + logger.V(4).Info("claim parameters for pod got created", "pod", klog.KObj(pod), "claimParameters", klog.KObj(modifiedParameters)) + return framework.Queue, nil + } + + // Modifications may or may not be relevant. If the entire + // requests are as before, then something else must have changed + // and we don't care. + if apiequality.Semantic.DeepEqual(&originalParameters.DriverRequests, &modifiedParameters.DriverRequests) { + logger.V(6).Info("claim parameters for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "claimParameters", klog.KObj(modifiedParameters)) + return framework.QueueSkip, nil + } + + logger.V(4).Info("requests in claim parameters for pod got updated", "pod", klog.KObj(pod), "claimParameters", klog.KObj(modifiedParameters)) + return framework.Queue, nil +} + +// isSchedulableAfterClassParametersChange is invoked for add and update class parameters events reported by +// an informer. It checks whether that change made a previously unschedulable +// pod schedulable. It errs on the side of letting a pod scheduling attempt +// happen. The delete class event will not invoke it, so newObj will never be nil. +func (pl *dynamicResources) isSchedulableAfterClassParametersChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { + originalParameters, modifiedParameters, err := schedutil.As[*resourcev1alpha2.ResourceClassParameters](oldObj, newObj) + if err != nil { + // Shouldn't happen. + return framework.Queue, fmt.Errorf("unexpected object in isSchedulableAfterClassParametersChange: %w", err) + } + + usesParameters := false + if err := pl.foreachPodResourceClaim(pod, func(_ string, claim *resourcev1alpha2.ResourceClaim) { + class, err := pl.classLister.Get(claim.Spec.ResourceClassName) + if err != nil { + if !apierrors.IsNotFound(err) { + logger.Error(err, "look up resource class") + } + return + } + ref := class.ParametersRef + if ref == nil { + return + } + + // Using in-tree parameters directly? + if ref.APIGroup == resourcev1alpha2.SchemeGroupVersion.Group && + ref.Kind == "ResourceClassParameters" { + if modifiedParameters.Name == ref.Name { + usesParameters = true + } + return + } + + // Need to look for translated parameters. + generatedFrom := modifiedParameters.GeneratedFrom + if generatedFrom == nil { + return + } + if generatedFrom.APIGroup == ref.APIGroup && + generatedFrom.Kind == ref.Kind && + generatedFrom.Name == ref.Name { + usesParameters = true + } + }); err != nil { + // This is not an unexpected error: we know that + // foreachPodResourceClaim only returns errors for "not + // schedulable". + logger.V(4).Info("pod is not schedulable", "pod", klog.KObj(pod), "classParameters", klog.KObj(modifiedParameters), "reason", err.Error()) + return framework.QueueSkip, nil + } + + if !usesParameters { + // This were not the parameters the pod was waiting for. + logger.V(6).Info("unrelated class parameters got modified", "pod", klog.KObj(pod), "classParameters", klog.KObj(modifiedParameters)) + return framework.QueueSkip, nil + } + + if originalParameters == nil { + logger.V(4).Info("class parameters for pod got created", "pod", klog.KObj(pod), "class", klog.KObj(modifiedParameters)) + return framework.Queue, nil + } + + // Modifications may or may not be relevant. If the entire + // requests are as before, then something else must have changed + // and we don't care. + if apiequality.Semantic.DeepEqual(&originalParameters.Filters, &modifiedParameters.Filters) { + logger.V(6).Info("class parameters for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "classParameters", klog.KObj(modifiedParameters)) + return framework.QueueSkip, nil + } + + logger.V(4).Info("filters in class parameters for pod got updated", "pod", klog.KObj(pod), "classParameters", klog.KObj(modifiedParameters)) + return framework.Queue, nil +} + // isSchedulableAfterClaimChange is invoked for add and update claim events reported by // an informer. It checks whether that change made a previously unschedulable // pod schedulable. It errs on the side of letting a pod scheduling attempt @@ -345,6 +580,33 @@ func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, po return framework.QueueSkip, nil } + if originalClaim != nil && + resourceclaim.IsAllocatedWithStructuredParameters(originalClaim) && + modifiedClaim.Status.Allocation == nil { + // A claim with structured parameters was deallocated. This might have made + // resources available for other pods. + // + // TODO (https://github.com/kubernetes/kubernetes/issues/123697): + // check that the pending claims depend on structured parameters (depends on refactoring foreachPodResourceClaim, see other TODO). + // + // There is a small race here: + // - The dynamicresources plugin allocates claim A and updates the assume cache. + // - A second pod gets marked as unschedulable based on that assume cache. + // - Before the informer cache here catches up, the pod runs, terminates and + // the claim gets deallocated without ever sending the claim status with + // allocation to the scheduler. + // - The comparison below is for a *very* old claim with no allocation and the + // new claim where the allocation is already removed again, so no + // RemovedClaimAllocation event gets emitted. + // + // This is extremely unlikely and thus a fix is not needed for alpha in Kubernetes 1.30. + // TODO (https://github.com/kubernetes/kubernetes/issues/123698): The solution is to somehow integrate the assume cache + // into the event mechanism. This can be tackled together with adding autoscaler + // support, which also needs to do something with the assume cache. + logger.V(6).Info("claim with structured parameters got deallocated", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim)) + return framework.Queue, nil + } + if !usesClaim { // This was not the claim the pod was waiting for. logger.V(6).Info("unrelated claim got modified", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim)) @@ -526,7 +788,7 @@ func (pl *dynamicResources) podResourceClaims(pod *v1.Pod) ([]*resourcev1alpha2. // It calls an optional handler for those claims that it finds. func (pl *dynamicResources) foreachPodResourceClaim(pod *v1.Pod, cb func(podResourceName string, claim *resourcev1alpha2.ResourceClaim)) error { for _, resource := range pod.Spec.ResourceClaims { - claimName, mustCheckOwner, err := resourceclaim.Name(pod, &resource) + claimName, mustCheckOwner, err := pl.claimNameLookup.Name(pod, &resource) if err != nil { return err } @@ -578,24 +840,21 @@ func (pl *dynamicResources) PreFilter(ctx context.Context, state *framework.Cycl return nil, statusUnschedulable(logger, err.Error()) } logger.V(5).Info("pod resource claims", "pod", klog.KObj(pod), "resourceclaims", klog.KObjSlice(claims)) + // If the pod does not reference any claim, // DynamicResources Filter has nothing to do with the Pod. if len(claims) == 0 { return nil, framework.NewStatus(framework.Skip) } - // Fetch s.podSchedulingState.schedulingCtx, it's going to be needed when checking claims. + // Fetch PodSchedulingContext, it's going to be needed when checking claims. if err := s.podSchedulingState.init(ctx, pod, pl.podSchedulingContextLister); err != nil { return nil, statusError(logger, err) } s.informationsForClaim = make([]informationForClaim, len(claims)) + needResourceInformation := false for index, claim := range claims { - if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeImmediate && - claim.Status.Allocation == nil { - // This will get resolved by the resource driver. - return nil, statusUnschedulable(logger, "unallocated immediate resourceclaim", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim)) - } if claim.Status.DeallocationRequested { // This will get resolved by the resource driver. return nil, statusUnschedulable(logger, "resourceclaim must be reallocated", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim)) @@ -606,16 +865,20 @@ func (pl *dynamicResources) PreFilter(ctx context.Context, state *framework.Cycl // Resource is in use. The pod has to wait. return nil, statusUnschedulable(logger, "resourceclaim in use", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim)) } - if claim.Status.Allocation != nil && - claim.Status.Allocation.AvailableOnNodes != nil { - nodeSelector, err := nodeaffinity.NewNodeSelector(claim.Status.Allocation.AvailableOnNodes) - if err != nil { - return nil, statusError(logger, err) + + if claim.Status.Allocation != nil { + if claim.Status.Allocation.AvailableOnNodes != nil { + nodeSelector, err := nodeaffinity.NewNodeSelector(claim.Status.Allocation.AvailableOnNodes) + if err != nil { + return nil, statusError(logger, err) + } + s.informationsForClaim[index].availableOnNode = nodeSelector } - s.informationsForClaim[index].availableOnNode = nodeSelector - } - if claim.Status.Allocation == nil && - claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer { + + // The claim was allocated by the scheduler if it has the finalizer that is + // reserved for Kubernetes. + s.informationsForClaim[index].structuredParameters = slices.Contains(claim.Finalizers, resourcev1alpha2.Finalizer) + } else { // The ResourceClass might have a node filter. This is // useful for trimming the initial set of potential // nodes before we ask the driver(s) for information @@ -638,16 +901,140 @@ func (pl *dynamicResources) PreFilter(ctx context.Context, state *framework.Cycl } s.informationsForClaim[index].availableOnNode = selector } - // Now we need information from drivers. s.informationsForClaim[index].status = statusForClaim(s.podSchedulingState.schedulingCtx, pod.Spec.ResourceClaims[index].Name) + + if class.StructuredParameters != nil && *class.StructuredParameters { + s.informationsForClaim[index].structuredParameters = true + + // Allocation in flight? Better wait for that + // to finish, see inFlightAllocations + // documentation for details. + if _, found := pl.inFlightAllocations.Load(claim.UID); found { + return nil, statusUnschedulable(logger, fmt.Sprintf("resource claim %s is in the process of being allocated", klog.KObj(claim))) + } + + // We need the claim and class parameters. If + // they don't exist yet, the pod has to wait. + // + // TODO (https://github.com/kubernetes/kubernetes/issues/123697): + // check this already in foreachPodResourceClaim, together with setting up informationsForClaim. + // Then PreEnqueue will also check for existence of parameters. + classParameters, claimParameters, status := pl.lookupParameters(logger, class, claim) + if status != nil { + return nil, status + } + controller, err := newClaimController(logger, class, classParameters, claimParameters) + if err != nil { + return nil, statusError(logger, err) + } + s.informationsForClaim[index].controller = controller + needResourceInformation = true + } else if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeImmediate { + // This will get resolved by the resource driver. + return nil, statusUnschedulable(logger, "unallocated immediate resourceclaim", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim)) + } } } + if needResourceInformation { + // Doing this over and over again for each pod could be avoided + // by parsing once when creating the plugin and then updating + // that state in informer callbacks. But that would cause + // problems for using the plugin in the Cluster Autoscaler. If + // this step here turns out to be expensive, we may have to + // maintain and update state more persistently. + resources, err := newResourceModel(logger, pl.nodeResourceSliceLister, pl.claimAssumeCache) + if err != nil { + return nil, statusError(logger, err) + } + s.resources = resources + } + s.claims = claims - state.Write(stateKey, s) return nil, nil } +func (pl *dynamicResources) lookupParameters(logger klog.Logger, class *resourcev1alpha2.ResourceClass, claim *resourcev1alpha2.ResourceClaim) (classParameters *resourcev1alpha2.ResourceClassParameters, claimParameters *resourcev1alpha2.ResourceClaimParameters, status *framework.Status) { + classParameters, status = pl.lookupClassParameters(logger, class) + if status != nil { + return + } + claimParameters, status = pl.lookupClaimParameters(logger, claim) + return +} + +func (pl *dynamicResources) lookupClassParameters(logger klog.Logger, class *resourcev1alpha2.ResourceClass) (*resourcev1alpha2.ResourceClassParameters, *framework.Status) { + if class.ParametersRef == nil { + return nil, nil + } + + if class.ParametersRef.APIGroup == resourcev1alpha2.SchemeGroupVersion.Group && + class.ParametersRef.Kind == "ResourceClassParameters" { + // Use the parameters which were referenced directly. + parameters, err := pl.classParametersLister.ResourceClassParameters(class.ParametersRef.Namespace).Get(class.ParametersRef.Name) + if err != nil { + if apierrors.IsNotFound(err) { + return nil, statusUnschedulable(logger, fmt.Sprintf("class parameters %s not found", klog.KRef(class.ParametersRef.Namespace, class.ParametersRef.Name))) + } + return nil, statusError(logger, fmt.Errorf("get class parameters %s: %v", klog.KRef(class.Namespace, class.ParametersRef.Name), err)) + } + return parameters, nil + } + + // TODO (https://github.com/kubernetes/kubernetes/issues/123731): use an indexer + allParameters, err := pl.classParametersLister.ResourceClassParameters(class.Namespace).List(labels.Everything()) + if err != nil { + return nil, statusError(logger, fmt.Errorf("listing class parameters failed: %v", err)) + } + for _, parameters := range allParameters { + if parameters.GeneratedFrom == nil { + continue + } + if parameters.GeneratedFrom.APIGroup == class.ParametersRef.APIGroup && + parameters.GeneratedFrom.Kind == class.ParametersRef.Kind && + parameters.GeneratedFrom.Name == class.ParametersRef.Name && + parameters.GeneratedFrom.Namespace == class.ParametersRef.Namespace { + return parameters, nil + } + } + return nil, statusUnschedulable(logger, fmt.Sprintf("generated class parameters for %s.%s %s not found", class.ParametersRef.Kind, class.ParametersRef.APIGroup, klog.KRef(class.Namespace, class.ParametersRef.Name))) +} + +func (pl *dynamicResources) lookupClaimParameters(logger klog.Logger, claim *resourcev1alpha2.ResourceClaim) (*resourcev1alpha2.ResourceClaimParameters, *framework.Status) { + if claim.Spec.ParametersRef == nil { + return nil, nil + } + if claim.Spec.ParametersRef.APIGroup == resourcev1alpha2.SchemeGroupVersion.Group && + claim.Spec.ParametersRef.Kind == "ResourceClaimParameters" { + // Use the parameters which were referenced directly. + parameters, err := pl.claimParametersLister.ResourceClaimParameters(claim.Namespace).Get(claim.Spec.ParametersRef.Name) + if err != nil { + if apierrors.IsNotFound(err) { + return nil, statusUnschedulable(logger, fmt.Sprintf("claim parameters %s not found", klog.KRef(claim.Namespace, claim.Spec.ParametersRef.Name))) + } + return nil, statusError(logger, fmt.Errorf("get claim parameters %s: %v", klog.KRef(claim.Namespace, claim.Spec.ParametersRef.Name), err)) + } + return parameters, nil + } + + // TODO (https://github.com/kubernetes/kubernetes/issues/123731): use an indexer + allParameters, err := pl.claimParametersLister.ResourceClaimParameters(claim.Namespace).List(labels.Everything()) + if err != nil { + return nil, statusError(logger, fmt.Errorf("listing claim parameters failed: %v", err)) + } + for _, parameters := range allParameters { + if parameters.GeneratedFrom == nil { + continue + } + if parameters.GeneratedFrom.APIGroup == claim.Spec.ParametersRef.APIGroup && + parameters.GeneratedFrom.Kind == claim.Spec.ParametersRef.Kind && + parameters.GeneratedFrom.Name == claim.Spec.ParametersRef.Name { + return parameters, nil + } + } + return nil, statusUnschedulable(logger, fmt.Sprintf("generated claim parameters for %s.%s %s not found", claim.Spec.ParametersRef.Kind, claim.Spec.ParametersRef.APIGroup, klog.KRef(claim.Namespace, claim.Spec.ParametersRef.Name))) +} + // PreFilterExtensions returns prefilter extensions, pod add and remove. func (pl *dynamicResources) PreFilterExtensions() framework.PreFilterExtensions { return nil @@ -703,22 +1090,39 @@ func (pl *dynamicResources) Filter(ctx context.Context, cs *framework.CycleState case claim.Status.DeallocationRequested: // We shouldn't get here. PreFilter already checked this. return statusUnschedulable(logger, "resourceclaim must be reallocated", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim)) - case claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer: + case claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer || + state.informationsForClaim[index].structuredParameters: if selector := state.informationsForClaim[index].availableOnNode; selector != nil { if matches := selector.Match(node); !matches { return statusUnschedulable(logger, "excluded by resource class node filter", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclassName", claim.Spec.ResourceClassName) } } - if status := state.informationsForClaim[index].status; status != nil { - for _, unsuitableNode := range status.UnsuitableNodes { - if node.Name == unsuitableNode { - return statusUnschedulable(logger, "resourceclaim cannot be allocated for the node (unsuitable)", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim), "unsuitablenodes", status.UnsuitableNodes) + // Can the builtin controller tell us whether the node is suitable? + if state.informationsForClaim[index].structuredParameters { + suitable, err := state.informationsForClaim[index].controller.nodeIsSuitable(ctx, node.Name, state.resources) + if err != nil { + // An error indicates that something wasn't configured correctly, for example + // writing a CEL expression which doesn't handle a map lookup error. Normally + // this should never fail. We could return an error here, but then the pod + // would get retried. Instead we ignore the node. + return statusUnschedulable(logger, fmt.Sprintf("checking structured parameters failed: %v", err), "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim)) + } + if !suitable { + return statusUnschedulable(logger, "resourceclaim cannot be allocated for the node (unsuitable)", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim)) + } + } else { + if status := state.informationsForClaim[index].status; status != nil { + for _, unsuitableNode := range status.UnsuitableNodes { + if node.Name == unsuitableNode { + return statusUnschedulable(logger, "resourceclaim cannot be allocated for the node (unsuitable)", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim), "unsuitablenodes", status.UnsuitableNodes) + } } } } default: - // This should have been delayed allocation. Immediate - // allocation was already checked for in PreFilter. + // This claim should have been handled above. + // Immediate allocation with control plane controller + // was already checked for in PreFilter. return statusError(logger, fmt.Errorf("internal error, unexpected allocation mode %v", claim.Spec.AllocationMode)) } } @@ -736,7 +1140,11 @@ func (pl *dynamicResources) Filter(ctx context.Context, cs *framework.CycleState // delayed allocation. Claims with immediate allocation // would just get allocated again for a random node, // which is unlikely to help the pod. - if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer { + // + // Claims with builtin controller are handled like + // claims with delayed allocation. + if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer || + state.informationsForClaim[index].controller != nil { state.unavailableClaims.Insert(index) } } @@ -769,12 +1177,19 @@ func (pl *dynamicResources) PostFilter(ctx context.Context, cs *framework.CycleS claim := state.claims[index] if len(claim.Status.ReservedFor) == 0 || len(claim.Status.ReservedFor) == 1 && claim.Status.ReservedFor[0].UID == pod.UID { + // Is the claim is handled by the builtin controller? + // Then we can simply clear the allocation. Once the + // claim informer catches up, the controllers will + // be notified about this change. + clearAllocation := state.informationsForClaim[index].controller != nil + // Before we tell a driver to deallocate a claim, we // have to stop telling it to allocate. Otherwise, // depending on timing, it will deallocate the claim, // see a PodSchedulingContext with selected node, and // allocate again for that same node. - if state.podSchedulingState.schedulingCtx != nil && + if !clearAllocation && + state.podSchedulingState.schedulingCtx != nil && state.podSchedulingState.schedulingCtx.Spec.SelectedNode != "" { state.podSchedulingState.selectedNode = ptr.To("") if err := state.podSchedulingState.publish(ctx, pod, pl.clientset); err != nil { @@ -782,9 +1197,13 @@ func (pl *dynamicResources) PostFilter(ctx context.Context, cs *framework.CycleS } } - claim := state.claims[index].DeepCopy() - claim.Status.DeallocationRequested = true + claim := claim.DeepCopy() claim.Status.ReservedFor = nil + if clearAllocation { + claim.Status.Allocation = nil + } else { + claim.Status.DeallocationRequested = true + } logger.V(5).Info("Requesting deallocation of ResourceClaim", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim)) if _, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{}); err != nil { return nil, statusError(logger, err) @@ -815,14 +1234,15 @@ func (pl *dynamicResources) PreScore(ctx context.Context, cs *framework.CycleSta logger := klog.FromContext(ctx) pending := false - for _, claim := range state.claims { - if claim.Status.Allocation == nil { + for index, claim := range state.claims { + if claim.Status.Allocation == nil && + state.informationsForClaim[index].controller == nil { pending = true break } } if !pending { - logger.V(5).Info("no pending claims", "pod", klog.KObj(pod)) + logger.V(5).Info("no pending claims with control plane controller", "pod", klog.KObj(pod)) return nil } @@ -889,7 +1309,7 @@ func haveNode(nodeNames []string, nodeName string) bool { } // Reserve reserves claims for the pod. -func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { +func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) { if !pl.enabled { return nil } @@ -903,6 +1323,7 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat numDelayedAllocationPending := 0 numClaimsWithStatusInfo := 0 + claimsWithBuiltinController := make([]int, 0, len(state.claims)) logger := klog.FromContext(ctx) for index, claim := range state.claims { if claim.Status.Allocation != nil { @@ -914,7 +1335,13 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat continue } - // Must be delayed allocation. + // Do we have the builtin controller? + if state.informationsForClaim[index].controller != nil { + claimsWithBuiltinController = append(claimsWithBuiltinController, index) + continue + } + + // Must be delayed allocation with control plane controller. numDelayedAllocationPending++ // Did the driver provide information that steered node @@ -924,12 +1351,12 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat } } - if numDelayedAllocationPending == 0 { + if numDelayedAllocationPending == 0 && len(claimsWithBuiltinController) == 0 { // Nothing left to do. return nil } - if !state.preScored { + if !state.preScored && numDelayedAllocationPending > 0 { // There was only one candidate that passed the Filters and // therefore PreScore was not called. // @@ -944,11 +1371,36 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat } } + // Prepare allocation of claims handled by the schedulder. + for _, index := range claimsWithBuiltinController { + claim := state.claims[index] + driverName, allocation, err := state.informationsForClaim[index].controller.allocate(ctx, nodeName, state.resources) + if err != nil { + // We checked before that the node is suitable. This shouldn't have failed, + // so treat this as an error. + return statusError(logger, fmt.Errorf("claim allocation failed unexpectedly: %v", err)) + } + state.informationsForClaim[index].allocation = allocation + state.informationsForClaim[index].allocationDriverName = driverName + pl.inFlightAllocations.Store(claim.UID, true) + claim = claim.DeepCopy() + claim.Status.DriverName = driverName + claim.Status.Allocation = allocation + if err := pl.claimAssumeCache.Assume(claim); err != nil { + return statusError(logger, fmt.Errorf("update claim assume cache: %v", err)) + } + logger.V(5).Info("Reserved resource in allocation result", "claim", klog.KObj(claim), "driver", driverName, "allocation", allocation) + } + // When there is only one pending resource, we can go ahead with // requesting allocation even when we don't have the information from // the driver yet. Otherwise we wait for information before blindly // making a decision that might have to be reversed later. - if numDelayedAllocationPending == 1 || numClaimsWithStatusInfo == numDelayedAllocationPending { + // + // If all pending claims are handled with the builtin controller, + // there is no need for a PodSchedulingContext change. + if numDelayedAllocationPending == 1 && len(claimsWithBuiltinController) == 0 || + numClaimsWithStatusInfo+len(claimsWithBuiltinController) == numDelayedAllocationPending && len(claimsWithBuiltinController) < numDelayedAllocationPending { // TODO: can we increase the chance that the scheduler picks // the same node as before when allocation is on-going, // assuming that that node still fits the pod? Picking a @@ -970,6 +1422,13 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat return nil } + // If all pending claims are handled with the builtin controller, then + // we can allow the pod to proceed. Allocating and reserving the claims + // will be done in PreBind. + if numDelayedAllocationPending == 0 { + return nil + } + // More than one pending claim and not enough information about all of them. // // TODO: can or should we ensure that schedulingCtx gets aborted while @@ -1016,7 +1475,15 @@ func (pl *dynamicResources) Unreserve(ctx context.Context, cs *framework.CycleSt } } - for _, claim := range state.claims { + for index, claim := range state.claims { + // If allocation was in-flight, then it's not anymore and we need to revert the + // claim object in the assume cache to what it was before. + if state.informationsForClaim[index].controller != nil { + if _, found := pl.inFlightAllocations.LoadAndDelete(state.claims[index].UID); found { + pl.claimAssumeCache.Restore(claim.Namespace + "/" + claim.Name) + } + } + if claim.Status.Allocation != nil && resourceclaim.IsReservedForPod(pod, claim) { // Remove pod from ReservedFor. A strategic-merge-patch is used @@ -1038,7 +1505,10 @@ func (pl *dynamicResources) Unreserve(ctx context.Context, cs *framework.CycleSt // PreBind gets called in a separate goroutine after it has been determined // that the pod should get bound to this node. Because Reserve did not actually -// reserve claims, we need to do it now. If that fails, we return an error and +// reserve claims, we need to do it now. For claims with the builtin controller, +// we also handle the allocation. +// +// If anything fails, we return an error and // the pod will have to go into the backoff queue. The scheduler will call // Unreserve as part of the error handling. func (pl *dynamicResources) PreBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { @@ -1056,6 +1526,7 @@ func (pl *dynamicResources) PreBind(ctx context.Context, cs *framework.CycleStat logger := klog.FromContext(ctx) // Was publishing delayed? If yes, do it now and then cause binding to stop. + // This will not happen if all claims get handled by builtin controllers. if state.podSchedulingState.isDirty() { if err := state.podSchedulingState.publish(ctx, pod, pl.clientset); err != nil { return statusError(logger, err) @@ -1065,23 +1536,7 @@ func (pl *dynamicResources) PreBind(ctx context.Context, cs *framework.CycleStat for index, claim := range state.claims { if !resourceclaim.IsReservedForPod(pod, claim) { - // The claim might be stale, for example because the claim can get shared and some - // other goroutine has updated it in the meantime. We therefore cannot use - // SSA here to add the pod because then we would have to send the entire slice - // or use different field manager strings for each entry. - // - // With a strategic-merge-patch, we can simply send one new entry. The apiserver - // validation will catch if two goroutines try to do that at the same time and - // the claim cannot be shared. - patch := fmt.Sprintf(`{"metadata": {"uid": %q}, "status": { "reservedFor": [ {"resource": "pods", "name": %q, "uid": %q} ] }}`, - claim.UID, - pod.Name, - pod.UID, - ) - logger.V(5).Info("reserve", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.KObj(claim)) - claim, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Patch(ctx, claim.Name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}, "status") - logger.V(5).Info("reserved", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.Format(claim)) - // TODO: metric for update errors. + claim, err := pl.bindClaim(ctx, state, index, pod, nodeName) if err != nil { return statusError(logger, err) } @@ -1093,6 +1548,75 @@ func (pl *dynamicResources) PreBind(ctx context.Context, cs *framework.CycleStat return nil } +// bindClaim gets called by PreBind for claim which is not reserved for the pod yet. +// It might not even be allocated. bindClaim then ensures that the allocation +// and reservation are recorded. This finishes the work started in Reserve. +func (pl *dynamicResources) bindClaim(ctx context.Context, state *stateData, index int, pod *v1.Pod, nodeName string) (patchedClaim *resourcev1alpha2.ResourceClaim, finalErr error) { + logger := klog.FromContext(ctx) + claim := state.claims[index] + allocationPatch := "" + + allocation := state.informationsForClaim[index].allocation + logger.V(5).Info("preparing claim status patch", "claim", klog.KObj(state.claims[index]), "allocation", allocation) + + // Do we need to store an allocation result from Reserve? + if allocation != nil { + buffer, err := json.Marshal(allocation) + if err != nil { + return nil, fmt.Errorf("marshaling AllocationResult failed: %v", err) + } + allocationPatch = fmt.Sprintf(`"driverName": %q, "allocation": %s, `, state.informationsForClaim[index].allocationDriverName, string(buffer)) + + // The finalizer needs to be added in a normal update. Using a simple update is fine + // because we don't expect concurrent modifications while the claim is not allocated + // yet. If there are any, we want to fail. + // + // If we were interrupted in the past, it might already be set and we simply continue. + if !slices.Contains(claim.Finalizers, resourcev1alpha2.Finalizer) { + claim := state.claims[index].DeepCopy() + claim.Finalizers = append(claim.Finalizers, resourcev1alpha2.Finalizer) + if _, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{}); err != nil { + return nil, fmt.Errorf("add finalizer: %v", err) + } + } + } + + // The claim might be stale, for example because the claim can get shared and some + // other goroutine has updated it in the meantime. We therefore cannot use + // SSA here to add the pod because then we would have to send the entire slice + // or use different field manager strings for each entry. + // + // With a strategic-merge-patch, we can simply send one new entry. The apiserver + // validation will catch if two goroutines try to do that at the same time and + // the claim cannot be shared. + // + // Note that this also works when the allocation result gets added twice because + // two pods both started using a shared claim: the first pod to get here adds the + // allocation result. The second pod then only adds itself to reservedFor. + patch := fmt.Sprintf(`{"metadata": {"uid": %q}, "status": {%s "reservedFor": [ {"resource": "pods", "name": %q, "uid": %q} ] }}`, + claim.UID, + allocationPatch, + pod.Name, + pod.UID, + ) + if loggerV := logger.V(6); loggerV.Enabled() { + logger.V(5).Info("reserve", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.KObj(claim), "patch", patch) + } else { + logger.V(5).Info("reserve", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.KObj(claim)) + } + claim, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Patch(ctx, claim.Name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}, "status") + logger.V(5).Info("reserved", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.Format(claim), "err", err) + if allocationPatch != "" { + // The scheduler was handling allocation. Now that has + // completed, either successfully or with a failure. + if err != nil { + pl.claimAssumeCache.Restore(claim.Namespace + "/" + claim.Name) + } + pl.inFlightAllocations.Delete(claim.UID) + } + return claim, err +} + // PostBind is called after a pod is successfully bound to a node. Now we are // sure that a PodSchedulingContext object, if it exists, is definitely not going to // be needed anymore and can delete it. This is a one-shot thing, there won't diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go index 4704f60a65b..070ece83920 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go @@ -309,8 +309,9 @@ func TestPlugin(t *testing.T) { }, }, "waiting-for-immediate-allocation": { - pod: podWithClaimName, - claims: []*resourcev1alpha2.ResourceClaim{pendingImmediateClaim}, + pod: podWithClaimName, + claims: []*resourcev1alpha2.ResourceClaim{pendingImmediateClaim}, + classes: []*resourcev1alpha2.ResourceClass{resourceClass}, want: want{ prefilter: result{ status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `unallocated immediate resourceclaim`), @@ -812,7 +813,6 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceCl tc.client = fake.NewSimpleClientset() reactor := createReactor(tc.client.Tracker()) tc.client.PrependReactor("*", "*", reactor) - tc.informerFactory = informers.NewSharedInformerFactory(tc.client, 0) opts := []runtime.Option{ diff --git a/pkg/scheduler/framework/plugins/dynamicresources/structuredparameters.go b/pkg/scheduler/framework/plugins/dynamicresources/structuredparameters.go new file mode 100644 index 00000000000..e8a5d0a4522 --- /dev/null +++ b/pkg/scheduler/framework/plugins/dynamicresources/structuredparameters.go @@ -0,0 +1,134 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dynamicresources + +import ( + "context" + "fmt" + + v1 "k8s.io/api/core/v1" + resourcev1alpha2 "k8s.io/api/resource/v1alpha2" + "k8s.io/apimachinery/pkg/labels" + resourcev1alpha2listers "k8s.io/client-go/listers/resource/v1alpha2" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding" +) + +// resources is a map "node name" -> "driver name" -> available and +// allocated resources per structured parameter model. +type resources map[string]map[string]resourceModels + +// resourceModels may have more than one entry because it is valid for a driver to +// use more than one structured parameter model. +type resourceModels struct { + // TODO: add some structured parameter model +} + +// newResourceModel parses the available information about resources. Objects +// with an unknown structured parameter model silently ignored. An error gets +// logged later when parameters required for a pod depend on such an unknown +// model. +func newResourceModel(logger klog.Logger, nodeResourceSliceLister resourcev1alpha2listers.NodeResourceSliceLister, claimAssumeCache volumebinding.AssumeCache) (resources, error) { + model := make(resources) + + slices, err := nodeResourceSliceLister.List(labels.Everything()) + if err != nil { + return nil, fmt.Errorf("list node resource slices: %w", err) + } + for _, slice := range slices { + if model[slice.NodeName] == nil { + model[slice.NodeName] = make(map[string]resourceModels) + } + resource := model[slice.NodeName][slice.DriverName] + // TODO: add some structured parameter model + model[slice.NodeName][slice.DriverName] = resource + } + + objs := claimAssumeCache.List(nil) + for _, obj := range objs { + claim, ok := obj.(*resourcev1alpha2.ResourceClaim) + if !ok { + return nil, fmt.Errorf("got unexpected object of type %T from claim assume cache", obj) + } + if claim.Status.Allocation == nil { + continue + } + for _, handle := range claim.Status.Allocation.ResourceHandles { + structured := handle.StructuredData + if structured == nil { + continue + } + if model[structured.NodeName] == nil { + model[structured.NodeName] = make(map[string]resourceModels) + } + // resource := model[structured.NodeName][handle.DriverName] + // TODO: add some structured parameter model + // for _, result := range structured.Results { + // // Call AddAllocation for each known model. Each call itself needs to check for nil. + // } + } + } + + return model, nil +} + +func newClaimController(logger klog.Logger, class *resourcev1alpha2.ResourceClass, classParameters *resourcev1alpha2.ResourceClassParameters, claimParameters *resourcev1alpha2.ResourceClaimParameters) (*claimController, error) { + // Each node driver is separate from the others. Each driver may have + // multiple requests which need to be allocated together, so here + // we have to collect them per model. + // TODO: implement some structured parameters model + + c := &claimController{ + class: class, + classParameters: classParameters, + claimParameters: claimParameters, + } + return c, nil +} + +// claimController currently wraps exactly one structured parameter model. + +type claimController struct { + class *resourcev1alpha2.ResourceClass + classParameters *resourcev1alpha2.ResourceClassParameters + claimParameters *resourcev1alpha2.ResourceClaimParameters + // TODO: implement some structured parameters model +} + +func (c claimController) nodeIsSuitable(ctx context.Context, nodeName string, resources resources) (bool, error) { + // TODO: implement some structured parameters model + return true, nil +} + +func (c claimController) allocate(ctx context.Context, nodeName string, resources resources) (string, *resourcev1alpha2.AllocationResult, error) { + allocation := &resourcev1alpha2.AllocationResult{ + Shareable: c.claimParameters.Shareable, + AvailableOnNodes: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + {Key: "kubernetes.io/hostname", Operator: v1.NodeSelectorOpIn, Values: []string{nodeName}}, + }, + }, + }, + }, + } + + // TODO: implement some structured parameters model + + return c.class.DriverName, allocation, nil +} diff --git a/pkg/scheduler/framework/types.go b/pkg/scheduler/framework/types.go index cc839b3e3ee..8bd4b86f680 100644 --- a/pkg/scheduler/framework/types.go +++ b/pkg/scheduler/framework/types.go @@ -72,17 +72,19 @@ const ( // - a Pod that is deleted // - a Pod that was assumed, but gets un-assumed due to some errors in the binding cycle. // - an existing Pod that was unscheduled but gets scheduled to a Node. - Pod GVK = "Pod" - Node GVK = "Node" - PersistentVolume GVK = "PersistentVolume" - PersistentVolumeClaim GVK = "PersistentVolumeClaim" - CSINode GVK = "storage.k8s.io/CSINode" - CSIDriver GVK = "storage.k8s.io/CSIDriver" - CSIStorageCapacity GVK = "storage.k8s.io/CSIStorageCapacity" - StorageClass GVK = "storage.k8s.io/StorageClass" - PodSchedulingContext GVK = "PodSchedulingContext" - ResourceClaim GVK = "ResourceClaim" - ResourceClass GVK = "ResourceClass" + Pod GVK = "Pod" + Node GVK = "Node" + PersistentVolume GVK = "PersistentVolume" + PersistentVolumeClaim GVK = "PersistentVolumeClaim" + CSINode GVK = "storage.k8s.io/CSINode" + CSIDriver GVK = "storage.k8s.io/CSIDriver" + CSIStorageCapacity GVK = "storage.k8s.io/CSIStorageCapacity" + StorageClass GVK = "storage.k8s.io/StorageClass" + PodSchedulingContext GVK = "PodSchedulingContext" + ResourceClaim GVK = "ResourceClaim" + ResourceClass GVK = "ResourceClass" + ResourceClaimParameters GVK = "ResourceClaimParameters" + ResourceClassParameters GVK = "ResourceClassParameters" // WildCard is a special GVK to match all resources. // e.g., If you register `{Resource: "*", ActionType: All}` in EventsToRegister, @@ -176,6 +178,8 @@ func UnrollWildCardResource() []ClusterEventWithHint { {Event: ClusterEvent{Resource: PodSchedulingContext, ActionType: All}}, {Event: ClusterEvent{Resource: ResourceClaim, ActionType: All}}, {Event: ClusterEvent{Resource: ResourceClass, ActionType: All}}, + {Event: ClusterEvent{Resource: ResourceClaimParameters, ActionType: All}}, + {Event: ClusterEvent{Resource: ResourceClassParameters, ActionType: All}}, } } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 6297e439884..3ebe9c7600a 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -644,6 +644,12 @@ func Test_buildQueueingHintMap(t *testing.T) { {Resource: framework.ResourceClass, ActionType: framework.All}: { {PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn}, }, + {Resource: framework.ResourceClaimParameters, ActionType: framework.All}: { + {PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn}, + }, + {Resource: framework.ResourceClassParameters, ActionType: framework.All}: { + {PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn}, + }, }, }, { @@ -768,17 +774,19 @@ func Test_UnionedGVKs(t *testing.T) { Disabled: []schedulerapi.Plugin{{Name: "*"}}, // disable default plugins }, want: map[framework.GVK]framework.ActionType{ - framework.Pod: framework.All, - framework.Node: framework.All, - framework.CSINode: framework.All, - framework.CSIDriver: framework.All, - framework.CSIStorageCapacity: framework.All, - framework.PersistentVolume: framework.All, - framework.PersistentVolumeClaim: framework.All, - framework.StorageClass: framework.All, - framework.PodSchedulingContext: framework.All, - framework.ResourceClaim: framework.All, - framework.ResourceClass: framework.All, + framework.Pod: framework.All, + framework.Node: framework.All, + framework.CSINode: framework.All, + framework.CSIDriver: framework.All, + framework.CSIStorageCapacity: framework.All, + framework.PersistentVolume: framework.All, + framework.PersistentVolumeClaim: framework.All, + framework.StorageClass: framework.All, + framework.PodSchedulingContext: framework.All, + framework.ResourceClaim: framework.All, + framework.ResourceClass: framework.All, + framework.ResourceClaimParameters: framework.All, + framework.ResourceClassParameters: framework.All, }, }, { diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go index 496d3d80b10..58ac1a15712 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go @@ -576,11 +576,13 @@ func ClusterRoles() []rbacv1.ClusterRole { // Needed for dynamic resource allocation. if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { kubeSchedulerRules = append(kubeSchedulerRules, - rbacv1helpers.NewRule(Read...).Groups(resourceGroup).Resources("resourceclaims", "resourceclasses").RuleOrDie(), + rbacv1helpers.NewRule(Read...).Groups(resourceGroup).Resources("resourceclasses").RuleOrDie(), + rbacv1helpers.NewRule(ReadUpdate...).Groups(resourceGroup).Resources("resourceclaims").RuleOrDie(), rbacv1helpers.NewRule(ReadUpdate...).Groups(resourceGroup).Resources("resourceclaims/status").RuleOrDie(), rbacv1helpers.NewRule(ReadWrite...).Groups(resourceGroup).Resources("podschedulingcontexts").RuleOrDie(), rbacv1helpers.NewRule(Read...).Groups(resourceGroup).Resources("podschedulingcontexts/status").RuleOrDie(), rbacv1helpers.NewRule(ReadUpdate...).Groups(legacyGroup).Resources("pods/finalizers").RuleOrDie(), + rbacv1helpers.NewRule(Read...).Groups(resourceGroup).Resources("noderesourceslices", "resourceclassparameters", "resourceclaimparameters").RuleOrDie(), ) } roles = append(roles, rbacv1.ClusterRole{ diff --git a/staging/src/k8s.io/dynamic-resource-allocation/resourceclaim/resourceclaim.go b/staging/src/k8s.io/dynamic-resource-allocation/resourceclaim/resourceclaim.go index e3b8b35ac6b..5c1042329be 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/resourceclaim/resourceclaim.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/resourceclaim/resourceclaim.go @@ -183,3 +183,17 @@ func CanBeReserved(claim *resourcev1alpha2.ResourceClaim) bool { return claim.Status.Allocation.Shareable || len(claim.Status.ReservedFor) == 0 } + +// IsAllocatedWithStructuredParameters checks whether the claim is allocated +// and the allocation was done with structured parameters. +func IsAllocatedWithStructuredParameters(claim *resourcev1alpha2.ResourceClaim) bool { + if claim.Status.Allocation == nil { + return false + } + for _, handle := range claim.Status.Allocation.ResourceHandles { + if handle.StructuredData != nil { + return true + } + } + return false +}