mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-07 11:13:48 +00:00
Merge pull request #119078 from pohly/dra-scheduler-queueing-hints
dra: scheduler queueing hints
This commit is contained in:
commit
529eeb78ef
@ -23,8 +23,11 @@ import (
|
|||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/google/go-cmp/cmp"
|
||||||
|
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
|
resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
|
||||||
|
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
@ -38,6 +41,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/scheduler/framework"
|
"k8s.io/kubernetes/pkg/scheduler/framework"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
|
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
|
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
|
||||||
|
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -169,7 +173,7 @@ func (d *stateData) publishPodSchedulingContexts(ctx context.Context, clientset
|
|||||||
}
|
}
|
||||||
if loggerV := logger.V(6); loggerV.Enabled() {
|
if loggerV := logger.V(6); loggerV.Enabled() {
|
||||||
// At a high enough log level, dump the entire object.
|
// At a high enough log level, dump the entire object.
|
||||||
loggerV.Info(msg, "podSchedulingCtxDump", schedulingCtx)
|
loggerV.Info(msg, "podSchedulingCtxDump", klog.Format(schedulingCtx))
|
||||||
} else {
|
} else {
|
||||||
logger.V(5).Info(msg, "podSchedulingCtx", klog.KObj(schedulingCtx))
|
logger.V(5).Info(msg, "podSchedulingCtx", klog.KObj(schedulingCtx))
|
||||||
}
|
}
|
||||||
@ -208,14 +212,25 @@ func statusForClaim(schedulingCtx *resourcev1alpha2.PodSchedulingContext, podCla
|
|||||||
// dynamicResources is a plugin that ensures that ResourceClaims are allocated.
|
// dynamicResources is a plugin that ensures that ResourceClaims are allocated.
|
||||||
type dynamicResources struct {
|
type dynamicResources struct {
|
||||||
enabled bool
|
enabled bool
|
||||||
|
fh framework.Handle
|
||||||
clientset kubernetes.Interface
|
clientset kubernetes.Interface
|
||||||
claimLister resourcev1alpha2listers.ResourceClaimLister
|
claimLister resourcev1alpha2listers.ResourceClaimLister
|
||||||
classLister resourcev1alpha2listers.ResourceClassLister
|
classLister resourcev1alpha2listers.ResourceClassLister
|
||||||
podSchedulingContextLister resourcev1alpha2listers.PodSchedulingContextLister
|
podSchedulingContextLister resourcev1alpha2listers.PodSchedulingContextLister
|
||||||
|
|
||||||
|
// logger is only meant to be used by background activities which don't
|
||||||
|
// have some other logger in their parent callstack.
|
||||||
|
logger klog.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// New initializes a new plugin and returns it.
|
// New initializes a new plugin and returns it.
|
||||||
func New(plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) {
|
func New(plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) {
|
||||||
|
// TODO: the runtime should set up logging for each plugin, including
|
||||||
|
// adding a name for each one (same as in kube-controller-manager).
|
||||||
|
return NewWithLogger(klog.TODO(), plArgs, fh, fts)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewWithLogger(logger klog.Logger, plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) {
|
||||||
if !fts.EnableDynamicResourceAllocation {
|
if !fts.EnableDynamicResourceAllocation {
|
||||||
// Disabled, won't do anything.
|
// Disabled, won't do anything.
|
||||||
return &dynamicResources{}, nil
|
return &dynamicResources{}, nil
|
||||||
@ -223,13 +238,16 @@ func New(plArgs runtime.Object, fh framework.Handle, fts feature.Features) (fram
|
|||||||
|
|
||||||
return &dynamicResources{
|
return &dynamicResources{
|
||||||
enabled: true,
|
enabled: true,
|
||||||
|
fh: fh,
|
||||||
clientset: fh.ClientSet(),
|
clientset: fh.ClientSet(),
|
||||||
claimLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Lister(),
|
claimLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Lister(),
|
||||||
classLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClasses().Lister(),
|
classLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClasses().Lister(),
|
||||||
podSchedulingContextLister: fh.SharedInformerFactory().Resource().V1alpha2().PodSchedulingContexts().Lister(),
|
podSchedulingContextLister: fh.SharedInformerFactory().Resource().V1alpha2().PodSchedulingContexts().Lister(),
|
||||||
|
logger: logger,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var _ framework.PreEnqueuePlugin = &dynamicResources{}
|
||||||
var _ framework.PreFilterPlugin = &dynamicResources{}
|
var _ framework.PreFilterPlugin = &dynamicResources{}
|
||||||
var _ framework.FilterPlugin = &dynamicResources{}
|
var _ framework.FilterPlugin = &dynamicResources{}
|
||||||
var _ framework.PostFilterPlugin = &dynamicResources{}
|
var _ framework.PostFilterPlugin = &dynamicResources{}
|
||||||
@ -251,12 +269,10 @@ func (pl *dynamicResources) EventsToRegister() []framework.ClusterEventWithHint
|
|||||||
}
|
}
|
||||||
events := []framework.ClusterEventWithHint{
|
events := []framework.ClusterEventWithHint{
|
||||||
// Allocation is tracked in ResourceClaims, so any changes may make the pods schedulable.
|
// Allocation is tracked in ResourceClaims, so any changes may make the pods schedulable.
|
||||||
{Event: framework.ClusterEvent{Resource: framework.ResourceClaim, ActionType: framework.Add | framework.Update}},
|
{Event: framework.ClusterEvent{Resource: framework.ResourceClaim, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterClaimChange},
|
||||||
// When a driver has provided additional information, a pod waiting for that information
|
// When a driver has provided additional information, a pod waiting for that information
|
||||||
// may be schedulable.
|
// may be schedulable.
|
||||||
// TODO (#113702): can we change this so that such an event does not trigger *all* pods?
|
{Event: framework.ClusterEvent{Resource: framework.PodSchedulingContext, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodSchedulingContextChange},
|
||||||
// Yes: https://github.com/kubernetes/kubernetes/blob/abcbaed0784baf5ed2382aae9705a8918f2daa18/pkg/scheduler/eventhandlers.go#L70
|
|
||||||
{Event: framework.ClusterEvent{Resource: framework.PodSchedulingContext, ActionType: framework.Add | framework.Update}},
|
|
||||||
// A resource might depend on node labels for topology filtering.
|
// A resource might depend on node labels for topology filtering.
|
||||||
// A new or updated node may make pods schedulable.
|
// A new or updated node may make pods schedulable.
|
||||||
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel}},
|
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel}},
|
||||||
@ -264,13 +280,237 @@ func (pl *dynamicResources) EventsToRegister() []framework.ClusterEventWithHint
|
|||||||
return events
|
return events
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PreEnqueue checks if there are known reasons why a pod currently cannot be
|
||||||
|
// scheduled. When this fails, one of the registered events can trigger another
|
||||||
|
// attempt.
|
||||||
|
func (pl *dynamicResources) PreEnqueue(ctx context.Context, pod *v1.Pod) (status *framework.Status) {
|
||||||
|
if err := pl.foreachPodResourceClaim(pod, nil); err != nil {
|
||||||
|
return statusUnschedulable(klog.FromContext(ctx), err.Error())
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// isSchedulableAfterClaimChange is invoked for all claim events reported by
|
||||||
|
// an informer. It checks whether that change made a previously unschedulable
|
||||||
|
// pod schedulable. It errs on the side of letting a pod scheduling attempt
|
||||||
|
// happen.
|
||||||
|
func (pl *dynamicResources) isSchedulableAfterClaimChange(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
|
||||||
|
if newObj == nil {
|
||||||
|
// Deletes don't make a pod schedulable.
|
||||||
|
return framework.QueueSkip
|
||||||
|
}
|
||||||
|
|
||||||
|
_, modifiedClaim, err := schedutil.As[*resourcev1alpha2.ResourceClaim](nil, newObj)
|
||||||
|
if err != nil {
|
||||||
|
// Shouldn't happen.
|
||||||
|
pl.logger.Error(err, "unexpected new object in isSchedulableAfterClaimChange")
|
||||||
|
return framework.QueueAfterBackoff
|
||||||
|
}
|
||||||
|
|
||||||
|
usesClaim := false
|
||||||
|
if err := pl.foreachPodResourceClaim(pod, func(_ string, claim *resourcev1alpha2.ResourceClaim) {
|
||||||
|
if claim.UID == modifiedClaim.UID {
|
||||||
|
usesClaim = true
|
||||||
|
}
|
||||||
|
}); err != nil {
|
||||||
|
// This is not an unexpected error: we know that
|
||||||
|
// foreachPodResourceClaim only returns errors for "not
|
||||||
|
// schedulable".
|
||||||
|
pl.logger.V(4).Info("pod is not schedulable", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim), "reason", err.Error())
|
||||||
|
return framework.QueueSkip
|
||||||
|
}
|
||||||
|
|
||||||
|
if !usesClaim {
|
||||||
|
// This was not the claim the pod was waiting for.
|
||||||
|
pl.logger.V(6).Info("unrelated claim got modified", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
|
||||||
|
return framework.QueueSkip
|
||||||
|
}
|
||||||
|
|
||||||
|
if oldObj == nil {
|
||||||
|
pl.logger.V(4).Info("claim for pod got created", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
|
||||||
|
return framework.QueueImmediately
|
||||||
|
}
|
||||||
|
|
||||||
|
// Modifications may or may not be relevant. If the entire
|
||||||
|
// status is as before, then something else must have changed
|
||||||
|
// and we don't care. What happens in practice is that the
|
||||||
|
// resource driver adds the finalizer.
|
||||||
|
originalClaim, ok := oldObj.(*resourcev1alpha2.ResourceClaim)
|
||||||
|
if !ok {
|
||||||
|
// Shouldn't happen.
|
||||||
|
pl.logger.Error(nil, "unexpected old object in isSchedulableAfterClaimAddOrUpdate", "obj", oldObj)
|
||||||
|
return framework.QueueAfterBackoff
|
||||||
|
}
|
||||||
|
if apiequality.Semantic.DeepEqual(&originalClaim.Status, &modifiedClaim.Status) {
|
||||||
|
if loggerV := pl.logger.V(7); loggerV.Enabled() {
|
||||||
|
// Log more information.
|
||||||
|
loggerV.Info("claim for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim), "diff", cmp.Diff(originalClaim, modifiedClaim))
|
||||||
|
} else {
|
||||||
|
pl.logger.V(6).Info("claim for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
|
||||||
|
}
|
||||||
|
return framework.QueueSkip
|
||||||
|
}
|
||||||
|
|
||||||
|
pl.logger.V(4).Info("status of claim for pod got updated", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
|
||||||
|
return framework.QueueImmediately
|
||||||
|
}
|
||||||
|
|
||||||
|
// isSchedulableAfterPodSchedulingContextChange is invoked for all
|
||||||
|
// PodSchedulingContext events reported by an informer. It checks whether that
|
||||||
|
// change made a previously unschedulable pod schedulable (updated) or a new
|
||||||
|
// attempt is needed to re-create the object (deleted). It errs on the side of
|
||||||
|
// letting a pod scheduling attempt happen.
|
||||||
|
func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
|
||||||
|
// Deleted? That can happen because we ourselves delete the PodSchedulingContext while
|
||||||
|
// working on the pod. This can be ignored.
|
||||||
|
if oldObj != nil && newObj == nil {
|
||||||
|
pl.logger.V(4).Info("PodSchedulingContext got deleted")
|
||||||
|
return framework.QueueSkip
|
||||||
|
}
|
||||||
|
|
||||||
|
oldPodScheduling, newPodScheduling, err := schedutil.As[*resourcev1alpha2.PodSchedulingContext](oldObj, newObj)
|
||||||
|
if err != nil {
|
||||||
|
// Shouldn't happen.
|
||||||
|
pl.logger.Error(nil, "isSchedulableAfterPodSchedulingChange")
|
||||||
|
return framework.QueueAfterBackoff
|
||||||
|
}
|
||||||
|
podScheduling := newPodScheduling // Never nil because deletes are handled above.
|
||||||
|
|
||||||
|
if podScheduling.Name != pod.Name || podScheduling.Namespace != pod.Namespace {
|
||||||
|
pl.logger.V(7).Info("PodSchedulingContext for unrelated pod got modified", "pod", klog.KObj(pod), "podScheduling", klog.KObj(podScheduling))
|
||||||
|
return framework.QueueSkip
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the drivers have provided information about all
|
||||||
|
// unallocated claims with delayed allocation, then the next
|
||||||
|
// scheduling attempt is able to pick a node, so we let it run
|
||||||
|
// immediately if this occurred for the first time, otherwise
|
||||||
|
// we allow backoff.
|
||||||
|
pendingDelayedClaims := 0
|
||||||
|
if err := pl.foreachPodResourceClaim(pod, func(podResourceName string, claim *resourcev1alpha2.ResourceClaim) {
|
||||||
|
if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer &&
|
||||||
|
claim.Status.Allocation == nil &&
|
||||||
|
!podSchedulingHasClaimInfo(podScheduling, podResourceName) {
|
||||||
|
pendingDelayedClaims++
|
||||||
|
}
|
||||||
|
}); err != nil {
|
||||||
|
// This is not an unexpected error: we know that
|
||||||
|
// foreachPodResourceClaim only returns errors for "not
|
||||||
|
// schedulable".
|
||||||
|
pl.logger.V(4).Info("pod is not schedulable, keep waiting", "pod", klog.KObj(pod), "reason", err.Error())
|
||||||
|
return framework.QueueSkip
|
||||||
|
}
|
||||||
|
|
||||||
|
// Some driver responses missing?
|
||||||
|
if pendingDelayedClaims > 0 {
|
||||||
|
// We could start a pod scheduling attempt to refresh the
|
||||||
|
// potential nodes list. But pod scheduling attempts are
|
||||||
|
// expensive and doing them too often causes the pod to enter
|
||||||
|
// backoff. Let's wait instead for all drivers to reply.
|
||||||
|
if loggerV := pl.logger.V(6); loggerV.Enabled() {
|
||||||
|
loggerV.Info("PodSchedulingContext with missing resource claim information, keep waiting", "pod", klog.KObj(pod), "podSchedulingDiff", cmp.Diff(oldPodScheduling, podScheduling))
|
||||||
|
} else {
|
||||||
|
pl.logger.V(5).Info("PodSchedulingContext with missing resource claim information, keep waiting", "pod", klog.KObj(pod))
|
||||||
|
}
|
||||||
|
return framework.QueueSkip
|
||||||
|
}
|
||||||
|
|
||||||
|
if oldPodScheduling == nil /* create */ ||
|
||||||
|
len(oldPodScheduling.Status.ResourceClaims) < len(podScheduling.Status.ResourceClaims) /* new information and not incomplete (checked above) */ {
|
||||||
|
// This definitely is new information for the scheduler. Try again immediately.
|
||||||
|
pl.logger.V(4).Info("PodSchedulingContext for pod has all required information, schedule immediately", "pod", klog.KObj(pod))
|
||||||
|
return framework.QueueImmediately
|
||||||
|
}
|
||||||
|
|
||||||
|
// The other situation where the scheduler needs to do
|
||||||
|
// something immediately is when the selected node doesn't
|
||||||
|
// work: waiting in the backoff queue only helps eventually
|
||||||
|
// resources on the selected node become available again. It's
|
||||||
|
// much more likely, in particular when trying to fill up the
|
||||||
|
// cluster, that the choice simply didn't work out. The risk
|
||||||
|
// here is that in a situation where the cluster really is
|
||||||
|
// full, backoff won't be used because the scheduler keeps
|
||||||
|
// trying different nodes. This should not happen when it has
|
||||||
|
// full knowledge about resource availability (=
|
||||||
|
// PodSchedulingContext.*.UnsuitableNodes is complete) but may happen
|
||||||
|
// when it doesn't (= PodSchedulingContext.*.UnsuitableNodes had to be
|
||||||
|
// truncated).
|
||||||
|
//
|
||||||
|
// Truncation only happens for very large clusters and then may slow
|
||||||
|
// down scheduling, but should not break it completely. This is
|
||||||
|
// acceptable while DRA is alpha and will be investigated further
|
||||||
|
// before moving DRA to beta.
|
||||||
|
if podScheduling.Spec.SelectedNode != "" {
|
||||||
|
for _, claimStatus := range podScheduling.Status.ResourceClaims {
|
||||||
|
if sliceContains(claimStatus.UnsuitableNodes, podScheduling.Spec.SelectedNode) {
|
||||||
|
pl.logger.V(5).Info("PodSchedulingContext has unsuitable selected node, schedule immediately", "pod", klog.KObj(pod), "selectedNode", podScheduling.Spec.SelectedNode, "podResourceName", claimStatus.Name)
|
||||||
|
return framework.QueueImmediately
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update with only the spec modified?
|
||||||
|
if oldPodScheduling != nil &&
|
||||||
|
!apiequality.Semantic.DeepEqual(&oldPodScheduling.Spec, &podScheduling.Spec) &&
|
||||||
|
apiequality.Semantic.DeepEqual(&oldPodScheduling.Status, &podScheduling.Status) {
|
||||||
|
pl.logger.V(5).Info("PodSchedulingContext has only the scheduler spec changes, ignore the update", "pod", klog.KObj(pod))
|
||||||
|
return framework.QueueSkip
|
||||||
|
}
|
||||||
|
|
||||||
|
// Once we get here, all changes which are known to require special responses
|
||||||
|
// have been checked for. Whatever the change was, we don't know exactly how
|
||||||
|
// to handle it and thus return QueueAfterBackoff. This will cause the
|
||||||
|
// scheduler to treat the event as if no event hint callback had been provided.
|
||||||
|
// Developers who want to investigate this can enable a diff at log level 6.
|
||||||
|
if loggerV := pl.logger.V(6); loggerV.Enabled() {
|
||||||
|
loggerV.Info("PodSchedulingContext for pod with unknown changes, maybe schedule", "pod", klog.KObj(pod), "podSchedulingDiff", cmp.Diff(oldPodScheduling, podScheduling))
|
||||||
|
} else {
|
||||||
|
pl.logger.V(5).Info("PodSchedulingContext for pod with unknown changes, maybe schedule", "pod", klog.KObj(pod))
|
||||||
|
}
|
||||||
|
return framework.QueueAfterBackoff
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func podSchedulingHasClaimInfo(podScheduling *resourcev1alpha2.PodSchedulingContext, podResourceName string) bool {
|
||||||
|
for _, claimStatus := range podScheduling.Status.ResourceClaims {
|
||||||
|
if claimStatus.Name == podResourceName {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func sliceContains(hay []string, needle string) bool {
|
||||||
|
for _, item := range hay {
|
||||||
|
if item == needle {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
// podResourceClaims returns the ResourceClaims for all pod.Spec.PodResourceClaims.
|
// podResourceClaims returns the ResourceClaims for all pod.Spec.PodResourceClaims.
|
||||||
func (pl *dynamicResources) podResourceClaims(pod *v1.Pod) ([]*resourcev1alpha2.ResourceClaim, error) {
|
func (pl *dynamicResources) podResourceClaims(pod *v1.Pod) ([]*resourcev1alpha2.ResourceClaim, error) {
|
||||||
claims := make([]*resourcev1alpha2.ResourceClaim, 0, len(pod.Spec.ResourceClaims))
|
claims := make([]*resourcev1alpha2.ResourceClaim, 0, len(pod.Spec.ResourceClaims))
|
||||||
|
if err := pl.foreachPodResourceClaim(pod, func(_ string, claim *resourcev1alpha2.ResourceClaim) {
|
||||||
|
// We store the pointer as returned by the lister. The
|
||||||
|
// assumption is that if a claim gets modified while our code
|
||||||
|
// runs, the cache will store a new pointer, not mutate the
|
||||||
|
// existing object that we point to here.
|
||||||
|
claims = append(claims, claim)
|
||||||
|
}); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return claims, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// foreachPodResourceClaim checks that each ResourceClaim for the pod exists.
|
||||||
|
// It calls an optional handler for those claims that it finds.
|
||||||
|
func (pl *dynamicResources) foreachPodResourceClaim(pod *v1.Pod, cb func(podResourceName string, claim *resourcev1alpha2.ResourceClaim)) error {
|
||||||
for _, resource := range pod.Spec.ResourceClaims {
|
for _, resource := range pod.Spec.ResourceClaims {
|
||||||
claimName, mustCheckOwner, err := resourceclaim.Name(pod, &resource)
|
claimName, mustCheckOwner, err := resourceclaim.Name(pod, &resource)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
// The claim name might be nil if no underlying resource claim
|
// The claim name might be nil if no underlying resource claim
|
||||||
// was generated for the referenced claim. There are valid use
|
// was generated for the referenced claim. There are valid use
|
||||||
@ -280,25 +520,23 @@ func (pl *dynamicResources) podResourceClaims(pod *v1.Pod) ([]*resourcev1alpha2.
|
|||||||
}
|
}
|
||||||
claim, err := pl.claimLister.ResourceClaims(pod.Namespace).Get(*claimName)
|
claim, err := pl.claimLister.ResourceClaims(pod.Namespace).Get(*claimName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if claim.DeletionTimestamp != nil {
|
if claim.DeletionTimestamp != nil {
|
||||||
return nil, fmt.Errorf("resourceclaim %q is being deleted", claim.Name)
|
return fmt.Errorf("resourceclaim %q is being deleted", claim.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
if mustCheckOwner {
|
if mustCheckOwner {
|
||||||
if err := resourceclaim.IsForPod(pod, claim); err != nil {
|
if err := resourceclaim.IsForPod(pod, claim); err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// We store the pointer as returned by the lister. The
|
if cb != nil {
|
||||||
// assumption is that if a claim gets modified while our code
|
cb(resource.Name, claim)
|
||||||
// runs, the cache will store a new pointer, not mutate the
|
|
||||||
// existing object that we point to here.
|
|
||||||
claims = append(claims, claim)
|
|
||||||
}
|
}
|
||||||
return claims, nil
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// PreFilter invoked at the prefilter extension point to check if pod has all
|
// PreFilter invoked at the prefilter extension point to check if pod has all
|
||||||
@ -577,7 +815,7 @@ func (pl *dynamicResources) PreScore(ctx context.Context, cs *framework.CycleSta
|
|||||||
sort.Strings(schedulingCtx.Spec.PotentialNodes)
|
sort.Strings(schedulingCtx.Spec.PotentialNodes)
|
||||||
state.storePodSchedulingContexts(schedulingCtx)
|
state.storePodSchedulingContexts(schedulingCtx)
|
||||||
}
|
}
|
||||||
logger.V(5).Info("all potential nodes already set", "pod", klog.KObj(pod), "potentialnodes", nodes)
|
logger.V(5).Info("all potential nodes already set", "pod", klog.KObj(pod), "potentialnodes", klog.KObjSlice(nodes))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -23,6 +23,7 @@ import (
|
|||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
@ -37,6 +38,7 @@ import (
|
|||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
"k8s.io/client-go/kubernetes/fake"
|
"k8s.io/client-go/kubernetes/fake"
|
||||||
cgotesting "k8s.io/client-go/testing"
|
cgotesting "k8s.io/client-go/testing"
|
||||||
|
"k8s.io/client-go/tools/cache"
|
||||||
"k8s.io/klog/v2/ktesting"
|
"k8s.io/klog/v2/ktesting"
|
||||||
_ "k8s.io/klog/v2/ktesting/init"
|
_ "k8s.io/klog/v2/ktesting/init"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/framework"
|
"k8s.io/kubernetes/pkg/scheduler/framework"
|
||||||
@ -103,6 +105,7 @@ var (
|
|||||||
AllocationMode(resourcev1alpha2.AllocationModeImmediate).
|
AllocationMode(resourcev1alpha2.AllocationModeImmediate).
|
||||||
Obj()
|
Obj()
|
||||||
pendingDelayedClaim = st.FromResourceClaim(claim).
|
pendingDelayedClaim = st.FromResourceClaim(claim).
|
||||||
|
OwnerReference(podName, podUID, podKind).
|
||||||
AllocationMode(resourcev1alpha2.AllocationModeWaitForFirstConsumer).
|
AllocationMode(resourcev1alpha2.AllocationModeWaitForFirstConsumer).
|
||||||
Obj()
|
Obj()
|
||||||
pendingDelayedClaim2 = st.FromResourceClaim(pendingDelayedClaim).
|
pendingDelayedClaim2 = st.FromResourceClaim(pendingDelayedClaim).
|
||||||
@ -117,7 +120,6 @@ var (
|
|||||||
ReservedFor(resourcev1alpha2.ResourceClaimConsumerReference{Resource: "pods", Name: podName, UID: types.UID(podUID)}).
|
ReservedFor(resourcev1alpha2.ResourceClaimConsumerReference{Resource: "pods", Name: podName, UID: types.UID(podUID)}).
|
||||||
Obj()
|
Obj()
|
||||||
allocatedClaim = st.FromResourceClaim(pendingDelayedClaim).
|
allocatedClaim = st.FromResourceClaim(pendingDelayedClaim).
|
||||||
OwnerReference(podName, podUID, podKind).
|
|
||||||
Allocation(&resourcev1alpha2.AllocationResult{}).
|
Allocation(&resourcev1alpha2.AllocationResult{}).
|
||||||
Obj()
|
Obj()
|
||||||
allocatedDelayedClaimWithWrongTopology = st.FromResourceClaim(allocatedClaim).
|
allocatedDelayedClaimWithWrongTopology = st.FromResourceClaim(allocatedClaim).
|
||||||
@ -183,6 +185,7 @@ func (p perNodeResult) forNode(nodeName string) result {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type want struct {
|
type want struct {
|
||||||
|
preenqueue result
|
||||||
preFilterResult *framework.PreFilterResult
|
preFilterResult *framework.PreFilterResult
|
||||||
prefilter result
|
prefilter result
|
||||||
filter perNodeResult
|
filter perNodeResult
|
||||||
@ -268,11 +271,34 @@ func TestPlugin(t *testing.T) {
|
|||||||
pod: podWithClaimTemplate, // status not set
|
pod: podWithClaimTemplate, // status not set
|
||||||
claims: []*resourcev1alpha2.ResourceClaim{allocatedClaim, otherClaim},
|
claims: []*resourcev1alpha2.ResourceClaim{allocatedClaim, otherClaim},
|
||||||
want: want{
|
want: want{
|
||||||
prefilter: result{
|
preenqueue: result{
|
||||||
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `pod "default/my-pod": ResourceClaim not created yet`),
|
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `pod "default/my-pod": ResourceClaim not created yet`),
|
||||||
},
|
},
|
||||||
postfilter: result{
|
},
|
||||||
status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`),
|
},
|
||||||
|
"deleted-claim": {
|
||||||
|
pod: podWithClaimTemplateInStatus,
|
||||||
|
claims: func() []*resourcev1alpha2.ResourceClaim {
|
||||||
|
claim := allocatedClaim.DeepCopy()
|
||||||
|
claim.DeletionTimestamp = &metav1.Time{Time: time.Now()}
|
||||||
|
return []*resourcev1alpha2.ResourceClaim{claim}
|
||||||
|
}(),
|
||||||
|
want: want{
|
||||||
|
preenqueue: result{
|
||||||
|
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `resourceclaim "my-pod-my-resource" is being deleted`),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"wrong-claim": {
|
||||||
|
pod: podWithClaimTemplateInStatus,
|
||||||
|
claims: func() []*resourcev1alpha2.ResourceClaim {
|
||||||
|
claim := allocatedClaim.DeepCopy()
|
||||||
|
claim.OwnerReferences[0].UID += "123"
|
||||||
|
return []*resourcev1alpha2.ResourceClaim{claim}
|
||||||
|
}(),
|
||||||
|
want: want{
|
||||||
|
preenqueue: result{
|
||||||
|
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `ResourceClaim default/my-pod-my-resource was not created for pod default/my-pod (pod is not owner)`),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -506,8 +532,16 @@ func TestPlugin(t *testing.T) {
|
|||||||
}
|
}
|
||||||
testCtx := setup(t, nodes, tc.claims, tc.classes, tc.schedulings)
|
testCtx := setup(t, nodes, tc.claims, tc.classes, tc.schedulings)
|
||||||
testCtx.p.enabled = !tc.disable
|
testCtx.p.enabled = !tc.disable
|
||||||
|
|
||||||
initialObjects := testCtx.listAll(t)
|
initialObjects := testCtx.listAll(t)
|
||||||
|
|
||||||
|
status := testCtx.p.PreEnqueue(testCtx.ctx, tc.pod)
|
||||||
|
t.Run("PreEnqueue", func(t *testing.T) {
|
||||||
|
testCtx.verify(t, tc.want.preenqueue, initialObjects, nil, status)
|
||||||
|
})
|
||||||
|
if !status.IsSuccess() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
result, status := testCtx.p.PreFilter(testCtx.ctx, testCtx.state, tc.pod)
|
result, status := testCtx.p.PreFilter(testCtx.ctx, testCtx.state, tc.pod)
|
||||||
t.Run("prefilter", func(t *testing.T) {
|
t.Run("prefilter", func(t *testing.T) {
|
||||||
assert.Equal(t, tc.want.preFilterResult, result)
|
assert.Equal(t, tc.want.preFilterResult, result)
|
||||||
@ -597,6 +631,7 @@ func TestPlugin(t *testing.T) {
|
|||||||
type testContext struct {
|
type testContext struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
client *fake.Clientset
|
client *fake.Clientset
|
||||||
|
informerFactory informers.SharedInformerFactory
|
||||||
p *dynamicResources
|
p *dynamicResources
|
||||||
nodeInfos []*framework.NodeInfo
|
nodeInfos []*framework.NodeInfo
|
||||||
state *framework.CycleState
|
state *framework.CycleState
|
||||||
@ -727,7 +762,7 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceCl
|
|||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
tc := &testContext{}
|
tc := &testContext{}
|
||||||
_, ctx := ktesting.NewTestContext(t)
|
logger, ctx := ktesting.NewTestContext(t)
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
t.Cleanup(cancel)
|
t.Cleanup(cancel)
|
||||||
tc.ctx = ctx
|
tc.ctx = ctx
|
||||||
@ -736,18 +771,18 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceCl
|
|||||||
reactor := createReactor(tc.client.Tracker())
|
reactor := createReactor(tc.client.Tracker())
|
||||||
tc.client.PrependReactor("*", "*", reactor)
|
tc.client.PrependReactor("*", "*", reactor)
|
||||||
|
|
||||||
informerFactory := informers.NewSharedInformerFactory(tc.client, 0)
|
tc.informerFactory = informers.NewSharedInformerFactory(tc.client, 0)
|
||||||
|
|
||||||
opts := []runtime.Option{
|
opts := []runtime.Option{
|
||||||
runtime.WithClientSet(tc.client),
|
runtime.WithClientSet(tc.client),
|
||||||
runtime.WithInformerFactory(informerFactory),
|
runtime.WithInformerFactory(tc.informerFactory),
|
||||||
}
|
}
|
||||||
fh, err := runtime.NewFramework(ctx, nil, nil, opts...)
|
fh, err := runtime.NewFramework(ctx, nil, nil, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
pl, err := New(nil, fh, feature.Features{EnableDynamicResourceAllocation: true})
|
pl, err := NewWithLogger(logger, nil, fh, feature.Features{EnableDynamicResourceAllocation: true})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -768,15 +803,15 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceCl
|
|||||||
require.NoError(t, err, "create pod scheduling")
|
require.NoError(t, err, "create pod scheduling")
|
||||||
}
|
}
|
||||||
|
|
||||||
informerFactory.Start(tc.ctx.Done())
|
tc.informerFactory.Start(tc.ctx.Done())
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
// Need to cancel before waiting for the shutdown.
|
// Need to cancel before waiting for the shutdown.
|
||||||
cancel()
|
cancel()
|
||||||
// Now we can wait for all goroutines to stop.
|
// Now we can wait for all goroutines to stop.
|
||||||
informerFactory.Shutdown()
|
tc.informerFactory.Shutdown()
|
||||||
})
|
})
|
||||||
|
|
||||||
informerFactory.WaitForCacheSync(tc.ctx.Done())
|
tc.informerFactory.WaitForCacheSync(tc.ctx.Done())
|
||||||
|
|
||||||
for _, node := range nodes {
|
for _, node := range nodes {
|
||||||
nodeInfo := framework.NewNodeInfo()
|
nodeInfo := framework.NewNodeInfo()
|
||||||
@ -849,3 +884,215 @@ func createReactor(tracker cgotesting.ObjectTracker) func(action cgotesting.Acti
|
|||||||
return false, nil, nil
|
return false, nil, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestClaimChange(t *testing.T) {
|
||||||
|
testcases := map[string]struct {
|
||||||
|
pod *v1.Pod
|
||||||
|
claims []*resourcev1alpha2.ResourceClaim
|
||||||
|
oldObj, newObj interface{}
|
||||||
|
expectedHint framework.QueueingHint
|
||||||
|
}{
|
||||||
|
"skip-deletes": {
|
||||||
|
pod: podWithClaimTemplate,
|
||||||
|
oldObj: allocatedClaim,
|
||||||
|
newObj: nil,
|
||||||
|
expectedHint: framework.QueueSkip,
|
||||||
|
},
|
||||||
|
"backoff-wrong-new-object": {
|
||||||
|
pod: podWithClaimTemplate,
|
||||||
|
newObj: "not-a-claim",
|
||||||
|
expectedHint: framework.QueueAfterBackoff,
|
||||||
|
},
|
||||||
|
"skip-wrong-claim": {
|
||||||
|
pod: podWithClaimTemplate,
|
||||||
|
newObj: func() *resourcev1alpha2.ResourceClaim {
|
||||||
|
claim := allocatedClaim.DeepCopy()
|
||||||
|
claim.OwnerReferences[0].UID += "123"
|
||||||
|
return claim
|
||||||
|
}(),
|
||||||
|
expectedHint: framework.QueueSkip,
|
||||||
|
},
|
||||||
|
"skip-unrelated-claim": {
|
||||||
|
pod: podWithClaimTemplate,
|
||||||
|
claims: []*resourcev1alpha2.ResourceClaim{allocatedClaim},
|
||||||
|
newObj: func() *resourcev1alpha2.ResourceClaim {
|
||||||
|
claim := allocatedClaim.DeepCopy()
|
||||||
|
claim.Name += "-foo"
|
||||||
|
claim.UID += "123"
|
||||||
|
return claim
|
||||||
|
}(),
|
||||||
|
expectedHint: framework.QueueSkip,
|
||||||
|
},
|
||||||
|
"queue-on-add": {
|
||||||
|
pod: podWithClaimName,
|
||||||
|
newObj: pendingImmediateClaim,
|
||||||
|
expectedHint: framework.QueueImmediately,
|
||||||
|
},
|
||||||
|
"backoff-wrong-old-object": {
|
||||||
|
pod: podWithClaimName,
|
||||||
|
oldObj: "not-a-claim",
|
||||||
|
newObj: pendingImmediateClaim,
|
||||||
|
expectedHint: framework.QueueAfterBackoff,
|
||||||
|
},
|
||||||
|
"skip-adding-finalizer": {
|
||||||
|
pod: podWithClaimName,
|
||||||
|
claims: []*resourcev1alpha2.ResourceClaim{pendingImmediateClaim},
|
||||||
|
oldObj: pendingImmediateClaim,
|
||||||
|
newObj: func() *resourcev1alpha2.ResourceClaim {
|
||||||
|
claim := pendingImmediateClaim.DeepCopy()
|
||||||
|
claim.Finalizers = append(claim.Finalizers, "foo")
|
||||||
|
return claim
|
||||||
|
}(),
|
||||||
|
expectedHint: framework.QueueSkip,
|
||||||
|
},
|
||||||
|
"queue-on-status-change": {
|
||||||
|
pod: podWithClaimName,
|
||||||
|
claims: []*resourcev1alpha2.ResourceClaim{pendingImmediateClaim},
|
||||||
|
oldObj: pendingImmediateClaim,
|
||||||
|
newObj: func() *resourcev1alpha2.ResourceClaim {
|
||||||
|
claim := pendingImmediateClaim.DeepCopy()
|
||||||
|
claim.Status.Allocation = &resourcev1alpha2.AllocationResult{}
|
||||||
|
return claim
|
||||||
|
}(),
|
||||||
|
expectedHint: framework.QueueImmediately,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, tc := range testcases {
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
testCtx := setup(t, nil, tc.claims, nil, nil)
|
||||||
|
if claim, ok := tc.newObj.(*resourcev1alpha2.ResourceClaim); ok {
|
||||||
|
// Update the informer because the lister gets called and must have the claim.
|
||||||
|
store := testCtx.informerFactory.Resource().V1alpha2().ResourceClaims().Informer().GetStore()
|
||||||
|
if tc.oldObj == nil {
|
||||||
|
require.NoError(t, store.Add(claim))
|
||||||
|
} else {
|
||||||
|
require.NoError(t, store.Update(claim))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
actualHint := testCtx.p.isSchedulableAfterClaimChange(tc.pod, tc.oldObj, tc.newObj)
|
||||||
|
require.Equal(t, tc.expectedHint, actualHint)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPodSchedulingContextChange(t *testing.T) {
|
||||||
|
testcases := map[string]struct {
|
||||||
|
pod *v1.Pod
|
||||||
|
schedulings []*resourcev1alpha2.PodSchedulingContext
|
||||||
|
claims []*resourcev1alpha2.ResourceClaim
|
||||||
|
oldObj, newObj interface{}
|
||||||
|
expectedHint framework.QueueingHint
|
||||||
|
}{
|
||||||
|
"skip-deleted": {
|
||||||
|
pod: podWithClaimTemplate,
|
||||||
|
oldObj: scheduling,
|
||||||
|
expectedHint: framework.QueueSkip,
|
||||||
|
},
|
||||||
|
"skip-missed-deleted": {
|
||||||
|
pod: podWithClaimTemplate,
|
||||||
|
oldObj: cache.DeletedFinalStateUnknown{
|
||||||
|
Obj: scheduling,
|
||||||
|
},
|
||||||
|
expectedHint: framework.QueueSkip,
|
||||||
|
},
|
||||||
|
"backoff-wrong-old-object": {
|
||||||
|
pod: podWithClaimTemplate,
|
||||||
|
oldObj: "not-a-scheduling-context",
|
||||||
|
newObj: scheduling,
|
||||||
|
expectedHint: framework.QueueAfterBackoff,
|
||||||
|
},
|
||||||
|
"backoff-missed-wrong-old-object": {
|
||||||
|
pod: podWithClaimTemplate,
|
||||||
|
oldObj: cache.DeletedFinalStateUnknown{
|
||||||
|
Obj: "not-a-scheduling-context",
|
||||||
|
},
|
||||||
|
newObj: scheduling,
|
||||||
|
expectedHint: framework.QueueAfterBackoff,
|
||||||
|
},
|
||||||
|
"skip-unrelated-object": {
|
||||||
|
pod: podWithClaimTemplate,
|
||||||
|
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
|
||||||
|
newObj: func() *resourcev1alpha2.PodSchedulingContext {
|
||||||
|
scheduling := scheduling.DeepCopy()
|
||||||
|
scheduling.Name += "-foo"
|
||||||
|
return scheduling
|
||||||
|
}(),
|
||||||
|
expectedHint: framework.QueueSkip,
|
||||||
|
},
|
||||||
|
"backoff-wrong-new-object": {
|
||||||
|
pod: podWithClaimTemplate,
|
||||||
|
oldObj: scheduling,
|
||||||
|
newObj: "not-a-scheduling-context",
|
||||||
|
expectedHint: framework.QueueAfterBackoff,
|
||||||
|
},
|
||||||
|
"skip-missing-claim": {
|
||||||
|
pod: podWithClaimTemplate,
|
||||||
|
oldObj: scheduling,
|
||||||
|
newObj: schedulingInfo,
|
||||||
|
expectedHint: framework.QueueSkip,
|
||||||
|
},
|
||||||
|
"skip-missing-infos": {
|
||||||
|
pod: podWithClaimTemplateInStatus,
|
||||||
|
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
|
||||||
|
oldObj: scheduling,
|
||||||
|
newObj: scheduling,
|
||||||
|
expectedHint: framework.QueueSkip,
|
||||||
|
},
|
||||||
|
"queue-new-infos": {
|
||||||
|
pod: podWithClaimTemplateInStatus,
|
||||||
|
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
|
||||||
|
oldObj: scheduling,
|
||||||
|
newObj: schedulingInfo,
|
||||||
|
expectedHint: framework.QueueImmediately,
|
||||||
|
},
|
||||||
|
"queue-bad-selected-node": {
|
||||||
|
pod: podWithClaimTemplateInStatus,
|
||||||
|
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
|
||||||
|
oldObj: func() *resourcev1alpha2.PodSchedulingContext {
|
||||||
|
scheduling := schedulingInfo.DeepCopy()
|
||||||
|
scheduling.Spec.SelectedNode = workerNode.Name
|
||||||
|
return scheduling
|
||||||
|
}(),
|
||||||
|
newObj: func() *resourcev1alpha2.PodSchedulingContext {
|
||||||
|
scheduling := schedulingInfo.DeepCopy()
|
||||||
|
scheduling.Spec.SelectedNode = workerNode.Name
|
||||||
|
scheduling.Status.ResourceClaims[0].UnsuitableNodes = append(scheduling.Status.ResourceClaims[0].UnsuitableNodes, scheduling.Spec.SelectedNode)
|
||||||
|
return scheduling
|
||||||
|
}(),
|
||||||
|
expectedHint: framework.QueueImmediately,
|
||||||
|
},
|
||||||
|
"skip-spec-changes": {
|
||||||
|
pod: podWithClaimTemplateInStatus,
|
||||||
|
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
|
||||||
|
oldObj: schedulingInfo,
|
||||||
|
newObj: func() *resourcev1alpha2.PodSchedulingContext {
|
||||||
|
scheduling := schedulingInfo.DeepCopy()
|
||||||
|
scheduling.Spec.SelectedNode = workerNode.Name
|
||||||
|
return scheduling
|
||||||
|
}(),
|
||||||
|
expectedHint: framework.QueueSkip,
|
||||||
|
},
|
||||||
|
"backoff-other-changes": {
|
||||||
|
pod: podWithClaimTemplateInStatus,
|
||||||
|
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
|
||||||
|
oldObj: schedulingInfo,
|
||||||
|
newObj: func() *resourcev1alpha2.PodSchedulingContext {
|
||||||
|
scheduling := schedulingInfo.DeepCopy()
|
||||||
|
scheduling.Finalizers = append(scheduling.Finalizers, "foo")
|
||||||
|
return scheduling
|
||||||
|
}(),
|
||||||
|
expectedHint: framework.QueueAfterBackoff,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, tc := range testcases {
|
||||||
|
tc := tc
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
testCtx := setup(t, nil, tc.claims, nil, tc.schedulings)
|
||||||
|
actualHint := testCtx.p.isSchedulableAfterPodSchedulingContextChange(tc.pod, tc.oldObj, tc.newObj)
|
||||||
|
require.Equal(t, tc.expectedHint, actualHint)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -31,6 +31,7 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/util/net"
|
"k8s.io/apimachinery/pkg/util/net"
|
||||||
"k8s.io/apimachinery/pkg/util/strategicpatch"
|
"k8s.io/apimachinery/pkg/util/strategicpatch"
|
||||||
"k8s.io/client-go/kubernetes"
|
"k8s.io/client-go/kubernetes"
|
||||||
|
"k8s.io/client-go/tools/cache"
|
||||||
"k8s.io/client-go/util/retry"
|
"k8s.io/client-go/util/retry"
|
||||||
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
|
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
@ -164,6 +165,8 @@ func IsScalarResourceName(name v1.ResourceName) bool {
|
|||||||
// As converts two objects to the given type.
|
// As converts two objects to the given type.
|
||||||
// Both objects must be of the same type. If not, an error is returned.
|
// Both objects must be of the same type. If not, an error is returned.
|
||||||
// nil objects are allowed and will be converted to nil.
|
// nil objects are allowed and will be converted to nil.
|
||||||
|
// For oldObj, cache.DeletedFinalStateUnknown is handled and the
|
||||||
|
// object stored in it will be converted instead.
|
||||||
func As[T runtime.Object](oldObj, newobj interface{}) (T, T, error) {
|
func As[T runtime.Object](oldObj, newobj interface{}) (T, T, error) {
|
||||||
var oldTyped T
|
var oldTyped T
|
||||||
var newTyped T
|
var newTyped T
|
||||||
@ -176,6 +179,9 @@ func As[T runtime.Object](oldObj, newobj interface{}) (T, T, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if oldObj != nil {
|
if oldObj != nil {
|
||||||
|
if realOldObj, ok := oldObj.(cache.DeletedFinalStateUnknown); ok {
|
||||||
|
oldObj = realOldObj.Obj
|
||||||
|
}
|
||||||
oldTyped, ok = oldObj.(T)
|
oldTyped, ok = oldObj.(T)
|
||||||
if !ok {
|
if !ok {
|
||||||
return oldTyped, newTyped, fmt.Errorf("expected %T, but got %T", oldTyped, oldObj)
|
return oldTyped, newTyped, fmt.Errorf("expected %T, but got %T", oldTyped, oldObj)
|
||||||
|
Loading…
Reference in New Issue
Block a user