From 5c7dac2d772e37ce1c805265c5ffe9282bcd51be Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Wed, 30 Aug 2023 09:13:31 +0200 Subject: [PATCH] dra scheduler: refactor PodSchedulingContext updates Instead of modifying the PodSchedulingContext and then creating or updating it, now the required changes (selected node, potential nodes) are tracked and the actual input for an API call is created if (and only if) needed at the end. This makes the code easier to read and change. In particular, replacing the Update call with Patch or Apply is easy. --- .../dynamicresources/dynamicresources.go | 315 ++++++++++-------- pkg/scheduler/testing/wrappers.go | 11 +- 2 files changed, 178 insertions(+), 148 deletions(-) diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 41e80b4ce24..2accbceb920 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -31,6 +31,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes" resourcev1alpha2listers "k8s.io/client-go/listers/resource/v1alpha2" @@ -54,6 +55,9 @@ const ( // framework.CycleState, in the later phases we don't need to call Write method // to update the value type stateData struct { + // preScored is true if PreScore was invoked. + preScored bool + // A copy of all claims for the Pod (i.e. 1:1 match with // pod.Spec.ResourceClaims), initially with the status from the start // of the scheduling cycle. Each claim instance is read-only because it @@ -72,17 +76,9 @@ type stateData struct { // protected by the mutex. Used by PostFilter. unavailableClaims sets.Int - // A pointer to the PodSchedulingContext object for the pod, if one exists. - // Gets set on demand. - // - // Conceptually, this object belongs into the scheduler framework - // where it might get shared by different plugins. But in practice, - // it is currently only used by dynamic provisioning and thus - // managed entirely here. - schedulingCtx *resourcev1alpha2.PodSchedulingContext - - // podSchedulingDirty is true if the current copy was locally modified. - podSchedulingDirty bool + // podSchedulingState keeps track of the PodSchedulingContext + // (if one exists) and the changes made to it. + podSchedulingState podSchedulingState mutex sync.Mutex @@ -123,91 +119,108 @@ func (d *stateData) updateClaimStatus(ctx context.Context, clientset kubernetes. return nil } -// initializePodSchedulingContext can be called concurrently. It returns an existing PodSchedulingContext -// object if there is one already, retrieves one if not, or as a last resort creates -// one from scratch. -func (d *stateData) initializePodSchedulingContexts(ctx context.Context, pod *v1.Pod, podSchedulingContextLister resourcev1alpha2listers.PodSchedulingContextLister) (*resourcev1alpha2.PodSchedulingContext, error) { - // TODO (#113701): check if this mutex locking can be avoided by calling initializePodSchedulingContext during PreFilter. - d.mutex.Lock() - defer d.mutex.Unlock() +type podSchedulingState struct { + // A pointer to the PodSchedulingContext object for the pod, if one exists + // in the API server. 
+ // + // Conceptually, this object belongs into the scheduler framework + // where it might get shared by different plugins. But in practice, + // it is currently only used by dynamic provisioning and thus + // managed entirely here. + schedulingCtx *resourcev1alpha2.PodSchedulingContext - if d.schedulingCtx != nil { - return d.schedulingCtx, nil - } + // selectedNode is set if (and only if) a node has been selected. + selectedNode *string + // potentialNodes is set if (and only if) the potential nodes field + // needs to be updated or set. + potentialNodes *[]string +} + +func (p *podSchedulingState) isDirty() bool { + return p.selectedNode != nil || + p.potentialNodes != nil +} + +// init checks whether there is already a PodSchedulingContext object. +// Must not be called concurrently, +func (p *podSchedulingState) init(ctx context.Context, pod *v1.Pod, podSchedulingContextLister resourcev1alpha2listers.PodSchedulingContextLister) error { schedulingCtx, err := podSchedulingContextLister.PodSchedulingContexts(pod.Namespace).Get(pod.Name) switch { case apierrors.IsNotFound(err): - controller := true - schedulingCtx = &resourcev1alpha2.PodSchedulingContext{ - ObjectMeta: metav1.ObjectMeta{ - Name: pod.Name, - Namespace: pod.Namespace, - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: "v1", - Kind: "Pod", - Name: pod.Name, - UID: pod.UID, - Controller: &controller, - }, - }, - }, - } - err = nil + return nil case err != nil: - return nil, err + return err default: // We have an object, but it might be obsolete. if !metav1.IsControlledBy(schedulingCtx, pod) { - return nil, fmt.Errorf("PodSchedulingContext object with UID %s is not owned by Pod %s/%s", schedulingCtx.UID, pod.Namespace, pod.Name) + return fmt.Errorf("PodSchedulingContext object with UID %s is not owned by Pod %s/%s", schedulingCtx.UID, pod.Namespace, pod.Name) } } - d.schedulingCtx = schedulingCtx - return schedulingCtx, err + p.schedulingCtx = schedulingCtx + return nil } -// publishPodSchedulingContext creates or updates the PodSchedulingContext object. -func (d *stateData) publishPodSchedulingContexts(ctx context.Context, clientset kubernetes.Interface, schedulingCtx *resourcev1alpha2.PodSchedulingContext) error { - d.mutex.Lock() - defer d.mutex.Unlock() +// publish creates or updates the PodSchedulingContext object, if necessary. +// Must not be called concurrently. +func (p *podSchedulingState) publish(ctx context.Context, pod *v1.Pod, clientset kubernetes.Interface) error { + if !p.isDirty() { + return nil + } var err error logger := klog.FromContext(ctx) - msg := "Updating PodSchedulingContext" - if schedulingCtx.UID == "" { - msg = "Creating PodSchedulingContext" - } - if loggerV := logger.V(6); loggerV.Enabled() { - // At a high enough log level, dump the entire object. - loggerV.Info(msg, "podSchedulingCtxDump", klog.Format(schedulingCtx)) + if p.schedulingCtx != nil { + // Update it. + schedulingCtx := p.schedulingCtx.DeepCopy() + if p.selectedNode != nil { + schedulingCtx.Spec.SelectedNode = *p.selectedNode + } + if p.potentialNodes != nil { + schedulingCtx.Spec.PotentialNodes = *p.potentialNodes + } + if loggerV := logger.V(6); loggerV.Enabled() { + // At a high enough log level, dump the entire object. 
+ loggerV.Info("Updating PodSchedulingContext", "podSchedulingCtx", klog.KObj(schedulingCtx), "podSchedulingCtxObject", klog.Format(schedulingCtx)) + } else { + logger.V(5).Info("Updating PodSchedulingContext", "podSchedulingCtx", klog.KObj(schedulingCtx)) + } + _, err = clientset.ResourceV1alpha2().PodSchedulingContexts(schedulingCtx.Namespace).Update(ctx, schedulingCtx, metav1.UpdateOptions{}) } else { - logger.V(5).Info(msg, "podSchedulingCtx", klog.KObj(schedulingCtx)) - } - if schedulingCtx.UID == "" { - schedulingCtx, err = clientset.ResourceV1alpha2().PodSchedulingContexts(schedulingCtx.Namespace).Create(ctx, schedulingCtx, metav1.CreateOptions{}) - } else { - // TODO (#113700): patch here to avoid racing with drivers which update the status. - schedulingCtx, err = clientset.ResourceV1alpha2().PodSchedulingContexts(schedulingCtx.Namespace).Update(ctx, schedulingCtx, metav1.UpdateOptions{}) + // Create it. + schedulingCtx := &resourcev1alpha2.PodSchedulingContext{ + ObjectMeta: metav1.ObjectMeta{ + Name: pod.Name, + Namespace: pod.Namespace, + OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(pod, schema.GroupVersionKind{Version: "v1", Kind: "Pod"})}, + }, + } + if p.selectedNode != nil { + schedulingCtx.Spec.SelectedNode = *p.selectedNode + } + if p.potentialNodes != nil { + schedulingCtx.Spec.PotentialNodes = *p.potentialNodes + } + if loggerV := logger.V(6); loggerV.Enabled() { + // At a high enough log level, dump the entire object. + loggerV.Info("Creating PodSchedulingContext", "podSchedulingCtx", klog.KObj(schedulingCtx), "podSchedulingCtxObject", klog.Format(schedulingCtx)) + } else { + logger.V(5).Info("Creating PodSchedulingContext", "podSchedulingCtx", klog.KObj(schedulingCtx)) + } + _, err = clientset.ResourceV1alpha2().PodSchedulingContexts(schedulingCtx.Namespace).Create(ctx, schedulingCtx, metav1.CreateOptions{}) } if err != nil { return err } - d.schedulingCtx = schedulingCtx - d.podSchedulingDirty = false + p.potentialNodes = nil + p.selectedNode = nil return nil } -// storePodSchedulingContext replaces the pod schedulingCtx object in the state. -func (d *stateData) storePodSchedulingContexts(schedulingCtx *resourcev1alpha2.PodSchedulingContext) { - d.mutex.Lock() - defer d.mutex.Unlock() - - d.schedulingCtx = schedulingCtx - d.podSchedulingDirty = true -} - func statusForClaim(schedulingCtx *resourcev1alpha2.PodSchedulingContext, podClaimName string) *resourcev1alpha2.ResourceClaimSchedulingStatus { + if schedulingCtx == nil { + return nil + } for _, status := range schedulingCtx.Status.ResourceClaims { if status.Name == podClaimName { return &status @@ -562,6 +575,11 @@ func (pl *dynamicResources) PreFilter(ctx context.Context, state *framework.Cycl return nil, framework.NewStatus(framework.Skip) } + // Fetch s.podSchedulingState.schedulingCtx, it's going to be needed when checking claims. + if err := s.podSchedulingState.init(ctx, pod, pl.podSchedulingContextLister); err != nil { + return nil, statusError(logger, err) + } + s.informationsForClaim = make([]informationForClaim, len(claims)) for index, claim := range claims { if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeImmediate && @@ -606,11 +624,7 @@ func (pl *dynamicResources) PreFilter(ctx context.Context, state *framework.Cycl s.informationsForClaim[index].availableOnNode = selector } // Now we need information from drivers. 
- schedulingCtx, err := s.initializePodSchedulingContexts(ctx, pod, pl.podSchedulingContextLister) - if err != nil { - return nil, statusError(logger, err) - } - s.informationsForClaim[index].status = statusForClaim(schedulingCtx, pod.Spec.ResourceClaims[index].Name) + s.informationsForClaim[index].status = statusForClaim(s.podSchedulingState.schedulingCtx, pod.Spec.ResourceClaims[index].Name) } } @@ -764,64 +778,71 @@ func (pl *dynamicResources) PreScore(ctx context.Context, cs *framework.CycleSta if err != nil { return statusError(klog.FromContext(ctx), err) } + defer func() { + state.preScored = true + }() if len(state.claims) == 0 { return nil } logger := klog.FromContext(ctx) - schedulingCtx, err := state.initializePodSchedulingContexts(ctx, pod, pl.podSchedulingContextLister) - if err != nil { - return statusError(logger, err) - } pending := false for _, claim := range state.claims { if claim.Status.Allocation == nil { pending = true } } - if pending && !haveAllNodes(schedulingCtx.Spec.PotentialNodes, nodes) { - // Remember the potential nodes. The object will get created or - // updated in Reserve. This is both an optimization and - // covers the case that PreScore doesn't get called when there - // is only a single node. - logger.V(5).Info("remembering potential nodes", "pod", klog.KObj(pod), "potentialnodes", klog.KObjSlice(nodes)) - schedulingCtx = schedulingCtx.DeepCopy() - numNodes := len(nodes) - if numNodes > resourcev1alpha2.PodSchedulingNodeListMaxSize { - numNodes = resourcev1alpha2.PodSchedulingNodeListMaxSize - } - schedulingCtx.Spec.PotentialNodes = make([]string, 0, numNodes) - if numNodes == len(nodes) { - // Copy all node names. - for _, node := range nodes { - schedulingCtx.Spec.PotentialNodes = append(schedulingCtx.Spec.PotentialNodes, node.Name) - } - } else { - // Select a random subset of the nodes to comply with - // the PotentialNodes length limit. Randomization is - // done for us by Go which iterates over map entries - // randomly. - nodeNames := map[string]struct{}{} - for _, node := range nodes { - nodeNames[node.Name] = struct{}{} - } - for nodeName := range nodeNames { - if len(schedulingCtx.Spec.PotentialNodes) >= resourcev1alpha2.PodSchedulingNodeListMaxSize { - break - } - schedulingCtx.Spec.PotentialNodes = append(schedulingCtx.Spec.PotentialNodes, nodeName) - } - } - sort.Strings(schedulingCtx.Spec.PotentialNodes) - state.storePodSchedulingContexts(schedulingCtx) + if !pending { + logger.V(5).Info("no pending claims", "pod", klog.KObj(pod)) + return nil } - logger.V(5).Info("all potential nodes already set", "pod", klog.KObj(pod), "potentialnodes", klog.KObjSlice(nodes)) + if haveAllPotentialNodes(state.podSchedulingState.schedulingCtx, nodes) { + logger.V(5).Info("all potential nodes already set", "pod", klog.KObj(pod), "potentialnodes", klog.KObjSlice(nodes)) + return nil + } + + // Remember the potential nodes. The object will get created or + // updated in Reserve. This is both an optimization and + // covers the case that PreScore doesn't get called when there + // is only a single node. + logger.V(5).Info("remembering potential nodes", "pod", klog.KObj(pod), "potentialnodes", klog.KObjSlice(nodes)) + numNodes := len(nodes) + if numNodes > resourcev1alpha2.PodSchedulingNodeListMaxSize { + numNodes = resourcev1alpha2.PodSchedulingNodeListMaxSize + } + potentialNodes := make([]string, 0, numNodes) + if numNodes == len(nodes) { + // Copy all node names. 
+ for _, node := range nodes { + potentialNodes = append(potentialNodes, node.Name) + } + } else { + // Select a random subset of the nodes to comply with + // the PotentialNodes length limit. Randomization is + // done for us by Go which iterates over map entries + // randomly. + nodeNames := map[string]struct{}{} + for _, node := range nodes { + nodeNames[node.Name] = struct{}{} + } + for nodeName := range nodeNames { + if len(potentialNodes) >= resourcev1alpha2.PodSchedulingNodeListMaxSize { + break + } + potentialNodes = append(potentialNodes, nodeName) + } + } + sort.Strings(potentialNodes) + state.podSchedulingState.potentialNodes = &potentialNodes return nil } -func haveAllNodes(nodeNames []string, nodes []*v1.Node) bool { +func haveAllPotentialNodes(schedulingCtx *resourcev1alpha2.PodSchedulingContext, nodes []*v1.Node) bool { + if schedulingCtx == nil { + return false + } for _, node := range nodes { - if !haveNode(nodeNames, node.Name) { + if !haveNode(schedulingCtx.Spec.PotentialNodes, node.Name) { return false } } @@ -853,10 +874,6 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat numDelayedAllocationPending := 0 numClaimsWithStatusInfo := 0 logger := klog.FromContext(ctx) - schedulingCtx, err := state.initializePodSchedulingContexts(ctx, pod, pl.podSchedulingContextLister) - if err != nil { - return statusError(logger, err) - } for index, claim := range state.claims { if claim.Status.Allocation != nil { // Allocated, but perhaps not reserved yet. @@ -886,7 +903,7 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat // Did the driver provide information that steered node // selection towards a node that it can support? - if statusForClaim(schedulingCtx, pod.Spec.ResourceClaims[index].Name) != nil { + if statusForClaim(state.podSchedulingState.schedulingCtx, pod.Spec.ResourceClaims[index].Name) != nil { numClaimsWithStatusInfo++ } } @@ -897,16 +914,19 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat return nil } - podSchedulingDirty := state.podSchedulingDirty - if len(schedulingCtx.Spec.PotentialNodes) == 0 { - // PreScore was not called, probably because there was - // only one candidate. We need to ask whether that - // node is suitable, otherwise the scheduler will pick - // it forever even when it cannot satisfy the claim. - schedulingCtx = schedulingCtx.DeepCopy() - schedulingCtx.Spec.PotentialNodes = []string{nodeName} - logger.V(5).Info("asking for information about single potential node", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}) - podSchedulingDirty = true + if !state.preScored { + // There was only one candidate that passed the Filters and + // therefore PreScore was not called. + // + // We need to ask whether that node is suitable, otherwise the + // scheduler will pick it forever even when it cannot satisfy + // the claim. + if state.podSchedulingState.schedulingCtx == nil || + !containsNode(state.podSchedulingState.schedulingCtx.Spec.PotentialNodes, nodeName) { + potentialNodes := []string{nodeName} + state.podSchedulingState.potentialNodes = &potentialNodes + logger.V(5).Info("asking for information about single potential node", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}) + } } // When there is only one pending resource, we can go ahead with @@ -914,26 +934,26 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat // the driver yet. 
Otherwise we wait for information before blindly // making a decision that might have to be reversed later. if numDelayedAllocationPending == 1 || numClaimsWithStatusInfo == numDelayedAllocationPending { - schedulingCtx = schedulingCtx.DeepCopy() // TODO: can we increase the chance that the scheduler picks // the same node as before when allocation is on-going, // assuming that that node still fits the pod? Picking a // different node may lead to some claims being allocated for // one node and others for another, which then would have to be // resolved with deallocation. - schedulingCtx.Spec.SelectedNode = nodeName - logger.V(5).Info("start allocation", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}) - if err := state.publishPodSchedulingContexts(ctx, pl.clientset, schedulingCtx); err != nil { - return statusError(logger, err) + if state.podSchedulingState.schedulingCtx == nil || + state.podSchedulingState.schedulingCtx.Spec.SelectedNode != nodeName { + state.podSchedulingState.selectedNode = &nodeName + logger.V(5).Info("start allocation", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}) + if err := state.podSchedulingState.publish(ctx, pod, pl.clientset); err != nil { + return statusError(logger, err) + } + return statusUnschedulable(logger, "waiting for resource driver to allocate resource", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}) } - return statusUnschedulable(logger, "waiting for resource driver to allocate resource", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}) } // May have been modified earlier in PreScore or above. - if podSchedulingDirty { - if err := state.publishPodSchedulingContexts(ctx, pl.clientset, schedulingCtx); err != nil { - return statusError(logger, err) - } + if err := state.podSchedulingState.publish(ctx, pod, pl.clientset); err != nil { + return statusError(logger, err) } // More than one pending claim and not enough information about all of them. @@ -946,6 +966,15 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat return statusUnschedulable(logger, "waiting for resource driver to provide information", "pod", klog.KObj(pod)) } +func containsNode(hay []string, needle string) bool { + for _, node := range hay { + if node == needle { + return true + } + } + return false +} + // Unreserve clears the ReservedFor field for all claims. // It's idempotent, and does nothing if no state found for the given pod. func (pl *dynamicResources) Unreserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) { diff --git a/pkg/scheduler/testing/wrappers.go b/pkg/scheduler/testing/wrappers.go index a3ebfbcaeba..8d94118df10 100644 --- a/pkg/scheduler/testing/wrappers.go +++ b/pkg/scheduler/testing/wrappers.go @@ -967,11 +967,12 @@ func (wrapper *PodSchedulingWrapper) Namespace(s string) *PodSchedulingWrapper { func (wrapper *PodSchedulingWrapper) OwnerReference(name, uid string, gvk schema.GroupVersionKind) *PodSchedulingWrapper { wrapper.OwnerReferences = []metav1.OwnerReference{ { - APIVersion: gvk.GroupVersion().String(), - Kind: gvk.Kind, - Name: name, - UID: types.UID(uid), - Controller: pointer.Bool(true), + APIVersion: gvk.GroupVersion().String(), + Kind: gvk.Kind, + Name: name, + UID: types.UID(uid), + Controller: pointer.Bool(true), + BlockOwnerDeletion: pointer.Bool(true), }, } return wrapper
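
The commit message notes that, with the pending changes now tracked separately in podSchedulingState, replacing the Update call in publish with Patch or Apply becomes easy. As a rough illustration only (not part of this commit), a follow-up using a JSON merge patch that carries just the changed spec fields could look like the sketch below; the helper name patchPodSchedulingContext and the exact payload layout are assumptions.

package main // illustrative sketch, not plugin code

import (
	"context"
	"encoding/json"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
)

// patchPodSchedulingContext sends only the spec fields that were tracked as
// pending (selectedNode, potentialNodes) instead of writing the whole object,
// so that concurrent status updates written by resource drivers are not
// overwritten.
func patchPodSchedulingContext(ctx context.Context, clientset kubernetes.Interface,
	namespace, name string, selectedNode *string, potentialNodes *[]string) error {
	spec := map[string]interface{}{}
	if selectedNode != nil {
		spec["selectedNode"] = *selectedNode
	}
	if potentialNodes != nil {
		spec["potentialNodes"] = *potentialNodes
	}
	if len(spec) == 0 {
		// Mirrors the isDirty check above: nothing to publish.
		return nil
	}
	payload, err := json.Marshal(map[string]interface{}{"spec": spec})
	if err != nil {
		return err
	}
	_, err = clientset.ResourceV1alpha2().PodSchedulingContexts(namespace).Patch(
		ctx, name, types.MergePatchType, payload, metav1.PatchOptions{})
	return err
}

A merge patch only works against an object that already exists, so the create branch of publish would stay as it is; server-side apply would be another option and could cover both the create and update cases with a single call.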
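
The wrappers.go change follows from the plugin now building its owner reference with metav1.NewControllerRef: that helper sets both Controller and BlockOwnerDeletion to true, so the test wrapper has to produce a matching OwnerReference. A minimal standalone reproduction of what the plugin now generates (pod name and UID are placeholders):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "my-pod", Namespace: "default", UID: "1234"}}
	// NewControllerRef copies name/UID from the owner, fills in APIVersion and
	// Kind from the GVK, and sets Controller and BlockOwnerDeletion to true.
	ref := metav1.NewControllerRef(pod, schema.GroupVersionKind{Version: "v1", Kind: "Pod"})
	fmt.Printf("%s %s controller=%t blockOwnerDeletion=%t\n",
		ref.APIVersion, ref.Kind, *ref.Controller, *ref.BlockOwnerDeletion)
}

The point is only that BlockOwnerDeletion comes along for free with NewControllerRef, which is why the expected OwnerReference in the test wrapper gains pointer.Bool(true) for that field.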