diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go index 5bf149ebf72..b2a0f331f53 100644 --- a/pkg/scheduler/eventhandlers.go +++ b/pkg/scheduler/eventhandlers.go @@ -472,6 +472,24 @@ func addAllEventHandlers( } handlers = append(handlers, handlerRegistration) } + case framework.ResourceClaimParameters: + if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { + if handlerRegistration, err = informerFactory.Resource().V1alpha2().ResourceClaimParameters().Informer().AddEventHandler( + buildEvtResHandler(at, framework.ResourceClaimParameters, "ResourceClaimParameters"), + ); err != nil { + return err + } + handlers = append(handlers, handlerRegistration) + } + case framework.ResourceClassParameters: + if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { + if handlerRegistration, err = informerFactory.Resource().V1alpha2().ResourceClassParameters().Informer().AddEventHandler( + buildEvtResHandler(at, framework.ResourceClassParameters, "ResourceClassParameters"), + ); err != nil { + return err + } + handlers = append(handlers, handlerRegistration) + } case framework.StorageClass: if at&framework.Add != 0 { if handlerRegistration, err = informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler( diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 6373ee16640..9cfa96e88fe 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -18,8 +18,10 @@ package dynamicresources import ( "context" + "encoding/json" "errors" "fmt" + "slices" "sort" "sync" @@ -30,6 +32,7 @@ import ( apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" @@ -43,6 +46,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding" schedutil "k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/utils/ptr" ) @@ -70,37 +74,52 @@ type stateData struct { // Empty if the Pod has no claims. claims []*resourcev1alpha2.ResourceClaim + // podSchedulingState keeps track of the PodSchedulingContext + // (if one exists) and the changes made to it. + podSchedulingState podSchedulingState + + // resourceModel contains the information about available and allocated resources when using + // structured parameters and the pod needs this information. + resources resources + + // mutex must be locked while accessing any of the fields below. + mutex sync.Mutex + // The indices of all claims that: // - are allocated - // - use delayed allocation + // - use delayed allocation or the builtin controller // - were not available on at least one node // // Set in parallel during Filter, so write access there must be // protected by the mutex. Used by PostFilter. unavailableClaims sets.Set[int] - // podSchedulingState keeps track of the PodSchedulingContext - // (if one exists) and the changes made to it. 
- podSchedulingState podSchedulingState - - mutex sync.Mutex - informationsForClaim []informationForClaim } +func (d *stateData) Clone() framework.StateData { + return d +} + type informationForClaim struct { // The availableOnNode node filter of the claim converted from the // v1 API to nodeaffinity.NodeSelector by PreFilter for repeated // evaluation in Filter. Nil for claim which don't have it. availableOnNode *nodeaffinity.NodeSelector + // The status of the claim got from the // schedulingCtx by PreFilter for repeated // evaluation in Filter. Nil for claim which don't have it. status *resourcev1alpha2.ResourceClaimSchedulingStatus -} -func (d *stateData) Clone() framework.StateData { - return d + // structuredParameters is true if the claim is handled via the builtin + // controller. + structuredParameters bool + controller *claimController + + // Set by Reserved, published by PreBind. + allocation *resourcev1alpha2.AllocationResult + allocationDriverName string } type podSchedulingState struct { @@ -256,23 +275,90 @@ type dynamicResources struct { claimLister resourcev1alpha2listers.ResourceClaimLister classLister resourcev1alpha2listers.ResourceClassLister podSchedulingContextLister resourcev1alpha2listers.PodSchedulingContextLister + claimParametersLister resourcev1alpha2listers.ResourceClaimParametersLister + classParametersLister resourcev1alpha2listers.ResourceClassParametersLister + nodeResourceSliceLister resourcev1alpha2listers.NodeResourceSliceLister + claimNameLookup *resourceclaim.Lookup + + // claimAssumeCache enables temporarily storing a newer claim object + // while the scheduler has allocated it and the corresponding object + // update from the apiserver has not been processed by the claim + // informer callbacks. Claims get added here in Reserve and removed by + // the informer callback (based on the "newer than" comparison in the + // assume cache) or when the API call in PreBind fails. + // + // It uses cache.MetaNamespaceKeyFunc to generate object names, which + // therefore are "/". + // + // This is necessary to ensure that reconstructing the resource usage + // at the start of a pod scheduling cycle doesn't reuse the resources + // assigned to such a claim. Alternatively, claim allocation state + // could also get tracked across pod scheduling cycles, but that + // - adds complexity (need to carefully sync state with informer events + // for claims and NodeResourceSlices) + // - would make integration with cluster autoscaler harder because it would need + // to trigger informer callbacks. + // + // When implementing cluster autoscaler support, this assume cache or + // something like it (see https://github.com/kubernetes/kubernetes/pull/112202) + // might have to be managed by the cluster autoscaler. + claimAssumeCache volumebinding.AssumeCache + + // inFlightAllocations is map from claim UUIDs to true for those claims + // for which allocation was triggered during a scheduling cycle and the + // corresponding claim status update call in PreBind has not been done + // yet. If another pod needs the claim, the pod is treated as "not + // schedulable yet". The cluster event for the claim status update will + // make it schedulable. + // + // This mechanism avoids the following problem: + // - Pod A triggers allocation for claim X. + // - Pod B shares access to that claim and gets scheduled because + // the claim is assumed to be allocated. 
+ // - PreBind for pod B is called first, tries to update reservedFor and + // fails because the claim is not really allocated yet. + // + // We could avoid the ordering problem by allowing either pod A or pod B + // to set the allocation. But that is more complicated and leads to another + // problem: + // - Pod A and B get scheduled as above. + // - PreBind for pod A gets called first, then fails with a temporary API error. + // It removes the updated claim from the assume cache because of that. + // - PreBind for pod B gets called next and succeeds with adding the + // allocation and its own reservedFor entry. + // - The assume cache is now not reflecting that the claim is allocated, + // which could lead to reusing the same resource for some other claim. + // + // A sync.Map is used because in practice sharing of a claim between + // pods is expected to be rare compared to per-pod claim, so we end up + // hitting the "multiple goroutines read, write, and overwrite entries + // for disjoint sets of keys" case that sync.Map is optimized for. + inFlightAllocations sync.Map } // New initializes a new plugin and returns it. -func New(_ context.Context, plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) { +func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) { if !fts.EnableDynamicResourceAllocation { // Disabled, won't do anything. return &dynamicResources{}, nil } - return &dynamicResources{ + logger := klog.FromContext(ctx) + pl := &dynamicResources{ enabled: true, fh: fh, clientset: fh.ClientSet(), claimLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Lister(), classLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClasses().Lister(), podSchedulingContextLister: fh.SharedInformerFactory().Resource().V1alpha2().PodSchedulingContexts().Lister(), - }, nil + claimParametersLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaimParameters().Lister(), + classParametersLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClassParameters().Lister(), + nodeResourceSliceLister: fh.SharedInformerFactory().Resource().V1alpha2().NodeResourceSlices().Lister(), + claimNameLookup: resourceclaim.NewNameLookup(fh.ClientSet()), + claimAssumeCache: volumebinding.NewAssumeCache(logger, fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Informer(), "claim", "", nil), + } + + return pl, nil } var _ framework.PreEnqueuePlugin = &dynamicResources{} @@ -296,7 +382,13 @@ func (pl *dynamicResources) EventsToRegister() []framework.ClusterEventWithHint if !pl.enabled { return nil } + events := []framework.ClusterEventWithHint{ + // Changes for claim or class parameters creation may make pods + // schedulable which depend on claims using those parameters. + {Event: framework.ClusterEvent{Resource: framework.ResourceClaimParameters, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterClaimParametersChange}, + {Event: framework.ClusterEvent{Resource: framework.ResourceClassParameters, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterClassParametersChange}, + // Allocation is tracked in ResourceClaims, so any changes may make the pods schedulable. 
{Event: framework.ClusterEvent{Resource: framework.ResourceClaim, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterClaimChange}, // When a driver has provided additional information, a pod waiting for that information @@ -321,6 +413,149 @@ func (pl *dynamicResources) PreEnqueue(ctx context.Context, pod *v1.Pod) (status return nil } +// isSchedulableAfterClaimParametersChange is invoked for add and update claim parameters events reported by +// an informer. It checks whether that change made a previously unschedulable +// pod schedulable. It errs on the side of letting a pod scheduling attempt +// happen. The delete claim event will not invoke it, so newObj will never be nil. +func (pl *dynamicResources) isSchedulableAfterClaimParametersChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { + originalParameters, modifiedParameters, err := schedutil.As[*resourcev1alpha2.ResourceClaimParameters](oldObj, newObj) + if err != nil { + // Shouldn't happen. + return framework.Queue, fmt.Errorf("unexpected object in isSchedulableAfterClaimParametersChange: %w", err) + } + + usesParameters := false + if err := pl.foreachPodResourceClaim(pod, func(_ string, claim *resourcev1alpha2.ResourceClaim) { + ref := claim.Spec.ParametersRef + if ref == nil { + return + } + + // Using in-tree parameters directly? + if ref.APIGroup == resourcev1alpha2.SchemeGroupVersion.Group && + ref.Kind == "ResourceClaimParameters" { + if modifiedParameters.Name == ref.Name { + usesParameters = true + } + return + } + + // Need to look for translated parameters. + generatedFrom := modifiedParameters.GeneratedFrom + if generatedFrom == nil { + return + } + if generatedFrom.APIGroup == ref.APIGroup && + generatedFrom.Kind == ref.Kind && + generatedFrom.Name == ref.Name { + usesParameters = true + } + }); err != nil { + // This is not an unexpected error: we know that + // foreachPodResourceClaim only returns errors for "not + // schedulable". + logger.V(4).Info("pod is not schedulable", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedParameters), "reason", err.Error()) + return framework.QueueSkip, nil + } + + if !usesParameters { + // This were not the parameters the pod was waiting for. + logger.V(6).Info("unrelated claim parameters got modified", "pod", klog.KObj(pod), "claimParameters", klog.KObj(modifiedParameters)) + return framework.QueueSkip, nil + } + + if originalParameters == nil { + logger.V(4).Info("claim parameters for pod got created", "pod", klog.KObj(pod), "claimParameters", klog.KObj(modifiedParameters)) + return framework.Queue, nil + } + + // Modifications may or may not be relevant. If the entire + // requests are as before, then something else must have changed + // and we don't care. + if apiequality.Semantic.DeepEqual(&originalParameters.DriverRequests, &modifiedParameters.DriverRequests) { + logger.V(6).Info("claim parameters for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "claimParameters", klog.KObj(modifiedParameters)) + return framework.QueueSkip, nil + } + + logger.V(4).Info("requests in claim parameters for pod got updated", "pod", klog.KObj(pod), "claimParameters", klog.KObj(modifiedParameters)) + return framework.Queue, nil +} + +// isSchedulableAfterClassParametersChange is invoked for add and update class parameters events reported by +// an informer. It checks whether that change made a previously unschedulable +// pod schedulable. 
It errs on the side of letting a pod scheduling attempt +// happen. The delete class event will not invoke it, so newObj will never be nil. +func (pl *dynamicResources) isSchedulableAfterClassParametersChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { + originalParameters, modifiedParameters, err := schedutil.As[*resourcev1alpha2.ResourceClassParameters](oldObj, newObj) + if err != nil { + // Shouldn't happen. + return framework.Queue, fmt.Errorf("unexpected object in isSchedulableAfterClassParametersChange: %w", err) + } + + usesParameters := false + if err := pl.foreachPodResourceClaim(pod, func(_ string, claim *resourcev1alpha2.ResourceClaim) { + class, err := pl.classLister.Get(claim.Spec.ResourceClassName) + if err != nil { + if !apierrors.IsNotFound(err) { + logger.Error(err, "look up resource class") + } + return + } + ref := class.ParametersRef + if ref == nil { + return + } + + // Using in-tree parameters directly? + if ref.APIGroup == resourcev1alpha2.SchemeGroupVersion.Group && + ref.Kind == "ResourceClassParameters" { + if modifiedParameters.Name == ref.Name { + usesParameters = true + } + return + } + + // Need to look for translated parameters. + generatedFrom := modifiedParameters.GeneratedFrom + if generatedFrom == nil { + return + } + if generatedFrom.APIGroup == ref.APIGroup && + generatedFrom.Kind == ref.Kind && + generatedFrom.Name == ref.Name { + usesParameters = true + } + }); err != nil { + // This is not an unexpected error: we know that + // foreachPodResourceClaim only returns errors for "not + // schedulable". + logger.V(4).Info("pod is not schedulable", "pod", klog.KObj(pod), "classParameters", klog.KObj(modifiedParameters), "reason", err.Error()) + return framework.QueueSkip, nil + } + + if !usesParameters { + // This were not the parameters the pod was waiting for. + logger.V(6).Info("unrelated class parameters got modified", "pod", klog.KObj(pod), "classParameters", klog.KObj(modifiedParameters)) + return framework.QueueSkip, nil + } + + if originalParameters == nil { + logger.V(4).Info("class parameters for pod got created", "pod", klog.KObj(pod), "class", klog.KObj(modifiedParameters)) + return framework.Queue, nil + } + + // Modifications may or may not be relevant. If the entire + // requests are as before, then something else must have changed + // and we don't care. + if apiequality.Semantic.DeepEqual(&originalParameters.Filters, &modifiedParameters.Filters) { + logger.V(6).Info("class parameters for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "classParameters", klog.KObj(modifiedParameters)) + return framework.QueueSkip, nil + } + + logger.V(4).Info("filters in class parameters for pod got updated", "pod", klog.KObj(pod), "classParameters", klog.KObj(modifiedParameters)) + return framework.Queue, nil +} + // isSchedulableAfterClaimChange is invoked for add and update claim events reported by // an informer. It checks whether that change made a previously unschedulable // pod schedulable. It errs on the side of letting a pod scheduling attempt @@ -345,6 +580,33 @@ func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, po return framework.QueueSkip, nil } + if originalClaim != nil && + resourceclaim.IsAllocatedWithStructuredParameters(originalClaim) && + modifiedClaim.Status.Allocation == nil { + // A claim with structured parameters was deallocated. This might have made + // resources available for other pods. 
+ // + // TODO (https://github.com/kubernetes/kubernetes/issues/123697): + // check that the pending claims depend on structured parameters (depends on refactoring foreachPodResourceClaim, see other TODO). + // + // There is a small race here: + // - The dynamicresources plugin allocates claim A and updates the assume cache. + // - A second pod gets marked as unschedulable based on that assume cache. + // - Before the informer cache here catches up, the pod runs, terminates and + // the claim gets deallocated without ever sending the claim status with + // allocation to the scheduler. + // - The comparison below is for a *very* old claim with no allocation and the + // new claim where the allocation is already removed again, so no + // RemovedClaimAllocation event gets emitted. + // + // This is extremely unlikely and thus a fix is not needed for alpha in Kubernetes 1.30. + // TODO (https://github.com/kubernetes/kubernetes/issues/123698): The solution is to somehow integrate the assume cache + // into the event mechanism. This can be tackled together with adding autoscaler + // support, which also needs to do something with the assume cache. + logger.V(6).Info("claim with structured parameters got deallocated", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim)) + return framework.Queue, nil + } + if !usesClaim { // This was not the claim the pod was waiting for. logger.V(6).Info("unrelated claim got modified", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim)) @@ -526,7 +788,7 @@ func (pl *dynamicResources) podResourceClaims(pod *v1.Pod) ([]*resourcev1alpha2. // It calls an optional handler for those claims that it finds. func (pl *dynamicResources) foreachPodResourceClaim(pod *v1.Pod, cb func(podResourceName string, claim *resourcev1alpha2.ResourceClaim)) error { for _, resource := range pod.Spec.ResourceClaims { - claimName, mustCheckOwner, err := resourceclaim.Name(pod, &resource) + claimName, mustCheckOwner, err := pl.claimNameLookup.Name(pod, &resource) if err != nil { return err } @@ -578,24 +840,21 @@ func (pl *dynamicResources) PreFilter(ctx context.Context, state *framework.Cycl return nil, statusUnschedulable(logger, err.Error()) } logger.V(5).Info("pod resource claims", "pod", klog.KObj(pod), "resourceclaims", klog.KObjSlice(claims)) + // If the pod does not reference any claim, // DynamicResources Filter has nothing to do with the Pod. if len(claims) == 0 { return nil, framework.NewStatus(framework.Skip) } - // Fetch s.podSchedulingState.schedulingCtx, it's going to be needed when checking claims. + // Fetch PodSchedulingContext, it's going to be needed when checking claims. if err := s.podSchedulingState.init(ctx, pod, pl.podSchedulingContextLister); err != nil { return nil, statusError(logger, err) } s.informationsForClaim = make([]informationForClaim, len(claims)) + needResourceInformation := false for index, claim := range claims { - if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeImmediate && - claim.Status.Allocation == nil { - // This will get resolved by the resource driver. - return nil, statusUnschedulable(logger, "unallocated immediate resourceclaim", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim)) - } if claim.Status.DeallocationRequested { // This will get resolved by the resource driver. 
return nil, statusUnschedulable(logger, "resourceclaim must be reallocated", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim)) @@ -606,16 +865,20 @@ func (pl *dynamicResources) PreFilter(ctx context.Context, state *framework.Cycl // Resource is in use. The pod has to wait. return nil, statusUnschedulable(logger, "resourceclaim in use", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim)) } - if claim.Status.Allocation != nil && - claim.Status.Allocation.AvailableOnNodes != nil { - nodeSelector, err := nodeaffinity.NewNodeSelector(claim.Status.Allocation.AvailableOnNodes) - if err != nil { - return nil, statusError(logger, err) + + if claim.Status.Allocation != nil { + if claim.Status.Allocation.AvailableOnNodes != nil { + nodeSelector, err := nodeaffinity.NewNodeSelector(claim.Status.Allocation.AvailableOnNodes) + if err != nil { + return nil, statusError(logger, err) + } + s.informationsForClaim[index].availableOnNode = nodeSelector } - s.informationsForClaim[index].availableOnNode = nodeSelector - } - if claim.Status.Allocation == nil && - claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer { + + // The claim was allocated by the scheduler if it has the finalizer that is + // reserved for Kubernetes. + s.informationsForClaim[index].structuredParameters = slices.Contains(claim.Finalizers, resourcev1alpha2.Finalizer) + } else { // The ResourceClass might have a node filter. This is // useful for trimming the initial set of potential // nodes before we ask the driver(s) for information @@ -638,16 +901,140 @@ func (pl *dynamicResources) PreFilter(ctx context.Context, state *framework.Cycl } s.informationsForClaim[index].availableOnNode = selector } - // Now we need information from drivers. s.informationsForClaim[index].status = statusForClaim(s.podSchedulingState.schedulingCtx, pod.Spec.ResourceClaims[index].Name) + + if class.StructuredParameters != nil && *class.StructuredParameters { + s.informationsForClaim[index].structuredParameters = true + + // Allocation in flight? Better wait for that + // to finish, see inFlightAllocations + // documentation for details. + if _, found := pl.inFlightAllocations.Load(claim.UID); found { + return nil, statusUnschedulable(logger, fmt.Sprintf("resource claim %s is in the process of being allocated", klog.KObj(claim))) + } + + // We need the claim and class parameters. If + // they don't exist yet, the pod has to wait. + // + // TODO (https://github.com/kubernetes/kubernetes/issues/123697): + // check this already in foreachPodResourceClaim, together with setting up informationsForClaim. + // Then PreEnqueue will also check for existence of parameters. + classParameters, claimParameters, status := pl.lookupParameters(logger, class, claim) + if status != nil { + return nil, status + } + controller, err := newClaimController(logger, class, classParameters, claimParameters) + if err != nil { + return nil, statusError(logger, err) + } + s.informationsForClaim[index].controller = controller + needResourceInformation = true + } else if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeImmediate { + // This will get resolved by the resource driver. + return nil, statusUnschedulable(logger, "unallocated immediate resourceclaim", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim)) + } } } + if needResourceInformation { + // Doing this over and over again for each pod could be avoided + // by parsing once when creating the plugin and then updating + // that state in informer callbacks. 
But that would cause + // problems for using the plugin in the Cluster Autoscaler. If + // this step here turns out to be expensive, we may have to + // maintain and update state more persistently. + resources, err := newResourceModel(logger, pl.nodeResourceSliceLister, pl.claimAssumeCache) + if err != nil { + return nil, statusError(logger, err) + } + s.resources = resources + } + s.claims = claims - state.Write(stateKey, s) return nil, nil } +func (pl *dynamicResources) lookupParameters(logger klog.Logger, class *resourcev1alpha2.ResourceClass, claim *resourcev1alpha2.ResourceClaim) (classParameters *resourcev1alpha2.ResourceClassParameters, claimParameters *resourcev1alpha2.ResourceClaimParameters, status *framework.Status) { + classParameters, status = pl.lookupClassParameters(logger, class) + if status != nil { + return + } + claimParameters, status = pl.lookupClaimParameters(logger, claim) + return +} + +func (pl *dynamicResources) lookupClassParameters(logger klog.Logger, class *resourcev1alpha2.ResourceClass) (*resourcev1alpha2.ResourceClassParameters, *framework.Status) { + if class.ParametersRef == nil { + return nil, nil + } + + if class.ParametersRef.APIGroup == resourcev1alpha2.SchemeGroupVersion.Group && + class.ParametersRef.Kind == "ResourceClassParameters" { + // Use the parameters which were referenced directly. + parameters, err := pl.classParametersLister.ResourceClassParameters(class.ParametersRef.Namespace).Get(class.ParametersRef.Name) + if err != nil { + if apierrors.IsNotFound(err) { + return nil, statusUnschedulable(logger, fmt.Sprintf("class parameters %s not found", klog.KRef(class.ParametersRef.Namespace, class.ParametersRef.Name))) + } + return nil, statusError(logger, fmt.Errorf("get class parameters %s: %v", klog.KRef(class.Namespace, class.ParametersRef.Name), err)) + } + return parameters, nil + } + + // TODO (https://github.com/kubernetes/kubernetes/issues/123731): use an indexer + allParameters, err := pl.classParametersLister.ResourceClassParameters(class.Namespace).List(labels.Everything()) + if err != nil { + return nil, statusError(logger, fmt.Errorf("listing class parameters failed: %v", err)) + } + for _, parameters := range allParameters { + if parameters.GeneratedFrom == nil { + continue + } + if parameters.GeneratedFrom.APIGroup == class.ParametersRef.APIGroup && + parameters.GeneratedFrom.Kind == class.ParametersRef.Kind && + parameters.GeneratedFrom.Name == class.ParametersRef.Name && + parameters.GeneratedFrom.Namespace == class.ParametersRef.Namespace { + return parameters, nil + } + } + return nil, statusUnschedulable(logger, fmt.Sprintf("generated class parameters for %s.%s %s not found", class.ParametersRef.Kind, class.ParametersRef.APIGroup, klog.KRef(class.Namespace, class.ParametersRef.Name))) +} + +func (pl *dynamicResources) lookupClaimParameters(logger klog.Logger, claim *resourcev1alpha2.ResourceClaim) (*resourcev1alpha2.ResourceClaimParameters, *framework.Status) { + if claim.Spec.ParametersRef == nil { + return nil, nil + } + if claim.Spec.ParametersRef.APIGroup == resourcev1alpha2.SchemeGroupVersion.Group && + claim.Spec.ParametersRef.Kind == "ResourceClaimParameters" { + // Use the parameters which were referenced directly. 
+ parameters, err := pl.claimParametersLister.ResourceClaimParameters(claim.Namespace).Get(claim.Spec.ParametersRef.Name) + if err != nil { + if apierrors.IsNotFound(err) { + return nil, statusUnschedulable(logger, fmt.Sprintf("claim parameters %s not found", klog.KRef(claim.Namespace, claim.Spec.ParametersRef.Name))) + } + return nil, statusError(logger, fmt.Errorf("get claim parameters %s: %v", klog.KRef(claim.Namespace, claim.Spec.ParametersRef.Name), err)) + } + return parameters, nil + } + + // TODO (https://github.com/kubernetes/kubernetes/issues/123731): use an indexer + allParameters, err := pl.claimParametersLister.ResourceClaimParameters(claim.Namespace).List(labels.Everything()) + if err != nil { + return nil, statusError(logger, fmt.Errorf("listing claim parameters failed: %v", err)) + } + for _, parameters := range allParameters { + if parameters.GeneratedFrom == nil { + continue + } + if parameters.GeneratedFrom.APIGroup == claim.Spec.ParametersRef.APIGroup && + parameters.GeneratedFrom.Kind == claim.Spec.ParametersRef.Kind && + parameters.GeneratedFrom.Name == claim.Spec.ParametersRef.Name { + return parameters, nil + } + } + return nil, statusUnschedulable(logger, fmt.Sprintf("generated claim parameters for %s.%s %s not found", claim.Spec.ParametersRef.Kind, claim.Spec.ParametersRef.APIGroup, klog.KRef(claim.Namespace, claim.Spec.ParametersRef.Name))) +} + // PreFilterExtensions returns prefilter extensions, pod add and remove. func (pl *dynamicResources) PreFilterExtensions() framework.PreFilterExtensions { return nil @@ -703,22 +1090,39 @@ func (pl *dynamicResources) Filter(ctx context.Context, cs *framework.CycleState case claim.Status.DeallocationRequested: // We shouldn't get here. PreFilter already checked this. return statusUnschedulable(logger, "resourceclaim must be reallocated", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim)) - case claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer: + case claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer || + state.informationsForClaim[index].structuredParameters: if selector := state.informationsForClaim[index].availableOnNode; selector != nil { if matches := selector.Match(node); !matches { return statusUnschedulable(logger, "excluded by resource class node filter", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclassName", claim.Spec.ResourceClassName) } } - if status := state.informationsForClaim[index].status; status != nil { - for _, unsuitableNode := range status.UnsuitableNodes { - if node.Name == unsuitableNode { - return statusUnschedulable(logger, "resourceclaim cannot be allocated for the node (unsuitable)", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim), "unsuitablenodes", status.UnsuitableNodes) + // Can the builtin controller tell us whether the node is suitable? + if state.informationsForClaim[index].structuredParameters { + suitable, err := state.informationsForClaim[index].controller.nodeIsSuitable(ctx, node.Name, state.resources) + if err != nil { + // An error indicates that something wasn't configured correctly, for example + // writing a CEL expression which doesn't handle a map lookup error. Normally + // this should never fail. We could return an error here, but then the pod + // would get retried. Instead we ignore the node. 
+ return statusUnschedulable(logger, fmt.Sprintf("checking structured parameters failed: %v", err), "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim)) + } + if !suitable { + return statusUnschedulable(logger, "resourceclaim cannot be allocated for the node (unsuitable)", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim)) + } + } else { + if status := state.informationsForClaim[index].status; status != nil { + for _, unsuitableNode := range status.UnsuitableNodes { + if node.Name == unsuitableNode { + return statusUnschedulable(logger, "resourceclaim cannot be allocated for the node (unsuitable)", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim), "unsuitablenodes", status.UnsuitableNodes) + } } } } default: - // This should have been delayed allocation. Immediate - // allocation was already checked for in PreFilter. + // This claim should have been handled above. + // Immediate allocation with control plane controller + // was already checked for in PreFilter. return statusError(logger, fmt.Errorf("internal error, unexpected allocation mode %v", claim.Spec.AllocationMode)) } } @@ -736,7 +1140,11 @@ func (pl *dynamicResources) Filter(ctx context.Context, cs *framework.CycleState // delayed allocation. Claims with immediate allocation // would just get allocated again for a random node, // which is unlikely to help the pod. - if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer { + // + // Claims with builtin controller are handled like + // claims with delayed allocation. + if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer || + state.informationsForClaim[index].controller != nil { state.unavailableClaims.Insert(index) } } @@ -769,12 +1177,19 @@ func (pl *dynamicResources) PostFilter(ctx context.Context, cs *framework.CycleS claim := state.claims[index] if len(claim.Status.ReservedFor) == 0 || len(claim.Status.ReservedFor) == 1 && claim.Status.ReservedFor[0].UID == pod.UID { + // Is the claim is handled by the builtin controller? + // Then we can simply clear the allocation. Once the + // claim informer catches up, the controllers will + // be notified about this change. + clearAllocation := state.informationsForClaim[index].controller != nil + // Before we tell a driver to deallocate a claim, we // have to stop telling it to allocate. Otherwise, // depending on timing, it will deallocate the claim, // see a PodSchedulingContext with selected node, and // allocate again for that same node. 
- if state.podSchedulingState.schedulingCtx != nil && + if !clearAllocation && + state.podSchedulingState.schedulingCtx != nil && state.podSchedulingState.schedulingCtx.Spec.SelectedNode != "" { state.podSchedulingState.selectedNode = ptr.To("") if err := state.podSchedulingState.publish(ctx, pod, pl.clientset); err != nil { @@ -782,9 +1197,13 @@ func (pl *dynamicResources) PostFilter(ctx context.Context, cs *framework.CycleS } } - claim := state.claims[index].DeepCopy() - claim.Status.DeallocationRequested = true + claim := claim.DeepCopy() claim.Status.ReservedFor = nil + if clearAllocation { + claim.Status.Allocation = nil + } else { + claim.Status.DeallocationRequested = true + } logger.V(5).Info("Requesting deallocation of ResourceClaim", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim)) if _, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{}); err != nil { return nil, statusError(logger, err) @@ -815,14 +1234,15 @@ func (pl *dynamicResources) PreScore(ctx context.Context, cs *framework.CycleSta logger := klog.FromContext(ctx) pending := false - for _, claim := range state.claims { - if claim.Status.Allocation == nil { + for index, claim := range state.claims { + if claim.Status.Allocation == nil && + state.informationsForClaim[index].controller == nil { pending = true break } } if !pending { - logger.V(5).Info("no pending claims", "pod", klog.KObj(pod)) + logger.V(5).Info("no pending claims with control plane controller", "pod", klog.KObj(pod)) return nil } @@ -889,7 +1309,7 @@ func haveNode(nodeNames []string, nodeName string) bool { } // Reserve reserves claims for the pod. -func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { +func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) { if !pl.enabled { return nil } @@ -903,6 +1323,7 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat numDelayedAllocationPending := 0 numClaimsWithStatusInfo := 0 + claimsWithBuiltinController := make([]int, 0, len(state.claims)) logger := klog.FromContext(ctx) for index, claim := range state.claims { if claim.Status.Allocation != nil { @@ -914,7 +1335,13 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat continue } - // Must be delayed allocation. + // Do we have the builtin controller? + if state.informationsForClaim[index].controller != nil { + claimsWithBuiltinController = append(claimsWithBuiltinController, index) + continue + } + + // Must be delayed allocation with control plane controller. numDelayedAllocationPending++ // Did the driver provide information that steered node @@ -924,12 +1351,12 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat } } - if numDelayedAllocationPending == 0 { + if numDelayedAllocationPending == 0 && len(claimsWithBuiltinController) == 0 { // Nothing left to do. return nil } - if !state.preScored { + if !state.preScored && numDelayedAllocationPending > 0 { // There was only one candidate that passed the Filters and // therefore PreScore was not called. // @@ -944,11 +1371,36 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat } } + // Prepare allocation of claims handled by the schedulder. 
+ for _, index := range claimsWithBuiltinController { + claim := state.claims[index] + driverName, allocation, err := state.informationsForClaim[index].controller.allocate(ctx, nodeName, state.resources) + if err != nil { + // We checked before that the node is suitable. This shouldn't have failed, + // so treat this as an error. + return statusError(logger, fmt.Errorf("claim allocation failed unexpectedly: %v", err)) + } + state.informationsForClaim[index].allocation = allocation + state.informationsForClaim[index].allocationDriverName = driverName + pl.inFlightAllocations.Store(claim.UID, true) + claim = claim.DeepCopy() + claim.Status.DriverName = driverName + claim.Status.Allocation = allocation + if err := pl.claimAssumeCache.Assume(claim); err != nil { + return statusError(logger, fmt.Errorf("update claim assume cache: %v", err)) + } + logger.V(5).Info("Reserved resource in allocation result", "claim", klog.KObj(claim), "driver", driverName, "allocation", allocation) + } + // When there is only one pending resource, we can go ahead with // requesting allocation even when we don't have the information from // the driver yet. Otherwise we wait for information before blindly // making a decision that might have to be reversed later. - if numDelayedAllocationPending == 1 || numClaimsWithStatusInfo == numDelayedAllocationPending { + // + // If all pending claims are handled with the builtin controller, + // there is no need for a PodSchedulingContext change. + if numDelayedAllocationPending == 1 && len(claimsWithBuiltinController) == 0 || + numClaimsWithStatusInfo+len(claimsWithBuiltinController) == numDelayedAllocationPending && len(claimsWithBuiltinController) < numDelayedAllocationPending { // TODO: can we increase the chance that the scheduler picks // the same node as before when allocation is on-going, // assuming that that node still fits the pod? Picking a @@ -970,6 +1422,13 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat return nil } + // If all pending claims are handled with the builtin controller, then + // we can allow the pod to proceed. Allocating and reserving the claims + // will be done in PreBind. + if numDelayedAllocationPending == 0 { + return nil + } + // More than one pending claim and not enough information about all of them. // // TODO: can or should we ensure that schedulingCtx gets aborted while @@ -1016,7 +1475,15 @@ func (pl *dynamicResources) Unreserve(ctx context.Context, cs *framework.CycleSt } } - for _, claim := range state.claims { + for index, claim := range state.claims { + // If allocation was in-flight, then it's not anymore and we need to revert the + // claim object in the assume cache to what it was before. + if state.informationsForClaim[index].controller != nil { + if _, found := pl.inFlightAllocations.LoadAndDelete(state.claims[index].UID); found { + pl.claimAssumeCache.Restore(claim.Namespace + "/" + claim.Name) + } + } + if claim.Status.Allocation != nil && resourceclaim.IsReservedForPod(pod, claim) { // Remove pod from ReservedFor. A strategic-merge-patch is used @@ -1038,7 +1505,10 @@ func (pl *dynamicResources) Unreserve(ctx context.Context, cs *framework.CycleSt // PreBind gets called in a separate goroutine after it has been determined // that the pod should get bound to this node. Because Reserve did not actually -// reserve claims, we need to do it now. If that fails, we return an error and +// reserve claims, we need to do it now. 
For claims with the builtin controller, +// we also handle the allocation. +// +// If anything fails, we return an error and // the pod will have to go into the backoff queue. The scheduler will call // Unreserve as part of the error handling. func (pl *dynamicResources) PreBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { @@ -1056,6 +1526,7 @@ func (pl *dynamicResources) PreBind(ctx context.Context, cs *framework.CycleStat logger := klog.FromContext(ctx) // Was publishing delayed? If yes, do it now and then cause binding to stop. + // This will not happen if all claims get handled by builtin controllers. if state.podSchedulingState.isDirty() { if err := state.podSchedulingState.publish(ctx, pod, pl.clientset); err != nil { return statusError(logger, err) @@ -1065,23 +1536,7 @@ func (pl *dynamicResources) PreBind(ctx context.Context, cs *framework.CycleStat for index, claim := range state.claims { if !resourceclaim.IsReservedForPod(pod, claim) { - // The claim might be stale, for example because the claim can get shared and some - // other goroutine has updated it in the meantime. We therefore cannot use - // SSA here to add the pod because then we would have to send the entire slice - // or use different field manager strings for each entry. - // - // With a strategic-merge-patch, we can simply send one new entry. The apiserver - // validation will catch if two goroutines try to do that at the same time and - // the claim cannot be shared. - patch := fmt.Sprintf(`{"metadata": {"uid": %q}, "status": { "reservedFor": [ {"resource": "pods", "name": %q, "uid": %q} ] }}`, - claim.UID, - pod.Name, - pod.UID, - ) - logger.V(5).Info("reserve", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.KObj(claim)) - claim, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Patch(ctx, claim.Name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}, "status") - logger.V(5).Info("reserved", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.Format(claim)) - // TODO: metric for update errors. + claim, err := pl.bindClaim(ctx, state, index, pod, nodeName) if err != nil { return statusError(logger, err) } @@ -1093,6 +1548,75 @@ func (pl *dynamicResources) PreBind(ctx context.Context, cs *framework.CycleStat return nil } +// bindClaim gets called by PreBind for claim which is not reserved for the pod yet. +// It might not even be allocated. bindClaim then ensures that the allocation +// and reservation are recorded. This finishes the work started in Reserve. +func (pl *dynamicResources) bindClaim(ctx context.Context, state *stateData, index int, pod *v1.Pod, nodeName string) (patchedClaim *resourcev1alpha2.ResourceClaim, finalErr error) { + logger := klog.FromContext(ctx) + claim := state.claims[index] + allocationPatch := "" + + allocation := state.informationsForClaim[index].allocation + logger.V(5).Info("preparing claim status patch", "claim", klog.KObj(state.claims[index]), "allocation", allocation) + + // Do we need to store an allocation result from Reserve? + if allocation != nil { + buffer, err := json.Marshal(allocation) + if err != nil { + return nil, fmt.Errorf("marshaling AllocationResult failed: %v", err) + } + allocationPatch = fmt.Sprintf(`"driverName": %q, "allocation": %s, `, state.informationsForClaim[index].allocationDriverName, string(buffer)) + + // The finalizer needs to be added in a normal update. 
Using a simple update is fine + // because we don't expect concurrent modifications while the claim is not allocated + // yet. If there are any, we want to fail. + // + // If we were interrupted in the past, it might already be set and we simply continue. + if !slices.Contains(claim.Finalizers, resourcev1alpha2.Finalizer) { + claim := state.claims[index].DeepCopy() + claim.Finalizers = append(claim.Finalizers, resourcev1alpha2.Finalizer) + if _, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{}); err != nil { + return nil, fmt.Errorf("add finalizer: %v", err) + } + } + } + + // The claim might be stale, for example because the claim can get shared and some + // other goroutine has updated it in the meantime. We therefore cannot use + // SSA here to add the pod because then we would have to send the entire slice + // or use different field manager strings for each entry. + // + // With a strategic-merge-patch, we can simply send one new entry. The apiserver + // validation will catch if two goroutines try to do that at the same time and + // the claim cannot be shared. + // + // Note that this also works when the allocation result gets added twice because + // two pods both started using a shared claim: the first pod to get here adds the + // allocation result. The second pod then only adds itself to reservedFor. + patch := fmt.Sprintf(`{"metadata": {"uid": %q}, "status": {%s "reservedFor": [ {"resource": "pods", "name": %q, "uid": %q} ] }}`, + claim.UID, + allocationPatch, + pod.Name, + pod.UID, + ) + if loggerV := logger.V(6); loggerV.Enabled() { + logger.V(5).Info("reserve", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.KObj(claim), "patch", patch) + } else { + logger.V(5).Info("reserve", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.KObj(claim)) + } + claim, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Patch(ctx, claim.Name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}, "status") + logger.V(5).Info("reserved", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.Format(claim), "err", err) + if allocationPatch != "" { + // The scheduler was handling allocation. Now that has + // completed, either successfully or with a failure. + if err != nil { + pl.claimAssumeCache.Restore(claim.Namespace + "/" + claim.Name) + } + pl.inFlightAllocations.Delete(claim.UID) + } + return claim, err +} + // PostBind is called after a pod is successfully bound to a node. Now we are // sure that a PodSchedulingContext object, if it exists, is definitely not going to // be needed anymore and can delete it. 
This is a one-shot thing, there won't diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go index 4704f60a65b..070ece83920 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go @@ -309,8 +309,9 @@ func TestPlugin(t *testing.T) { }, }, "waiting-for-immediate-allocation": { - pod: podWithClaimName, - claims: []*resourcev1alpha2.ResourceClaim{pendingImmediateClaim}, + pod: podWithClaimName, + claims: []*resourcev1alpha2.ResourceClaim{pendingImmediateClaim}, + classes: []*resourcev1alpha2.ResourceClass{resourceClass}, want: want{ prefilter: result{ status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `unallocated immediate resourceclaim`), @@ -812,7 +813,6 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceCl tc.client = fake.NewSimpleClientset() reactor := createReactor(tc.client.Tracker()) tc.client.PrependReactor("*", "*", reactor) - tc.informerFactory = informers.NewSharedInformerFactory(tc.client, 0) opts := []runtime.Option{ diff --git a/pkg/scheduler/framework/plugins/dynamicresources/structuredparameters.go b/pkg/scheduler/framework/plugins/dynamicresources/structuredparameters.go new file mode 100644 index 00000000000..e8a5d0a4522 --- /dev/null +++ b/pkg/scheduler/framework/plugins/dynamicresources/structuredparameters.go @@ -0,0 +1,134 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dynamicresources + +import ( + "context" + "fmt" + + v1 "k8s.io/api/core/v1" + resourcev1alpha2 "k8s.io/api/resource/v1alpha2" + "k8s.io/apimachinery/pkg/labels" + resourcev1alpha2listers "k8s.io/client-go/listers/resource/v1alpha2" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding" +) + +// resources is a map "node name" -> "driver name" -> available and +// allocated resources per structured parameter model. +type resources map[string]map[string]resourceModels + +// resourceModels may have more than one entry because it is valid for a driver to +// use more than one structured parameter model. +type resourceModels struct { + // TODO: add some structured parameter model +} + +// newResourceModel parses the available information about resources. Objects +// with an unknown structured parameter model silently ignored. An error gets +// logged later when parameters required for a pod depend on such an unknown +// model. 
+func newResourceModel(logger klog.Logger, nodeResourceSliceLister resourcev1alpha2listers.NodeResourceSliceLister, claimAssumeCache volumebinding.AssumeCache) (resources, error) { + model := make(resources) + + slices, err := nodeResourceSliceLister.List(labels.Everything()) + if err != nil { + return nil, fmt.Errorf("list node resource slices: %w", err) + } + for _, slice := range slices { + if model[slice.NodeName] == nil { + model[slice.NodeName] = make(map[string]resourceModels) + } + resource := model[slice.NodeName][slice.DriverName] + // TODO: add some structured parameter model + model[slice.NodeName][slice.DriverName] = resource + } + + objs := claimAssumeCache.List(nil) + for _, obj := range objs { + claim, ok := obj.(*resourcev1alpha2.ResourceClaim) + if !ok { + return nil, fmt.Errorf("got unexpected object of type %T from claim assume cache", obj) + } + if claim.Status.Allocation == nil { + continue + } + for _, handle := range claim.Status.Allocation.ResourceHandles { + structured := handle.StructuredData + if structured == nil { + continue + } + if model[structured.NodeName] == nil { + model[structured.NodeName] = make(map[string]resourceModels) + } + // resource := model[structured.NodeName][handle.DriverName] + // TODO: add some structured parameter model + // for _, result := range structured.Results { + // // Call AddAllocation for each known model. Each call itself needs to check for nil. + // } + } + } + + return model, nil +} + +func newClaimController(logger klog.Logger, class *resourcev1alpha2.ResourceClass, classParameters *resourcev1alpha2.ResourceClassParameters, claimParameters *resourcev1alpha2.ResourceClaimParameters) (*claimController, error) { + // Each node driver is separate from the others. Each driver may have + // multiple requests which need to be allocated together, so here + // we have to collect them per model. + // TODO: implement some structured parameters model + + c := &claimController{ + class: class, + classParameters: classParameters, + claimParameters: claimParameters, + } + return c, nil +} + +// claimController currently wraps exactly one structured parameter model. + +type claimController struct { + class *resourcev1alpha2.ResourceClass + classParameters *resourcev1alpha2.ResourceClassParameters + claimParameters *resourcev1alpha2.ResourceClaimParameters + // TODO: implement some structured parameters model +} + +func (c claimController) nodeIsSuitable(ctx context.Context, nodeName string, resources resources) (bool, error) { + // TODO: implement some structured parameters model + return true, nil +} + +func (c claimController) allocate(ctx context.Context, nodeName string, resources resources) (string, *resourcev1alpha2.AllocationResult, error) { + allocation := &resourcev1alpha2.AllocationResult{ + Shareable: c.claimParameters.Shareable, + AvailableOnNodes: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + {Key: "kubernetes.io/hostname", Operator: v1.NodeSelectorOpIn, Values: []string{nodeName}}, + }, + }, + }, + }, + } + + // TODO: implement some structured parameters model + + return c.class.DriverName, allocation, nil +} diff --git a/pkg/scheduler/framework/types.go b/pkg/scheduler/framework/types.go index cc839b3e3ee..8bd4b86f680 100644 --- a/pkg/scheduler/framework/types.go +++ b/pkg/scheduler/framework/types.go @@ -72,17 +72,19 @@ const ( // - a Pod that is deleted // - a Pod that was assumed, but gets un-assumed due to some errors in the binding cycle. 
// - an existing Pod that was unscheduled but gets scheduled to a Node. - Pod GVK = "Pod" - Node GVK = "Node" - PersistentVolume GVK = "PersistentVolume" - PersistentVolumeClaim GVK = "PersistentVolumeClaim" - CSINode GVK = "storage.k8s.io/CSINode" - CSIDriver GVK = "storage.k8s.io/CSIDriver" - CSIStorageCapacity GVK = "storage.k8s.io/CSIStorageCapacity" - StorageClass GVK = "storage.k8s.io/StorageClass" - PodSchedulingContext GVK = "PodSchedulingContext" - ResourceClaim GVK = "ResourceClaim" - ResourceClass GVK = "ResourceClass" + Pod GVK = "Pod" + Node GVK = "Node" + PersistentVolume GVK = "PersistentVolume" + PersistentVolumeClaim GVK = "PersistentVolumeClaim" + CSINode GVK = "storage.k8s.io/CSINode" + CSIDriver GVK = "storage.k8s.io/CSIDriver" + CSIStorageCapacity GVK = "storage.k8s.io/CSIStorageCapacity" + StorageClass GVK = "storage.k8s.io/StorageClass" + PodSchedulingContext GVK = "PodSchedulingContext" + ResourceClaim GVK = "ResourceClaim" + ResourceClass GVK = "ResourceClass" + ResourceClaimParameters GVK = "ResourceClaimParameters" + ResourceClassParameters GVK = "ResourceClassParameters" // WildCard is a special GVK to match all resources. // e.g., If you register `{Resource: "*", ActionType: All}` in EventsToRegister, @@ -176,6 +178,8 @@ func UnrollWildCardResource() []ClusterEventWithHint { {Event: ClusterEvent{Resource: PodSchedulingContext, ActionType: All}}, {Event: ClusterEvent{Resource: ResourceClaim, ActionType: All}}, {Event: ClusterEvent{Resource: ResourceClass, ActionType: All}}, + {Event: ClusterEvent{Resource: ResourceClaimParameters, ActionType: All}}, + {Event: ClusterEvent{Resource: ResourceClassParameters, ActionType: All}}, } } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 6297e439884..3ebe9c7600a 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -644,6 +644,12 @@ func Test_buildQueueingHintMap(t *testing.T) { {Resource: framework.ResourceClass, ActionType: framework.All}: { {PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn}, }, + {Resource: framework.ResourceClaimParameters, ActionType: framework.All}: { + {PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn}, + }, + {Resource: framework.ResourceClassParameters, ActionType: framework.All}: { + {PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn}, + }, }, }, { @@ -768,17 +774,19 @@ func Test_UnionedGVKs(t *testing.T) { Disabled: []schedulerapi.Plugin{{Name: "*"}}, // disable default plugins }, want: map[framework.GVK]framework.ActionType{ - framework.Pod: framework.All, - framework.Node: framework.All, - framework.CSINode: framework.All, - framework.CSIDriver: framework.All, - framework.CSIStorageCapacity: framework.All, - framework.PersistentVolume: framework.All, - framework.PersistentVolumeClaim: framework.All, - framework.StorageClass: framework.All, - framework.PodSchedulingContext: framework.All, - framework.ResourceClaim: framework.All, - framework.ResourceClass: framework.All, + framework.Pod: framework.All, + framework.Node: framework.All, + framework.CSINode: framework.All, + framework.CSIDriver: framework.All, + framework.CSIStorageCapacity: framework.All, + framework.PersistentVolume: framework.All, + framework.PersistentVolumeClaim: framework.All, + framework.StorageClass: framework.All, + framework.PodSchedulingContext: framework.All, + framework.ResourceClaim: framework.All, + framework.ResourceClass: framework.All, + 
framework.ResourceClaimParameters: framework.All, + framework.ResourceClassParameters: framework.All, }, }, { diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go index 496d3d80b10..58ac1a15712 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go @@ -576,11 +576,13 @@ func ClusterRoles() []rbacv1.ClusterRole { // Needed for dynamic resource allocation. if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { kubeSchedulerRules = append(kubeSchedulerRules, - rbacv1helpers.NewRule(Read...).Groups(resourceGroup).Resources("resourceclaims", "resourceclasses").RuleOrDie(), + rbacv1helpers.NewRule(Read...).Groups(resourceGroup).Resources("resourceclasses").RuleOrDie(), + rbacv1helpers.NewRule(ReadUpdate...).Groups(resourceGroup).Resources("resourceclaims").RuleOrDie(), rbacv1helpers.NewRule(ReadUpdate...).Groups(resourceGroup).Resources("resourceclaims/status").RuleOrDie(), rbacv1helpers.NewRule(ReadWrite...).Groups(resourceGroup).Resources("podschedulingcontexts").RuleOrDie(), rbacv1helpers.NewRule(Read...).Groups(resourceGroup).Resources("podschedulingcontexts/status").RuleOrDie(), rbacv1helpers.NewRule(ReadUpdate...).Groups(legacyGroup).Resources("pods/finalizers").RuleOrDie(), + rbacv1helpers.NewRule(Read...).Groups(resourceGroup).Resources("noderesourceslices", "resourceclassparameters", "resourceclaimparameters").RuleOrDie(), ) } roles = append(roles, rbacv1.ClusterRole{ diff --git a/staging/src/k8s.io/dynamic-resource-allocation/resourceclaim/resourceclaim.go b/staging/src/k8s.io/dynamic-resource-allocation/resourceclaim/resourceclaim.go index e3b8b35ac6b..5c1042329be 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/resourceclaim/resourceclaim.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/resourceclaim/resourceclaim.go @@ -183,3 +183,17 @@ func CanBeReserved(claim *resourcev1alpha2.ResourceClaim) bool { return claim.Status.Allocation.Shareable || len(claim.Status.ReservedFor) == 0 } + +// IsAllocatedWithStructuredParameters checks whether the claim is allocated +// and the allocation was done with structured parameters. +func IsAllocatedWithStructuredParameters(claim *resourcev1alpha2.ResourceClaim) bool { + if claim.Status.Allocation == nil { + return false + } + for _, handle := range claim.Status.Allocation.ResourceHandles { + if handle.StructuredData != nil { + return true + } + } + return false +}
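
The comments around inFlightAllocations and claimAssumeCache in the plugin describe a two-part bookkeeping scheme: Reserve stores the tentatively allocated claim in the assume cache and marks its UID as in flight, Unreserve rolls both back, and PreBind clears the marker, restoring the cached object only when the status update failed. The standalone Go sketch below illustrates that lifecycle; the assumeCache type and the string-keyed objects are simplified stand-ins for volumebinding.AssumeCache and ResourceClaim, not the real plugin types.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// assumeCache is a hypothetical stand-in for the subset of the scheduler's
// volumebinding.AssumeCache API that the plugin relies on (Assume/Restore).
type assumeCache struct {
	mu      sync.Mutex
	assumed map[string]string // object name -> assumed (newer) content
}

func newAssumeCache() *assumeCache {
	return &assumeCache{assumed: map[string]string{}}
}

func (c *assumeCache) Assume(name, content string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.assumed[name] = content
}

func (c *assumeCache) Restore(name string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.assumed, name) // fall back to the informer's view
}

// inFlightTracker mirrors the pattern from the plugin comments: claims whose
// allocation was decided in Reserve stay marked in a sync.Map until PreBind
// either persists the allocation or gives up on it.
type inFlightTracker struct {
	inFlight sync.Map // claim UID -> true
	cache    *assumeCache
}

// reserve records the tentative allocation (Reserve phase).
func (t *inFlightTracker) reserve(uid, objName, assumed string) {
	t.inFlight.Store(uid, true)
	t.cache.Assume(objName, assumed)
}

// unreserve rolls back an allocation that never reached the apiserver.
func (t *inFlightTracker) unreserve(uid, objName string) {
	if _, found := t.inFlight.LoadAndDelete(uid); found {
		t.cache.Restore(objName)
	}
}

// finish clears the marker after the PreBind status update; on error the
// assumed object is dropped so the informer's (unallocated) view wins again.
func (t *inFlightTracker) finish(uid, objName string, apiErr error) {
	if apiErr != nil {
		t.cache.Restore(objName)
	}
	t.inFlight.Delete(uid)
}

func main() {
	t := &inFlightTracker{cache: newAssumeCache()}
	t.reserve("uid-claim-1", "default/claim-1", "claim-1 with allocation")
	t.finish("uid-claim-1", "default/claim-1", errors.New("temporary apiserver error"))
	fmt.Println("assumed objects after failed PreBind:", t.cache.assumed)
}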
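
Both the queueing hints (isSchedulableAfterClaimParametersChange, isSchedulableAfterClassParametersChange) and lookupClaimParameters / lookupClassParameters apply the same matching rule: a parameters object is relevant either because ParametersRef names the in-tree kind directly, or because the object's GeneratedFrom records the vendor object it was translated from. The following is a minimal sketch of that predicate, using trimmed-down hypothetical types in place of the resourcev1alpha2 API structs.

package main

import "fmt"

// ref is a hypothetical, trimmed-down stand-in for the ParametersRef and
// GeneratedFrom reference types in resourcev1alpha2.
type ref struct {
	APIGroup, Kind, Name string
}

// parametersObject stands in for ResourceClaimParameters or
// ResourceClassParameters: either referenced directly (in-tree kind) or
// generated from a vendor CRD recorded in GeneratedFrom.
type parametersObject struct {
	Name          string
	GeneratedFrom *ref
}

const resourceAPIGroup = "resource.k8s.io"

// matchesParametersRef reports whether obj is the parameters object that a
// claim or class with the given ParametersRef depends on.
func matchesParametersRef(obj parametersObject, inTreeKind string, parametersRef *ref) bool {
	if parametersRef == nil {
		return false
	}
	// Using in-tree parameters directly?
	if parametersRef.APIGroup == resourceAPIGroup && parametersRef.Kind == inTreeKind {
		return obj.Name == parametersRef.Name
	}
	// Otherwise look for translated parameters.
	gen := obj.GeneratedFrom
	return gen != nil &&
		gen.APIGroup == parametersRef.APIGroup &&
		gen.Kind == parametersRef.Kind &&
		gen.Name == parametersRef.Name
}

func main() {
	vendorRef := &ref{APIGroup: "gpu.example.com", Kind: "GpuClaimParameters", Name: "fast"}
	translated := parametersObject{Name: "fast-generated", GeneratedFrom: vendorRef}
	fmt.Println(matchesParametersRef(translated, "ResourceClaimParameters", vendorRef)) // true
}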
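
bindClaim builds a single strategic-merge-patch that optionally carries the allocation produced in Reserve plus one new reservedFor entry, which avoids sending the whole reservedFor slice (or per-entry field managers) as server-side apply would require. The sketch below reproduces that patch construction with hypothetical pod and claim values and a placeholder object in place of the marshaled AllocationResult.

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Hypothetical values; in the plugin they come from the claim, the pod
	// and the allocation result computed in Reserve.
	claimUID := "uid-claim-1"
	podName, podUID := "my-pod", "uid-pod-1"
	driverName := "gpu.example.com"

	// Placeholder for json.Marshal(allocation) on the AllocationResult.
	allocation, err := json.Marshal(map[string]any{"shareable": true})
	if err != nil {
		panic(err)
	}
	allocationPatch := fmt.Sprintf(`"driverName": %q, "allocation": %s, `, driverName, allocation)

	// One strategic-merge-patch adds both the allocation (if any) and the
	// pod's own reservedFor entry; the apiserver validation rejects
	// conflicting concurrent updates for claims that cannot be shared.
	patch := fmt.Sprintf(`{"metadata": {"uid": %q}, "status": {%s "reservedFor": [ {"resource": "pods", "name": %q, "uid": %q} ] }}`,
		claimUID, allocationPatch, podName, podUID)
	fmt.Println(patch)
}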