Merge pull request #120253 from pohly/dra-scheduler-podschedulingcontext-updates

dra scheduler: refactor PodSchedulingContext updates
Kubernetes Prow Robot authored 2023-09-08 02:48:14 -07:00; committed by GitHub
commit a64a3e16ec
2 changed files with 178 additions and 148 deletions
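In summary, the per-pod PodSchedulingContext bookkeeping moves out of stateData (a schedulingCtx pointer plus a podSchedulingDirty flag) into a dedicated podSchedulingState type: pending changes are recorded in optional selectedNode and potentialNodes pointers and flushed with a single publish call. Below is a minimal sketch of the resulting call pattern; it only compiles inside the plugin package, the function name and node names are illustrative, and pod, lister, and clientset stand in for the plugin's real wiring:

// Sketch of the refactored flow; error handling is abbreviated and the
// names examplePodSchedulingFlow, node-a, and node-b are made up.
func examplePodSchedulingFlow(
	ctx context.Context,
	pod *v1.Pod,
	lister resourcev1alpha2listers.PodSchedulingContextLister,
	clientset kubernetes.Interface,
) error {
	var state podSchedulingState

	// PreFilter: look up the existing PodSchedulingContext once, if any.
	if err := state.init(ctx, pod, lister); err != nil {
		return err
	}

	// PreScore: remember candidate nodes locally; nothing is written yet.
	potentialNodes := []string{"node-a", "node-b"}
	state.potentialNodes = &potentialNodes

	// Reserve: pick a node, then flush all pending changes in one request.
	nodeName := "node-a"
	state.selectedNode = &nodeName

	// publish is a no-op when isDirty() is false, i.e. when neither
	// selectedNode nor potentialNodes was set since the last write.
	return state.publish(ctx, pod, clientset)
}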


@@ -31,6 +31,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
resourcev1alpha2listers "k8s.io/client-go/listers/resource/v1alpha2"
@@ -54,6 +55,9 @@ const (
// framework.CycleState, in the later phases we don't need to call the Write
// method to update the value
type stateData struct {
// preScored is true if PreScore was invoked.
preScored bool
// A copy of all claims for the Pod (i.e. 1:1 match with
// pod.Spec.ResourceClaims), initially with the status from the start
// of the scheduling cycle. Each claim instance is read-only because it
@@ -72,17 +76,9 @@ type stateData struct {
// protected by the mutex. Used by PostFilter.
unavailableClaims sets.Int
// A pointer to the PodSchedulingContext object for the pod, if one exists.
// Gets set on demand.
//
// Conceptually, this object belongs in the scheduler framework
// where it might get shared by different plugins. But in practice,
// it is currently only used by dynamic provisioning and thus
// managed entirely here.
schedulingCtx *resourcev1alpha2.PodSchedulingContext
// podSchedulingDirty is true if the current copy was locally modified.
podSchedulingDirty bool
// podSchedulingState keeps track of the PodSchedulingContext
// (if one exists) and the changes made to it.
podSchedulingState podSchedulingState
mutex sync.Mutex
@@ -123,91 +119,108 @@ func (d *stateData) updateClaimStatus(ctx context.Context, clientset kubernetes.
return nil
}
// initializePodSchedulingContexts can be called concurrently. It returns an existing PodSchedulingContext
// object if there is one already, retrieves one if not, or as a last resort creates
// one from scratch.
func (d *stateData) initializePodSchedulingContexts(ctx context.Context, pod *v1.Pod, podSchedulingContextLister resourcev1alpha2listers.PodSchedulingContextLister) (*resourcev1alpha2.PodSchedulingContext, error) {
// TODO (#113701): check if this mutex locking can be avoided by calling initializePodSchedulingContext during PreFilter.
d.mutex.Lock()
defer d.mutex.Unlock()
type podSchedulingState struct {
// A pointer to the PodSchedulingContext object for the pod, if one exists
// in the API server.
//
// Conceptually, this object belongs in the scheduler framework
// where it might get shared by different plugins. But in practice,
// it is currently only used by dynamic provisioning and thus
// managed entirely here.
schedulingCtx *resourcev1alpha2.PodSchedulingContext
if d.schedulingCtx != nil {
return d.schedulingCtx, nil
}
// selectedNode is set if (and only if) a node has been selected.
selectedNode *string
// potentialNodes is set if (and only if) the potential nodes field
// needs to be updated or set.
potentialNodes *[]string
}
func (p *podSchedulingState) isDirty() bool {
return p.selectedNode != nil ||
p.potentialNodes != nil
}
// init checks whether there is already a PodSchedulingContext object.
// Must not be called concurrently.
func (p *podSchedulingState) init(ctx context.Context, pod *v1.Pod, podSchedulingContextLister resourcev1alpha2listers.PodSchedulingContextLister) error {
schedulingCtx, err := podSchedulingContextLister.PodSchedulingContexts(pod.Namespace).Get(pod.Name)
switch {
case apierrors.IsNotFound(err):
controller := true
schedulingCtx = &resourcev1alpha2.PodSchedulingContext{
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "v1",
Kind: "Pod",
Name: pod.Name,
UID: pod.UID,
Controller: &controller,
},
},
},
}
err = nil
return nil
case err != nil:
return nil, err
return err
default:
// We have an object, but it might be obsolete.
if !metav1.IsControlledBy(schedulingCtx, pod) {
return nil, fmt.Errorf("PodSchedulingContext object with UID %s is not owned by Pod %s/%s", schedulingCtx.UID, pod.Namespace, pod.Name)
return fmt.Errorf("PodSchedulingContext object with UID %s is not owned by Pod %s/%s", schedulingCtx.UID, pod.Namespace, pod.Name)
}
}
d.schedulingCtx = schedulingCtx
return schedulingCtx, err
p.schedulingCtx = schedulingCtx
return nil
}
// publishPodSchedulingContext creates or updates the PodSchedulingContext object.
func (d *stateData) publishPodSchedulingContexts(ctx context.Context, clientset kubernetes.Interface, schedulingCtx *resourcev1alpha2.PodSchedulingContext) error {
d.mutex.Lock()
defer d.mutex.Unlock()
// publish creates or updates the PodSchedulingContext object, if necessary.
// Must not be called concurrently.
func (p *podSchedulingState) publish(ctx context.Context, pod *v1.Pod, clientset kubernetes.Interface) error {
if !p.isDirty() {
return nil
}
var err error
logger := klog.FromContext(ctx)
msg := "Updating PodSchedulingContext"
if schedulingCtx.UID == "" {
msg = "Creating PodSchedulingContext"
}
if loggerV := logger.V(6); loggerV.Enabled() {
// At a high enough log level, dump the entire object.
loggerV.Info(msg, "podSchedulingCtxDump", klog.Format(schedulingCtx))
if p.schedulingCtx != nil {
// Update it.
schedulingCtx := p.schedulingCtx.DeepCopy()
if p.selectedNode != nil {
schedulingCtx.Spec.SelectedNode = *p.selectedNode
}
if p.potentialNodes != nil {
schedulingCtx.Spec.PotentialNodes = *p.potentialNodes
}
if loggerV := logger.V(6); loggerV.Enabled() {
// At a high enough log level, dump the entire object.
loggerV.Info("Updating PodSchedulingContext", "podSchedulingCtx", klog.KObj(schedulingCtx), "podSchedulingCtxObject", klog.Format(schedulingCtx))
} else {
logger.V(5).Info("Updating PodSchedulingContext", "podSchedulingCtx", klog.KObj(schedulingCtx))
}
_, err = clientset.ResourceV1alpha2().PodSchedulingContexts(schedulingCtx.Namespace).Update(ctx, schedulingCtx, metav1.UpdateOptions{})
} else {
logger.V(5).Info(msg, "podSchedulingCtx", klog.KObj(schedulingCtx))
}
if schedulingCtx.UID == "" {
schedulingCtx, err = clientset.ResourceV1alpha2().PodSchedulingContexts(schedulingCtx.Namespace).Create(ctx, schedulingCtx, metav1.CreateOptions{})
} else {
// TODO (#113700): patch here to avoid racing with drivers which update the status.
schedulingCtx, err = clientset.ResourceV1alpha2().PodSchedulingContexts(schedulingCtx.Namespace).Update(ctx, schedulingCtx, metav1.UpdateOptions{})
// Create it.
schedulingCtx := &resourcev1alpha2.PodSchedulingContext{
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(pod, schema.GroupVersionKind{Version: "v1", Kind: "Pod"})},
},
}
if p.selectedNode != nil {
schedulingCtx.Spec.SelectedNode = *p.selectedNode
}
if p.potentialNodes != nil {
schedulingCtx.Spec.PotentialNodes = *p.potentialNodes
}
if loggerV := logger.V(6); loggerV.Enabled() {
// At a high enough log level, dump the entire object.
loggerV.Info("Creating PodSchedulingContext", "podSchedulingCtx", klog.KObj(schedulingCtx), "podSchedulingCtxObject", klog.Format(schedulingCtx))
} else {
logger.V(5).Info("Creating PodSchedulingContext", "podSchedulingCtx", klog.KObj(schedulingCtx))
}
_, err = clientset.ResourceV1alpha2().PodSchedulingContexts(schedulingCtx.Namespace).Create(ctx, schedulingCtx, metav1.CreateOptions{})
}
if err != nil {
return err
}
d.schedulingCtx = schedulingCtx
d.podSchedulingDirty = false
p.potentialNodes = nil
p.selectedNode = nil
return nil
}
// storePodSchedulingContext replaces the pod schedulingCtx object in the state.
func (d *stateData) storePodSchedulingContexts(schedulingCtx *resourcev1alpha2.PodSchedulingContext) {
d.mutex.Lock()
defer d.mutex.Unlock()
d.schedulingCtx = schedulingCtx
d.podSchedulingDirty = true
}
func statusForClaim(schedulingCtx *resourcev1alpha2.PodSchedulingContext, podClaimName string) *resourcev1alpha2.ResourceClaimSchedulingStatus {
if schedulingCtx == nil {
return nil
}
for _, status := range schedulingCtx.Status.ResourceClaims {
if status.Name == podClaimName {
return &status
@@ -564,6 +577,11 @@ func (pl *dynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
return nil, framework.NewStatus(framework.Skip)
}
// Fetch s.podSchedulingState.schedulingCtx; it's going to be needed when checking claims.
if err := s.podSchedulingState.init(ctx, pod, pl.podSchedulingContextLister); err != nil {
return nil, statusError(logger, err)
}
s.informationsForClaim = make([]informationForClaim, len(claims))
for index, claim := range claims {
if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeImmediate &&
@@ -614,11 +632,7 @@ func (pl *dynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
s.informationsForClaim[index].availableOnNode = selector
}
// Now we need information from drivers.
schedulingCtx, err := s.initializePodSchedulingContexts(ctx, pod, pl.podSchedulingContextLister)
if err != nil {
return nil, statusError(logger, err)
}
s.informationsForClaim[index].status = statusForClaim(schedulingCtx, pod.Spec.ResourceClaims[index].Name)
s.informationsForClaim[index].status = statusForClaim(s.podSchedulingState.schedulingCtx, pod.Spec.ResourceClaims[index].Name)
}
}
@@ -772,64 +786,71 @@ func (pl *dynamicResources) PreScore(ctx context.Context, cs *framework.CycleSta
if err != nil {
return statusError(klog.FromContext(ctx), err)
}
defer func() {
state.preScored = true
}()
if len(state.claims) == 0 {
return nil
}
logger := klog.FromContext(ctx)
schedulingCtx, err := state.initializePodSchedulingContexts(ctx, pod, pl.podSchedulingContextLister)
if err != nil {
return statusError(logger, err)
}
pending := false
for _, claim := range state.claims {
if claim.Status.Allocation == nil {
pending = true
}
}
if pending && !haveAllNodes(schedulingCtx.Spec.PotentialNodes, nodes) {
// Remember the potential nodes. The object will get created or
// updated in Reserve. This is both an optimization and
// covers the case that PreScore doesn't get called when there
// is only a single node.
logger.V(5).Info("remembering potential nodes", "pod", klog.KObj(pod), "potentialnodes", klog.KObjSlice(nodes))
schedulingCtx = schedulingCtx.DeepCopy()
numNodes := len(nodes)
if numNodes > resourcev1alpha2.PodSchedulingNodeListMaxSize {
numNodes = resourcev1alpha2.PodSchedulingNodeListMaxSize
}
schedulingCtx.Spec.PotentialNodes = make([]string, 0, numNodes)
if numNodes == len(nodes) {
// Copy all node names.
for _, node := range nodes {
schedulingCtx.Spec.PotentialNodes = append(schedulingCtx.Spec.PotentialNodes, node.Name)
}
} else {
// Select a random subset of the nodes to comply with
// the PotentialNodes length limit. Randomization is
// done for us by Go which iterates over map entries
// randomly.
nodeNames := map[string]struct{}{}
for _, node := range nodes {
nodeNames[node.Name] = struct{}{}
}
for nodeName := range nodeNames {
if len(schedulingCtx.Spec.PotentialNodes) >= resourcev1alpha2.PodSchedulingNodeListMaxSize {
break
}
schedulingCtx.Spec.PotentialNodes = append(schedulingCtx.Spec.PotentialNodes, nodeName)
}
}
sort.Strings(schedulingCtx.Spec.PotentialNodes)
state.storePodSchedulingContexts(schedulingCtx)
if !pending {
logger.V(5).Info("no pending claims", "pod", klog.KObj(pod))
return nil
}
logger.V(5).Info("all potential nodes already set", "pod", klog.KObj(pod), "potentialnodes", klog.KObjSlice(nodes))
if haveAllPotentialNodes(state.podSchedulingState.schedulingCtx, nodes) {
logger.V(5).Info("all potential nodes already set", "pod", klog.KObj(pod), "potentialnodes", klog.KObjSlice(nodes))
return nil
}
// Remember the potential nodes. The object will get created or
// updated in Reserve. This is both an optimization and
// covers the case that PreScore doesn't get called when there
// is only a single node.
logger.V(5).Info("remembering potential nodes", "pod", klog.KObj(pod), "potentialnodes", klog.KObjSlice(nodes))
numNodes := len(nodes)
if numNodes > resourcev1alpha2.PodSchedulingNodeListMaxSize {
numNodes = resourcev1alpha2.PodSchedulingNodeListMaxSize
}
potentialNodes := make([]string, 0, numNodes)
if numNodes == len(nodes) {
// Copy all node names.
for _, node := range nodes {
potentialNodes = append(potentialNodes, node.Name)
}
} else {
// Select a random subset of the nodes to comply with
// the PotentialNodes length limit. Randomization is
// done for us by Go which iterates over map entries
// randomly.
nodeNames := map[string]struct{}{}
for _, node := range nodes {
nodeNames[node.Name] = struct{}{}
}
for nodeName := range nodeNames {
if len(potentialNodes) >= resourcev1alpha2.PodSchedulingNodeListMaxSize {
break
}
potentialNodes = append(potentialNodes, nodeName)
}
}
sort.Strings(potentialNodes)
state.podSchedulingState.potentialNodes = &potentialNodes
return nil
}
func haveAllNodes(nodeNames []string, nodes []*v1.Node) bool {
func haveAllPotentialNodes(schedulingCtx *resourcev1alpha2.PodSchedulingContext, nodes []*v1.Node) bool {
if schedulingCtx == nil {
return false
}
for _, node := range nodes {
if !haveNode(nodeNames, node.Name) {
if !haveNode(schedulingCtx.Spec.PotentialNodes, node.Name) {
return false
}
}
@@ -861,10 +882,6 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
numDelayedAllocationPending := 0
numClaimsWithStatusInfo := 0
logger := klog.FromContext(ctx)
schedulingCtx, err := state.initializePodSchedulingContexts(ctx, pod, pl.podSchedulingContextLister)
if err != nil {
return statusError(logger, err)
}
for index, claim := range state.claims {
if claim.Status.Allocation != nil {
// Allocated, but perhaps not reserved yet.
@@ -894,7 +911,7 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
// Did the driver provide information that steered node
// selection towards a node that it can support?
if statusForClaim(schedulingCtx, pod.Spec.ResourceClaims[index].Name) != nil {
if statusForClaim(state.podSchedulingState.schedulingCtx, pod.Spec.ResourceClaims[index].Name) != nil {
numClaimsWithStatusInfo++
}
}
@@ -905,16 +922,19 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
return nil
}
podSchedulingDirty := state.podSchedulingDirty
if len(schedulingCtx.Spec.PotentialNodes) == 0 {
// PreScore was not called, probably because there was
// only one candidate. We need to ask whether that
// node is suitable, otherwise the scheduler will pick
// it forever even when it cannot satisfy the claim.
schedulingCtx = schedulingCtx.DeepCopy()
schedulingCtx.Spec.PotentialNodes = []string{nodeName}
logger.V(5).Info("asking for information about single potential node", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName})
podSchedulingDirty = true
if !state.preScored {
// There was only one candidate that passed the Filters and
// therefore PreScore was not called.
//
// We need to ask whether that node is suitable, otherwise the
// scheduler will pick it forever even when it cannot satisfy
// the claim.
if state.podSchedulingState.schedulingCtx == nil ||
!containsNode(state.podSchedulingState.schedulingCtx.Spec.PotentialNodes, nodeName) {
potentialNodes := []string{nodeName}
state.podSchedulingState.potentialNodes = &potentialNodes
logger.V(5).Info("asking for information about single potential node", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName})
}
}
// When there is only one pending resource, we can go ahead with
@@ -922,26 +942,26 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
// the driver yet. Otherwise we wait for information before blindly
// making a decision that might have to be reversed later.
if numDelayedAllocationPending == 1 || numClaimsWithStatusInfo == numDelayedAllocationPending {
schedulingCtx = schedulingCtx.DeepCopy()
// TODO: can we increase the chance that the scheduler picks
// the same node as before when allocation is on-going,
// assuming that that node still fits the pod? Picking a
// different node may lead to some claims being allocated for
// one node and others for another, which then would have to be
// resolved with deallocation.
schedulingCtx.Spec.SelectedNode = nodeName
logger.V(5).Info("start allocation", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName})
if err := state.publishPodSchedulingContexts(ctx, pl.clientset, schedulingCtx); err != nil {
return statusError(logger, err)
if state.podSchedulingState.schedulingCtx == nil ||
state.podSchedulingState.schedulingCtx.Spec.SelectedNode != nodeName {
state.podSchedulingState.selectedNode = &nodeName
logger.V(5).Info("start allocation", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName})
if err := state.podSchedulingState.publish(ctx, pod, pl.clientset); err != nil {
return statusError(logger, err)
}
return statusUnschedulable(logger, "waiting for resource driver to allocate resource", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName})
}
return statusUnschedulable(logger, "waiting for resource driver to allocate resource", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName})
}
// May have been modified earlier in PreScore or above.
if podSchedulingDirty {
if err := state.publishPodSchedulingContexts(ctx, pl.clientset, schedulingCtx); err != nil {
return statusError(logger, err)
}
if err := state.podSchedulingState.publish(ctx, pod, pl.clientset); err != nil {
return statusError(logger, err)
}
// More than one pending claim and not enough information about all of them.
@@ -954,6 +974,15 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
return statusUnschedulable(logger, "waiting for resource driver to provide information", "pod", klog.KObj(pod))
}
func containsNode(hay []string, needle string) bool {
for _, node := range hay {
if node == needle {
return true
}
}
return false
}
// Unreserve clears the ReservedFor field for all claims.
// It's idempotent, and does nothing if no state found for the given pod.
func (pl *dynamicResources) Unreserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) {


@@ -967,11 +967,12 @@ func (wrapper *PodSchedulingWrapper) Namespace(s string) *PodSchedulingWrapper {
func (wrapper *PodSchedulingWrapper) OwnerReference(name, uid string, gvk schema.GroupVersionKind) *PodSchedulingWrapper {
wrapper.OwnerReferences = []metav1.OwnerReference{
{
APIVersion: gvk.GroupVersion().String(),
Kind: gvk.Kind,
Name: name,
UID: types.UID(uid),
Controller: pointer.Bool(true),
APIVersion: gvk.GroupVersion().String(),
Kind: gvk.Kind,
Name: name,
UID: types.UID(uid),
Controller: pointer.Bool(true),
BlockOwnerDeletion: pointer.Bool(true),
},
}
return wrapper
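The extra BlockOwnerDeletion field in the expected owner reference follows from the plugin now building the reference with metav1.NewControllerRef instead of by hand: NewControllerRef sets both Controller and BlockOwnerDeletion to true. A small illustrative comparison, assuming the usual metav1, schema, and v1 imports (the helper name is made up):

// Illustrative only: compares the hand-rolled owner reference the plugin
// used to build with the one metav1.NewControllerRef now generates.
func ownerRefComparison(pod *v1.Pod) (manual metav1.OwnerReference, generated *metav1.OwnerReference) {
	controller := true
	manual = metav1.OwnerReference{
		APIVersion: "v1",
		Kind:       "Pod",
		Name:       pod.Name,
		UID:        pod.UID,
		Controller: &controller,
	}
	// NewControllerRef fills in the same fields and additionally sets
	// BlockOwnerDeletion to true, hence the updated test expectation.
	generated = metav1.NewControllerRef(pod, schema.GroupVersionKind{Version: "v1", Kind: "Pod"})
	return manual, generated
}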