diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 6d32ff47cfe..4f70fba83a2 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -31,6 +31,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes" resourcev1alpha2listers "k8s.io/client-go/listers/resource/v1alpha2" @@ -54,6 +55,9 @@ const ( // framework.CycleState, in the later phases we don't need to call Write method // to update the value type stateData struct { + // preScored is true if PreScore was invoked. + preScored bool + // A copy of all claims for the Pod (i.e. 1:1 match with // pod.Spec.ResourceClaims), initially with the status from the start // of the scheduling cycle. Each claim instance is read-only because it @@ -72,17 +76,9 @@ type stateData struct { // protected by the mutex. Used by PostFilter. unavailableClaims sets.Int - // A pointer to the PodSchedulingContext object for the pod, if one exists. - // Gets set on demand. - // - // Conceptually, this object belongs into the scheduler framework - // where it might get shared by different plugins. But in practice, - // it is currently only used by dynamic provisioning and thus - // managed entirely here. - schedulingCtx *resourcev1alpha2.PodSchedulingContext - - // podSchedulingDirty is true if the current copy was locally modified. - podSchedulingDirty bool + // podSchedulingState keeps track of the PodSchedulingContext + // (if one exists) and the changes made to it. + podSchedulingState podSchedulingState mutex sync.Mutex @@ -123,91 +119,108 @@ func (d *stateData) updateClaimStatus(ctx context.Context, clientset kubernetes. return nil } -// initializePodSchedulingContext can be called concurrently. It returns an existing PodSchedulingContext -// object if there is one already, retrieves one if not, or as a last resort creates -// one from scratch. -func (d *stateData) initializePodSchedulingContexts(ctx context.Context, pod *v1.Pod, podSchedulingContextLister resourcev1alpha2listers.PodSchedulingContextLister) (*resourcev1alpha2.PodSchedulingContext, error) { - // TODO (#113701): check if this mutex locking can be avoided by calling initializePodSchedulingContext during PreFilter. - d.mutex.Lock() - defer d.mutex.Unlock() +type podSchedulingState struct { + // A pointer to the PodSchedulingContext object for the pod, if one exists + // in the API server. + // + // Conceptually, this object belongs into the scheduler framework + // where it might get shared by different plugins. But in practice, + // it is currently only used by dynamic provisioning and thus + // managed entirely here. + schedulingCtx *resourcev1alpha2.PodSchedulingContext - if d.schedulingCtx != nil { - return d.schedulingCtx, nil - } + // selectedNode is set if (and only if) a node has been selected. + selectedNode *string + // potentialNodes is set if (and only if) the potential nodes field + // needs to be updated or set. + potentialNodes *[]string +} + +func (p *podSchedulingState) isDirty() bool { + return p.selectedNode != nil || + p.potentialNodes != nil +} + +// init checks whether there is already a PodSchedulingContext object. 
+// Must not be called concurrently, +func (p *podSchedulingState) init(ctx context.Context, pod *v1.Pod, podSchedulingContextLister resourcev1alpha2listers.PodSchedulingContextLister) error { schedulingCtx, err := podSchedulingContextLister.PodSchedulingContexts(pod.Namespace).Get(pod.Name) switch { case apierrors.IsNotFound(err): - controller := true - schedulingCtx = &resourcev1alpha2.PodSchedulingContext{ - ObjectMeta: metav1.ObjectMeta{ - Name: pod.Name, - Namespace: pod.Namespace, - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: "v1", - Kind: "Pod", - Name: pod.Name, - UID: pod.UID, - Controller: &controller, - }, - }, - }, - } - err = nil + return nil case err != nil: - return nil, err + return err default: // We have an object, but it might be obsolete. if !metav1.IsControlledBy(schedulingCtx, pod) { - return nil, fmt.Errorf("PodSchedulingContext object with UID %s is not owned by Pod %s/%s", schedulingCtx.UID, pod.Namespace, pod.Name) + return fmt.Errorf("PodSchedulingContext object with UID %s is not owned by Pod %s/%s", schedulingCtx.UID, pod.Namespace, pod.Name) } } - d.schedulingCtx = schedulingCtx - return schedulingCtx, err + p.schedulingCtx = schedulingCtx + return nil } -// publishPodSchedulingContext creates or updates the PodSchedulingContext object. -func (d *stateData) publishPodSchedulingContexts(ctx context.Context, clientset kubernetes.Interface, schedulingCtx *resourcev1alpha2.PodSchedulingContext) error { - d.mutex.Lock() - defer d.mutex.Unlock() +// publish creates or updates the PodSchedulingContext object, if necessary. +// Must not be called concurrently. +func (p *podSchedulingState) publish(ctx context.Context, pod *v1.Pod, clientset kubernetes.Interface) error { + if !p.isDirty() { + return nil + } var err error logger := klog.FromContext(ctx) - msg := "Updating PodSchedulingContext" - if schedulingCtx.UID == "" { - msg = "Creating PodSchedulingContext" - } - if loggerV := logger.V(6); loggerV.Enabled() { - // At a high enough log level, dump the entire object. - loggerV.Info(msg, "podSchedulingCtxDump", klog.Format(schedulingCtx)) + if p.schedulingCtx != nil { + // Update it. + schedulingCtx := p.schedulingCtx.DeepCopy() + if p.selectedNode != nil { + schedulingCtx.Spec.SelectedNode = *p.selectedNode + } + if p.potentialNodes != nil { + schedulingCtx.Spec.PotentialNodes = *p.potentialNodes + } + if loggerV := logger.V(6); loggerV.Enabled() { + // At a high enough log level, dump the entire object. + loggerV.Info("Updating PodSchedulingContext", "podSchedulingCtx", klog.KObj(schedulingCtx), "podSchedulingCtxObject", klog.Format(schedulingCtx)) + } else { + logger.V(5).Info("Updating PodSchedulingContext", "podSchedulingCtx", klog.KObj(schedulingCtx)) + } + _, err = clientset.ResourceV1alpha2().PodSchedulingContexts(schedulingCtx.Namespace).Update(ctx, schedulingCtx, metav1.UpdateOptions{}) } else { - logger.V(5).Info(msg, "podSchedulingCtx", klog.KObj(schedulingCtx)) - } - if schedulingCtx.UID == "" { - schedulingCtx, err = clientset.ResourceV1alpha2().PodSchedulingContexts(schedulingCtx.Namespace).Create(ctx, schedulingCtx, metav1.CreateOptions{}) - } else { - // TODO (#113700): patch here to avoid racing with drivers which update the status. - schedulingCtx, err = clientset.ResourceV1alpha2().PodSchedulingContexts(schedulingCtx.Namespace).Update(ctx, schedulingCtx, metav1.UpdateOptions{}) + // Create it. 
+ schedulingCtx := &resourcev1alpha2.PodSchedulingContext{ + ObjectMeta: metav1.ObjectMeta{ + Name: pod.Name, + Namespace: pod.Namespace, + OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(pod, schema.GroupVersionKind{Version: "v1", Kind: "Pod"})}, + }, + } + if p.selectedNode != nil { + schedulingCtx.Spec.SelectedNode = *p.selectedNode + } + if p.potentialNodes != nil { + schedulingCtx.Spec.PotentialNodes = *p.potentialNodes + } + if loggerV := logger.V(6); loggerV.Enabled() { + // At a high enough log level, dump the entire object. + loggerV.Info("Creating PodSchedulingContext", "podSchedulingCtx", klog.KObj(schedulingCtx), "podSchedulingCtxObject", klog.Format(schedulingCtx)) + } else { + logger.V(5).Info("Creating PodSchedulingContext", "podSchedulingCtx", klog.KObj(schedulingCtx)) + } + _, err = clientset.ResourceV1alpha2().PodSchedulingContexts(schedulingCtx.Namespace).Create(ctx, schedulingCtx, metav1.CreateOptions{}) } if err != nil { return err } - d.schedulingCtx = schedulingCtx - d.podSchedulingDirty = false + p.potentialNodes = nil + p.selectedNode = nil return nil } -// storePodSchedulingContext replaces the pod schedulingCtx object in the state. -func (d *stateData) storePodSchedulingContexts(schedulingCtx *resourcev1alpha2.PodSchedulingContext) { - d.mutex.Lock() - defer d.mutex.Unlock() - - d.schedulingCtx = schedulingCtx - d.podSchedulingDirty = true -} - func statusForClaim(schedulingCtx *resourcev1alpha2.PodSchedulingContext, podClaimName string) *resourcev1alpha2.ResourceClaimSchedulingStatus { + if schedulingCtx == nil { + return nil + } for _, status := range schedulingCtx.Status.ResourceClaims { if status.Name == podClaimName { return &status @@ -564,6 +577,11 @@ func (pl *dynamicResources) PreFilter(ctx context.Context, state *framework.Cycl return nil, framework.NewStatus(framework.Skip) } + // Fetch s.podSchedulingState.schedulingCtx, it's going to be needed when checking claims. + if err := s.podSchedulingState.init(ctx, pod, pl.podSchedulingContextLister); err != nil { + return nil, statusError(logger, err) + } + s.informationsForClaim = make([]informationForClaim, len(claims)) for index, claim := range claims { if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeImmediate && @@ -614,11 +632,7 @@ func (pl *dynamicResources) PreFilter(ctx context.Context, state *framework.Cycl s.informationsForClaim[index].availableOnNode = selector } // Now we need information from drivers. - schedulingCtx, err := s.initializePodSchedulingContexts(ctx, pod, pl.podSchedulingContextLister) - if err != nil { - return nil, statusError(logger, err) - } - s.informationsForClaim[index].status = statusForClaim(schedulingCtx, pod.Spec.ResourceClaims[index].Name) + s.informationsForClaim[index].status = statusForClaim(s.podSchedulingState.schedulingCtx, pod.Spec.ResourceClaims[index].Name) } } @@ -772,64 +786,71 @@ func (pl *dynamicResources) PreScore(ctx context.Context, cs *framework.CycleSta if err != nil { return statusError(klog.FromContext(ctx), err) } + defer func() { + state.preScored = true + }() if len(state.claims) == 0 { return nil } logger := klog.FromContext(ctx) - schedulingCtx, err := state.initializePodSchedulingContexts(ctx, pod, pl.podSchedulingContextLister) - if err != nil { - return statusError(logger, err) - } pending := false for _, claim := range state.claims { if claim.Status.Allocation == nil { pending = true } } - if pending && !haveAllNodes(schedulingCtx.Spec.PotentialNodes, nodes) { - // Remember the potential nodes. 
The object will get created or - // updated in Reserve. This is both an optimization and - // covers the case that PreScore doesn't get called when there - // is only a single node. - logger.V(5).Info("remembering potential nodes", "pod", klog.KObj(pod), "potentialnodes", klog.KObjSlice(nodes)) - schedulingCtx = schedulingCtx.DeepCopy() - numNodes := len(nodes) - if numNodes > resourcev1alpha2.PodSchedulingNodeListMaxSize { - numNodes = resourcev1alpha2.PodSchedulingNodeListMaxSize - } - schedulingCtx.Spec.PotentialNodes = make([]string, 0, numNodes) - if numNodes == len(nodes) { - // Copy all node names. - for _, node := range nodes { - schedulingCtx.Spec.PotentialNodes = append(schedulingCtx.Spec.PotentialNodes, node.Name) - } - } else { - // Select a random subset of the nodes to comply with - // the PotentialNodes length limit. Randomization is - // done for us by Go which iterates over map entries - // randomly. - nodeNames := map[string]struct{}{} - for _, node := range nodes { - nodeNames[node.Name] = struct{}{} - } - for nodeName := range nodeNames { - if len(schedulingCtx.Spec.PotentialNodes) >= resourcev1alpha2.PodSchedulingNodeListMaxSize { - break - } - schedulingCtx.Spec.PotentialNodes = append(schedulingCtx.Spec.PotentialNodes, nodeName) - } - } - sort.Strings(schedulingCtx.Spec.PotentialNodes) - state.storePodSchedulingContexts(schedulingCtx) + if !pending { + logger.V(5).Info("no pending claims", "pod", klog.KObj(pod)) + return nil } - logger.V(5).Info("all potential nodes already set", "pod", klog.KObj(pod), "potentialnodes", klog.KObjSlice(nodes)) + if haveAllPotentialNodes(state.podSchedulingState.schedulingCtx, nodes) { + logger.V(5).Info("all potential nodes already set", "pod", klog.KObj(pod), "potentialnodes", klog.KObjSlice(nodes)) + return nil + } + + // Remember the potential nodes. The object will get created or + // updated in Reserve. This is both an optimization and + // covers the case that PreScore doesn't get called when there + // is only a single node. + logger.V(5).Info("remembering potential nodes", "pod", klog.KObj(pod), "potentialnodes", klog.KObjSlice(nodes)) + numNodes := len(nodes) + if numNodes > resourcev1alpha2.PodSchedulingNodeListMaxSize { + numNodes = resourcev1alpha2.PodSchedulingNodeListMaxSize + } + potentialNodes := make([]string, 0, numNodes) + if numNodes == len(nodes) { + // Copy all node names. + for _, node := range nodes { + potentialNodes = append(potentialNodes, node.Name) + } + } else { + // Select a random subset of the nodes to comply with + // the PotentialNodes length limit. Randomization is + // done for us by Go which iterates over map entries + // randomly. 
+ nodeNames := map[string]struct{}{} + for _, node := range nodes { + nodeNames[node.Name] = struct{}{} + } + for nodeName := range nodeNames { + if len(potentialNodes) >= resourcev1alpha2.PodSchedulingNodeListMaxSize { + break + } + potentialNodes = append(potentialNodes, nodeName) + } + } + sort.Strings(potentialNodes) + state.podSchedulingState.potentialNodes = &potentialNodes return nil } -func haveAllNodes(nodeNames []string, nodes []*v1.Node) bool { +func haveAllPotentialNodes(schedulingCtx *resourcev1alpha2.PodSchedulingContext, nodes []*v1.Node) bool { + if schedulingCtx == nil { + return false + } for _, node := range nodes { - if !haveNode(nodeNames, node.Name) { + if !haveNode(schedulingCtx.Spec.PotentialNodes, node.Name) { return false } } @@ -861,10 +882,6 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat numDelayedAllocationPending := 0 numClaimsWithStatusInfo := 0 logger := klog.FromContext(ctx) - schedulingCtx, err := state.initializePodSchedulingContexts(ctx, pod, pl.podSchedulingContextLister) - if err != nil { - return statusError(logger, err) - } for index, claim := range state.claims { if claim.Status.Allocation != nil { // Allocated, but perhaps not reserved yet. @@ -894,7 +911,7 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat // Did the driver provide information that steered node // selection towards a node that it can support? - if statusForClaim(schedulingCtx, pod.Spec.ResourceClaims[index].Name) != nil { + if statusForClaim(state.podSchedulingState.schedulingCtx, pod.Spec.ResourceClaims[index].Name) != nil { numClaimsWithStatusInfo++ } } @@ -905,16 +922,19 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat return nil } - podSchedulingDirty := state.podSchedulingDirty - if len(schedulingCtx.Spec.PotentialNodes) == 0 { - // PreScore was not called, probably because there was - // only one candidate. We need to ask whether that - // node is suitable, otherwise the scheduler will pick - // it forever even when it cannot satisfy the claim. - schedulingCtx = schedulingCtx.DeepCopy() - schedulingCtx.Spec.PotentialNodes = []string{nodeName} - logger.V(5).Info("asking for information about single potential node", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}) - podSchedulingDirty = true + if !state.preScored { + // There was only one candidate that passed the Filters and + // therefore PreScore was not called. + // + // We need to ask whether that node is suitable, otherwise the + // scheduler will pick it forever even when it cannot satisfy + // the claim. + if state.podSchedulingState.schedulingCtx == nil || + !containsNode(state.podSchedulingState.schedulingCtx.Spec.PotentialNodes, nodeName) { + potentialNodes := []string{nodeName} + state.podSchedulingState.potentialNodes = &potentialNodes + logger.V(5).Info("asking for information about single potential node", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}) + } } // When there is only one pending resource, we can go ahead with @@ -922,26 +942,26 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat // the driver yet. Otherwise we wait for information before blindly // making a decision that might have to be reversed later. 
if numDelayedAllocationPending == 1 || numClaimsWithStatusInfo == numDelayedAllocationPending { - schedulingCtx = schedulingCtx.DeepCopy() // TODO: can we increase the chance that the scheduler picks // the same node as before when allocation is on-going, // assuming that that node still fits the pod? Picking a // different node may lead to some claims being allocated for // one node and others for another, which then would have to be // resolved with deallocation. - schedulingCtx.Spec.SelectedNode = nodeName - logger.V(5).Info("start allocation", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}) - if err := state.publishPodSchedulingContexts(ctx, pl.clientset, schedulingCtx); err != nil { - return statusError(logger, err) + if state.podSchedulingState.schedulingCtx == nil || + state.podSchedulingState.schedulingCtx.Spec.SelectedNode != nodeName { + state.podSchedulingState.selectedNode = &nodeName + logger.V(5).Info("start allocation", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}) + if err := state.podSchedulingState.publish(ctx, pod, pl.clientset); err != nil { + return statusError(logger, err) + } + return statusUnschedulable(logger, "waiting for resource driver to allocate resource", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}) } - return statusUnschedulable(logger, "waiting for resource driver to allocate resource", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}) } // May have been modified earlier in PreScore or above. - if podSchedulingDirty { - if err := state.publishPodSchedulingContexts(ctx, pl.clientset, schedulingCtx); err != nil { - return statusError(logger, err) - } + if err := state.podSchedulingState.publish(ctx, pod, pl.clientset); err != nil { + return statusError(logger, err) } // More than one pending claim and not enough information about all of them. @@ -954,6 +974,15 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat return statusUnschedulable(logger, "waiting for resource driver to provide information", "pod", klog.KObj(pod)) } +func containsNode(hay []string, needle string) bool { + for _, node := range hay { + if node == needle { + return true + } + } + return false +} + // Unreserve clears the ReservedFor field for all claims. // It's idempotent, and does nothing if no state found for the given pod. func (pl *dynamicResources) Unreserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) { diff --git a/pkg/scheduler/testing/wrappers.go b/pkg/scheduler/testing/wrappers.go index a3ebfbcaeba..8d94118df10 100644 --- a/pkg/scheduler/testing/wrappers.go +++ b/pkg/scheduler/testing/wrappers.go @@ -967,11 +967,12 @@ func (wrapper *PodSchedulingWrapper) Namespace(s string) *PodSchedulingWrapper { func (wrapper *PodSchedulingWrapper) OwnerReference(name, uid string, gvk schema.GroupVersionKind) *PodSchedulingWrapper { wrapper.OwnerReferences = []metav1.OwnerReference{ { - APIVersion: gvk.GroupVersion().String(), - Kind: gvk.Kind, - Name: name, - UID: types.UID(uid), - Controller: pointer.Bool(true), + APIVersion: gvk.GroupVersion().String(), + Kind: gvk.Kind, + Name: name, + UID: types.UID(uid), + Controller: pointer.Bool(true), + BlockOwnerDeletion: pointer.Bool(true), }, } return wrapper
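
The refactoring above replaces the locally mutated DeepCopy of the PodSchedulingContext with a small podSchedulingState whose pointer fields record pending spec changes: nil means "leave the field alone", non-nil means "write this value on the next publish". publish() only talks to the API server when isDirty() reports a pending change and clears the pointers afterwards, so a later call becomes a no-op. Below is a minimal, self-contained sketch of that pattern with simplified stand-in types and a stubbed write function in place of the real create-or-update client calls; it is an illustration of the bookkeeping, not the plugin's actual code.

package main

import (
	"context"
	"fmt"
)

// spec mirrors the two PodSchedulingContext spec fields the plugin cares
// about; everything in this sketch is a simplified stand-in, not the real
// resource.k8s.io API type.
type spec struct {
	SelectedNode   string
	PotentialNodes []string
}

// schedulingState records pending changes as pointers: nil means "leave the
// field alone", non-nil means "write this value".
type schedulingState struct {
	current        *spec     // last object seen in the API server, if any
	selectedNode   *string   // pending SelectedNode update
	potentialNodes *[]string // pending PotentialNodes update
}

func (s *schedulingState) isDirty() bool {
	return s.selectedNode != nil || s.potentialNodes != nil
}

// publish applies the pending changes via write (a stand-in for the
// create-or-update client call) and clears them afterwards.
func (s *schedulingState) publish(ctx context.Context, write func(context.Context, spec) error) error {
	if !s.isDirty() {
		return nil
	}
	updated := spec{}
	if s.current != nil {
		updated = *s.current
	}
	if s.selectedNode != nil {
		updated.SelectedNode = *s.selectedNode
	}
	if s.potentialNodes != nil {
		updated.PotentialNodes = *s.potentialNodes
	}
	if err := write(ctx, updated); err != nil {
		return err
	}
	s.current = &updated
	s.selectedNode = nil
	s.potentialNodes = nil
	return nil
}

func main() {
	state := schedulingState{}
	node := "node-1"
	state.selectedNode = &node

	write := func(_ context.Context, s spec) error {
		fmt.Printf("writing spec: %+v\n", s)
		return nil
	}
	_ = state.publish(context.Background(), write)
	// The second publish is a no-op because the pending fields were cleared.
	_ = state.publish(context.Background(), write)
}

Keeping the pending changes separate from the cached object is what lets the patch drop the old storePodSchedulingContexts/podSchedulingDirty pair and the DeepCopy dance in PreScore and Reserve.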
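
In the new PreScore path, the candidate list is truncated to resourcev1alpha2.PodSchedulingNodeListMaxSize by copying the node names into a map and taking whatever Go's randomized map iteration yields first, then sorting for a stable spec. The following standalone sketch reproduces just that selection step; the helper name and the limit passed in main are illustrative, not part of the patch.

package main

import (
	"fmt"
	"sort"
)

// truncateNodeNames returns at most maxSize node names. When the input is
// larger than the limit, a random subset is picked by iterating over a map,
// whose iteration order Go intentionally randomizes; the result is sorted so
// that the published spec stays stable for a given subset.
func truncateNodeNames(nodes []string, maxSize int) []string {
	if len(nodes) <= maxSize {
		out := make([]string, len(nodes))
		copy(out, nodes)
		sort.Strings(out)
		return out
	}
	set := make(map[string]struct{}, len(nodes))
	for _, name := range nodes {
		set[name] = struct{}{}
	}
	out := make([]string, 0, maxSize)
	for name := range set {
		if len(out) >= maxSize {
			break
		}
		out = append(out, name)
	}
	sort.Strings(out)
	return out
}

func main() {
	nodes := []string{"node-3", "node-1", "node-2", "node-5", "node-4"}
	// Three names, chosen at random, returned in sorted order.
	fmt.Println(truncateNodeNames(nodes, 3))
}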
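
containsNode, added for the Reserve checks, is a plain linear search over the potential-node names. On Go 1.21 or newer the standard library's slices.Contains performs the same check; the snippet below shows that equivalence purely as a possible simplification, not as something the patch uses.

package main

import (
	"fmt"
	"slices"
)

func main() {
	potentialNodes := []string{"node-a", "node-b"}
	// Equivalent to the handwritten containsNode(potentialNodes, "node-b").
	fmt.Println(slices.Contains(potentialNodes, "node-b")) // true
	fmt.Println(slices.Contains(potentialNodes, "node-c")) // false
}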
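
The wrappers.go hunk follows from the plugin now building its owner reference with metav1.NewControllerRef, which sets BlockOwnerDeletion in addition to Controller, so the test wrapper has to produce the same shape. A short sketch of what that helper returns, assuming current k8s.io/apimachinery behavior; the pod constructed in main is a made-up example.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "my-pod",
			Namespace: "default",
			UID:       "1234",
		},
	}

	// NewControllerRef fills in APIVersion/Kind from the GVK and marks the
	// reference as the controlling owner; it also sets BlockOwnerDeletion,
	// which is why the test wrapper now sets that field explicitly.
	ref := metav1.NewControllerRef(pod, schema.GroupVersionKind{Version: "v1", Kind: "Pod"})
	fmt.Printf("controller=%v blockOwnerDeletion=%v apiVersion=%q kind=%q\n",
		*ref.Controller, *ref.BlockOwnerDeletion, ref.APIVersion, ref.Kind)
}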