dra scheduler: refactor PodSchedulingContext updates

Instead of modifying the PodSchedulingContext and then creating or updating it,
the required changes (selected node, potential nodes) are now tracked, and the
actual input for an API call is created at the end, if (and only if) it is needed.

This makes the code easier to read and change. In particular, replacing the
Update call with Patch or Apply is easy.
Patrick Ohly 2023-08-30 09:13:31 +02:00
parent 6eca142082
commit 5c7dac2d77
2 changed files with 178 additions and 148 deletions
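
To make the commit message's Patch/Apply point concrete: once only the pending spec changes are tracked, a follow-up could build a JSON merge patch from exactly those fields instead of sending a full Update. The sketch below is illustrative and not part of this commit; the helper names buildSpecPatch and patchPodSchedulingContext are made up, while the typed client's Patch method and types.MergePatchType are standard client-go/apimachinery APIs.

// Sketch (not from this commit): publish the tracked changes with a JSON
// merge patch against an existing PodSchedulingContext.
package dynamicresources

import (
	"context"
	"encoding/json"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
)

// buildSpecPatch turns the pending changes into a merge patch that only
// touches the spec fields the scheduler owns. A JSON merge patch replaces
// the whole potentialNodes list, which is acceptable because the scheduler
// is the sole writer of that field.
func buildSpecPatch(selectedNode *string, potentialNodes *[]string) ([]byte, error) {
	spec := map[string]interface{}{}
	if selectedNode != nil {
		spec["selectedNode"] = *selectedNode
	}
	if potentialNodes != nil {
		spec["potentialNodes"] = *potentialNodes
	}
	return json.Marshal(map[string]interface{}{"spec": spec})
}

// patchPodSchedulingContext sends the patch; it never writes status and so
// avoids racing with resource drivers that update the status.
func patchPodSchedulingContext(ctx context.Context, clientset kubernetes.Interface, namespace, name string, patch []byte) error {
	_, err := clientset.ResourceV1alpha2().PodSchedulingContexts(namespace).Patch(
		ctx, name, types.MergePatchType, patch, metav1.PatchOptions{})
	return err
}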

@@ -31,6 +31,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
resourcev1alpha2listers "k8s.io/client-go/listers/resource/v1alpha2"
@@ -54,6 +55,9 @@ const (
// framework.CycleState, in the later phases we don't need to call Write method
// to update the value
type stateData struct {
// preScored is true if PreScore was invoked.
preScored bool
// A copy of all claims for the Pod (i.e. 1:1 match with
// pod.Spec.ResourceClaims), initially with the status from the start
// of the scheduling cycle. Each claim instance is read-only because it
@@ -72,17 +76,9 @@ type stateData struct {
// protected by the mutex. Used by PostFilter.
unavailableClaims sets.Int
// A pointer to the PodSchedulingContext object for the pod, if one exists.
// Gets set on demand.
//
// Conceptually, this object belongs into the scheduler framework
// where it might get shared by different plugins. But in practice,
// it is currently only used by dynamic provisioning and thus
// managed entirely here.
schedulingCtx *resourcev1alpha2.PodSchedulingContext
// podSchedulingDirty is true if the current copy was locally modified.
podSchedulingDirty bool
// podSchedulingState keeps track of the PodSchedulingContext
// (if one exists) and the changes made to it.
podSchedulingState podSchedulingState
mutex sync.Mutex
@@ -123,91 +119,108 @@ func (d *stateData) updateClaimStatus(ctx context.Context, clientset kubernetes.
return nil
}
// initializePodSchedulingContext can be called concurrently. It returns an existing PodSchedulingContext
// object if there is one already, retrieves one if not, or as a last resort creates
// one from scratch.
func (d *stateData) initializePodSchedulingContexts(ctx context.Context, pod *v1.Pod, podSchedulingContextLister resourcev1alpha2listers.PodSchedulingContextLister) (*resourcev1alpha2.PodSchedulingContext, error) {
// TODO (#113701): check if this mutex locking can be avoided by calling initializePodSchedulingContext during PreFilter.
d.mutex.Lock()
defer d.mutex.Unlock()
type podSchedulingState struct {
// A pointer to the PodSchedulingContext object for the pod, if one exists
// in the API server.
//
// Conceptually, this object belongs into the scheduler framework
// where it might get shared by different plugins. But in practice,
// it is currently only used by dynamic provisioning and thus
// managed entirely here.
schedulingCtx *resourcev1alpha2.PodSchedulingContext
if d.schedulingCtx != nil {
return d.schedulingCtx, nil
}
// selectedNode is set if (and only if) a node has been selected.
selectedNode *string
// potentialNodes is set if (and only if) the potential nodes field
// needs to be updated or set.
potentialNodes *[]string
}
func (p *podSchedulingState) isDirty() bool {
return p.selectedNode != nil ||
p.potentialNodes != nil
}
// init checks whether there is already a PodSchedulingContext object.
// Must not be called concurrently.
func (p *podSchedulingState) init(ctx context.Context, pod *v1.Pod, podSchedulingContextLister resourcev1alpha2listers.PodSchedulingContextLister) error {
schedulingCtx, err := podSchedulingContextLister.PodSchedulingContexts(pod.Namespace).Get(pod.Name)
switch {
case apierrors.IsNotFound(err):
controller := true
schedulingCtx = &resourcev1alpha2.PodSchedulingContext{
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "v1",
Kind: "Pod",
Name: pod.Name,
UID: pod.UID,
Controller: &controller,
},
},
},
}
err = nil
return nil
case err != nil:
return nil, err
return err
default:
// We have an object, but it might be obsolete.
if !metav1.IsControlledBy(schedulingCtx, pod) {
return nil, fmt.Errorf("PodSchedulingContext object with UID %s is not owned by Pod %s/%s", schedulingCtx.UID, pod.Namespace, pod.Name)
return fmt.Errorf("PodSchedulingContext object with UID %s is not owned by Pod %s/%s", schedulingCtx.UID, pod.Namespace, pod.Name)
}
}
d.schedulingCtx = schedulingCtx
return schedulingCtx, err
p.schedulingCtx = schedulingCtx
return nil
}
// publishPodSchedulingContext creates or updates the PodSchedulingContext object.
func (d *stateData) publishPodSchedulingContexts(ctx context.Context, clientset kubernetes.Interface, schedulingCtx *resourcev1alpha2.PodSchedulingContext) error {
d.mutex.Lock()
defer d.mutex.Unlock()
// publish creates or updates the PodSchedulingContext object, if necessary.
// Must not be called concurrently.
func (p *podSchedulingState) publish(ctx context.Context, pod *v1.Pod, clientset kubernetes.Interface) error {
if !p.isDirty() {
return nil
}
var err error
logger := klog.FromContext(ctx)
msg := "Updating PodSchedulingContext"
if schedulingCtx.UID == "" {
msg = "Creating PodSchedulingContext"
}
if loggerV := logger.V(6); loggerV.Enabled() {
// At a high enough log level, dump the entire object.
loggerV.Info(msg, "podSchedulingCtxDump", klog.Format(schedulingCtx))
if p.schedulingCtx != nil {
// Update it.
schedulingCtx := p.schedulingCtx.DeepCopy()
if p.selectedNode != nil {
schedulingCtx.Spec.SelectedNode = *p.selectedNode
}
if p.potentialNodes != nil {
schedulingCtx.Spec.PotentialNodes = *p.potentialNodes
}
if loggerV := logger.V(6); loggerV.Enabled() {
// At a high enough log level, dump the entire object.
loggerV.Info("Updating PodSchedulingContext", "podSchedulingCtx", klog.KObj(schedulingCtx), "podSchedulingCtxObject", klog.Format(schedulingCtx))
} else {
logger.V(5).Info("Updating PodSchedulingContext", "podSchedulingCtx", klog.KObj(schedulingCtx))
}
_, err = clientset.ResourceV1alpha2().PodSchedulingContexts(schedulingCtx.Namespace).Update(ctx, schedulingCtx, metav1.UpdateOptions{})
} else {
logger.V(5).Info(msg, "podSchedulingCtx", klog.KObj(schedulingCtx))
}
if schedulingCtx.UID == "" {
schedulingCtx, err = clientset.ResourceV1alpha2().PodSchedulingContexts(schedulingCtx.Namespace).Create(ctx, schedulingCtx, metav1.CreateOptions{})
} else {
// TODO (#113700): patch here to avoid racing with drivers which update the status.
schedulingCtx, err = clientset.ResourceV1alpha2().PodSchedulingContexts(schedulingCtx.Namespace).Update(ctx, schedulingCtx, metav1.UpdateOptions{})
// Create it.
schedulingCtx := &resourcev1alpha2.PodSchedulingContext{
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(pod, schema.GroupVersionKind{Version: "v1", Kind: "Pod"})},
},
}
if p.selectedNode != nil {
schedulingCtx.Spec.SelectedNode = *p.selectedNode
}
if p.potentialNodes != nil {
schedulingCtx.Spec.PotentialNodes = *p.potentialNodes
}
if loggerV := logger.V(6); loggerV.Enabled() {
// At a high enough log level, dump the entire object.
loggerV.Info("Creating PodSchedulingContext", "podSchedulingCtx", klog.KObj(schedulingCtx), "podSchedulingCtxObject", klog.Format(schedulingCtx))
} else {
logger.V(5).Info("Creating PodSchedulingContext", "podSchedulingCtx", klog.KObj(schedulingCtx))
}
_, err = clientset.ResourceV1alpha2().PodSchedulingContexts(schedulingCtx.Namespace).Create(ctx, schedulingCtx, metav1.CreateOptions{})
}
if err != nil {
return err
}
d.schedulingCtx = schedulingCtx
d.podSchedulingDirty = false
p.potentialNodes = nil
p.selectedNode = nil
return nil
}
// storePodSchedulingContext replaces the pod schedulingCtx object in the state.
func (d *stateData) storePodSchedulingContexts(schedulingCtx *resourcev1alpha2.PodSchedulingContext) {
d.mutex.Lock()
defer d.mutex.Unlock()
d.schedulingCtx = schedulingCtx
d.podSchedulingDirty = true
}
func statusForClaim(schedulingCtx *resourcev1alpha2.PodSchedulingContext, podClaimName string) *resourcev1alpha2.ResourceClaimSchedulingStatus {
if schedulingCtx == nil {
return nil
}
for _, status := range schedulingCtx.Status.ResourceClaims {
if status.Name == podClaimName {
return &status
@@ -562,6 +575,11 @@ func (pl *dynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
return nil, framework.NewStatus(framework.Skip)
}
// Fetch s.podSchedulingState.schedulingCtx; it is going to be needed when checking claims.
if err := s.podSchedulingState.init(ctx, pod, pl.podSchedulingContextLister); err != nil {
return nil, statusError(logger, err)
}
s.informationsForClaim = make([]informationForClaim, len(claims))
for index, claim := range claims {
if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeImmediate &&
@@ -606,11 +624,7 @@ func (pl *dynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
s.informationsForClaim[index].availableOnNode = selector
}
// Now we need information from drivers.
schedulingCtx, err := s.initializePodSchedulingContexts(ctx, pod, pl.podSchedulingContextLister)
if err != nil {
return nil, statusError(logger, err)
}
s.informationsForClaim[index].status = statusForClaim(schedulingCtx, pod.Spec.ResourceClaims[index].Name)
s.informationsForClaim[index].status = statusForClaim(s.podSchedulingState.schedulingCtx, pod.Spec.ResourceClaims[index].Name)
}
}
@@ -764,64 +778,71 @@ func (pl *dynamicResources) PreScore(ctx context.Context, cs *framework.CycleSta
if err != nil {
return statusError(klog.FromContext(ctx), err)
}
defer func() {
state.preScored = true
}()
if len(state.claims) == 0 {
return nil
}
logger := klog.FromContext(ctx)
schedulingCtx, err := state.initializePodSchedulingContexts(ctx, pod, pl.podSchedulingContextLister)
if err != nil {
return statusError(logger, err)
}
pending := false
for _, claim := range state.claims {
if claim.Status.Allocation == nil {
pending = true
}
}
if pending && !haveAllNodes(schedulingCtx.Spec.PotentialNodes, nodes) {
// Remember the potential nodes. The object will get created or
// updated in Reserve. This is both an optimization and
// covers the case that PreScore doesn't get called when there
// is only a single node.
logger.V(5).Info("remembering potential nodes", "pod", klog.KObj(pod), "potentialnodes", klog.KObjSlice(nodes))
schedulingCtx = schedulingCtx.DeepCopy()
numNodes := len(nodes)
if numNodes > resourcev1alpha2.PodSchedulingNodeListMaxSize {
numNodes = resourcev1alpha2.PodSchedulingNodeListMaxSize
}
schedulingCtx.Spec.PotentialNodes = make([]string, 0, numNodes)
if numNodes == len(nodes) {
// Copy all node names.
for _, node := range nodes {
schedulingCtx.Spec.PotentialNodes = append(schedulingCtx.Spec.PotentialNodes, node.Name)
}
} else {
// Select a random subset of the nodes to comply with
// the PotentialNodes length limit. Randomization is
// done for us by Go which iterates over map entries
// randomly.
nodeNames := map[string]struct{}{}
for _, node := range nodes {
nodeNames[node.Name] = struct{}{}
}
for nodeName := range nodeNames {
if len(schedulingCtx.Spec.PotentialNodes) >= resourcev1alpha2.PodSchedulingNodeListMaxSize {
break
}
schedulingCtx.Spec.PotentialNodes = append(schedulingCtx.Spec.PotentialNodes, nodeName)
}
}
sort.Strings(schedulingCtx.Spec.PotentialNodes)
state.storePodSchedulingContexts(schedulingCtx)
if !pending {
logger.V(5).Info("no pending claims", "pod", klog.KObj(pod))
return nil
}
logger.V(5).Info("all potential nodes already set", "pod", klog.KObj(pod), "potentialnodes", klog.KObjSlice(nodes))
if haveAllPotentialNodes(state.podSchedulingState.schedulingCtx, nodes) {
logger.V(5).Info("all potential nodes already set", "pod", klog.KObj(pod), "potentialnodes", klog.KObjSlice(nodes))
return nil
}
// Remember the potential nodes. The object will get created or
// updated in Reserve. This is both an optimization and
// covers the case that PreScore doesn't get called when there
// is only a single node.
logger.V(5).Info("remembering potential nodes", "pod", klog.KObj(pod), "potentialnodes", klog.KObjSlice(nodes))
numNodes := len(nodes)
if numNodes > resourcev1alpha2.PodSchedulingNodeListMaxSize {
numNodes = resourcev1alpha2.PodSchedulingNodeListMaxSize
}
potentialNodes := make([]string, 0, numNodes)
if numNodes == len(nodes) {
// Copy all node names.
for _, node := range nodes {
potentialNodes = append(potentialNodes, node.Name)
}
} else {
// Select a random subset of the nodes to comply with
// the PotentialNodes length limit. Randomization is
// done for us by Go which iterates over map entries
// randomly.
nodeNames := map[string]struct{}{}
for _, node := range nodes {
nodeNames[node.Name] = struct{}{}
}
for nodeName := range nodeNames {
if len(potentialNodes) >= resourcev1alpha2.PodSchedulingNodeListMaxSize {
break
}
potentialNodes = append(potentialNodes, nodeName)
}
}
sort.Strings(potentialNodes)
state.podSchedulingState.potentialNodes = &potentialNodes
return nil
}
func haveAllNodes(nodeNames []string, nodes []*v1.Node) bool {
func haveAllPotentialNodes(schedulingCtx *resourcev1alpha2.PodSchedulingContext, nodes []*v1.Node) bool {
if schedulingCtx == nil {
return false
}
for _, node := range nodes {
if !haveNode(nodeNames, node.Name) {
if !haveNode(schedulingCtx.Spec.PotentialNodes, node.Name) {
return false
}
}
@@ -853,10 +874,6 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
numDelayedAllocationPending := 0
numClaimsWithStatusInfo := 0
logger := klog.FromContext(ctx)
schedulingCtx, err := state.initializePodSchedulingContexts(ctx, pod, pl.podSchedulingContextLister)
if err != nil {
return statusError(logger, err)
}
for index, claim := range state.claims {
if claim.Status.Allocation != nil {
// Allocated, but perhaps not reserved yet.
@@ -886,7 +903,7 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
// Did the driver provide information that steered node
// selection towards a node that it can support?
if statusForClaim(schedulingCtx, pod.Spec.ResourceClaims[index].Name) != nil {
if statusForClaim(state.podSchedulingState.schedulingCtx, pod.Spec.ResourceClaims[index].Name) != nil {
numClaimsWithStatusInfo++
}
}
@@ -897,16 +914,19 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
return nil
}
podSchedulingDirty := state.podSchedulingDirty
if len(schedulingCtx.Spec.PotentialNodes) == 0 {
// PreScore was not called, probably because there was
// only one candidate. We need to ask whether that
// node is suitable, otherwise the scheduler will pick
// it forever even when it cannot satisfy the claim.
schedulingCtx = schedulingCtx.DeepCopy()
schedulingCtx.Spec.PotentialNodes = []string{nodeName}
logger.V(5).Info("asking for information about single potential node", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName})
podSchedulingDirty = true
if !state.preScored {
// There was only one candidate that passed the Filters and
// therefore PreScore was not called.
//
// We need to ask whether that node is suitable, otherwise the
// scheduler will pick it forever even when it cannot satisfy
// the claim.
if state.podSchedulingState.schedulingCtx == nil ||
!containsNode(state.podSchedulingState.schedulingCtx.Spec.PotentialNodes, nodeName) {
potentialNodes := []string{nodeName}
state.podSchedulingState.potentialNodes = &potentialNodes
logger.V(5).Info("asking for information about single potential node", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName})
}
}
// When there is only one pending resource, we can go ahead with
@@ -914,26 +934,26 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
// the driver yet. Otherwise we wait for information before blindly
// making a decision that might have to be reversed later.
if numDelayedAllocationPending == 1 || numClaimsWithStatusInfo == numDelayedAllocationPending {
schedulingCtx = schedulingCtx.DeepCopy()
// TODO: can we increase the chance that the scheduler picks
// the same node as before when allocation is on-going,
// assuming that that node still fits the pod? Picking a
// different node may lead to some claims being allocated for
// one node and others for another, which then would have to be
// resolved with deallocation.
schedulingCtx.Spec.SelectedNode = nodeName
logger.V(5).Info("start allocation", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName})
if err := state.publishPodSchedulingContexts(ctx, pl.clientset, schedulingCtx); err != nil {
return statusError(logger, err)
if state.podSchedulingState.schedulingCtx == nil ||
state.podSchedulingState.schedulingCtx.Spec.SelectedNode != nodeName {
state.podSchedulingState.selectedNode = &nodeName
logger.V(5).Info("start allocation", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName})
if err := state.podSchedulingState.publish(ctx, pod, pl.clientset); err != nil {
return statusError(logger, err)
}
return statusUnschedulable(logger, "waiting for resource driver to allocate resource", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName})
}
return statusUnschedulable(logger, "waiting for resource driver to allocate resource", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName})
}
// May have been modified earlier in PreScore or above.
if podSchedulingDirty {
if err := state.publishPodSchedulingContexts(ctx, pl.clientset, schedulingCtx); err != nil {
return statusError(logger, err)
}
if err := state.podSchedulingState.publish(ctx, pod, pl.clientset); err != nil {
return statusError(logger, err)
}
// More than one pending claim and not enough information about all of them.
@@ -946,6 +966,15 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
return statusUnschedulable(logger, "waiting for resource driver to provide information", "pod", klog.KObj(pod))
}
func containsNode(hay []string, needle string) bool {
for _, node := range hay {
if node == needle {
return true
}
}
return false
}
// Unreserve clears the ReservedFor field for all claims.
// It's idempotent, and does nothing if no state found for the given pod.
func (pl *dynamicResources) Unreserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) {

@@ -967,11 +967,12 @@ func (wrapper *PodSchedulingWrapper) Namespace(s string) *PodSchedulingWrapper {
func (wrapper *PodSchedulingWrapper) OwnerReference(name, uid string, gvk schema.GroupVersionKind) *PodSchedulingWrapper {
wrapper.OwnerReferences = []metav1.OwnerReference{
{
APIVersion: gvk.GroupVersion().String(),
Kind: gvk.Kind,
Name: name,
UID: types.UID(uid),
Controller: pointer.Bool(true),
APIVersion: gvk.GroupVersion().String(),
Kind: gvk.Kind,
Name: name,
UID: types.UID(uid),
Controller: pointer.Bool(true),
BlockOwnerDeletion: pointer.Bool(true),
},
}
return wrapper
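
For context on the wrapper change above: the plugin now builds the owner reference for a new PodSchedulingContext with metav1.NewControllerRef (see the Create branch in the first file), and that helper sets both Controller and BlockOwnerDeletion to true, which is why the expected OwnerReference in the test wrapper gains BlockOwnerDeletion. Below is a minimal sketch of the equivalent construction; ownerRefForPod is a hypothetical helper, not part of the commit.

// Sketch: the controller reference the plugin now creates for a Pod.
// metav1.NewControllerRef fills APIVersion, Kind, Name, and UID from the
// owner and sets both Controller and BlockOwnerDeletion to true.
package sketch

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func ownerRefForPod(pod *v1.Pod) metav1.OwnerReference {
	return *metav1.NewControllerRef(pod, schema.GroupVersionKind{Version: "v1", Kind: "Pod"})
}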