feature(KEP-4832): asynchronous preemption

Kensei Nakada 2024-10-18 10:55:46 +10:00
parent 3184eb3d1b
commit 69a8d0ec0b
14 changed files with 1401 additions and 663 deletions

View File

@ -579,6 +579,14 @@ const (
// which benefits to reduce the useless requeueing.
SchedulerQueueingHints featuregate.Feature = "SchedulerQueueingHints"

// owner: @sanposhiho
// kep: http://kep.k8s.io/4832
// alpha: v1.32
//
// Runs some expensive operations within the scheduler's preemption asynchronously,
// which improves scheduling latency when preemption is involved.
SchedulerAsyncPreemption featuregate.Feature = "SchedulerAsyncPreemption"

// owner: @atosatto @yuanchen8911
// kep: http://kep.k8s.io/3902
//
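(Editor's note, not part of the diff: the new gate is consumed like any other scheduler feature gate; the registry change later in this diff reads it via feature.DefaultFeatureGate. A minimal sketch of checking it, assuming the standard utilfeature helper; the package and function name asyncPreemptionEnabled are illustrative only. Cluster admins would turn the alpha gate on with --feature-gates=SchedulerAsyncPreemption=true on kube-scheduler.)

package example

import (
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/kubernetes/pkg/features"
)

// asyncPreemptionEnabled reports whether the alpha SchedulerAsyncPreemption
// gate is enabled for this process (it defaults to false in v1.32).
func asyncPreemptionEnabled() bool {
	return utilfeature.DefaultFeatureGate.Enabled(features.SchedulerAsyncPreemption)
}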

View File

@ -639,6 +639,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
{Version: version.MustParse("1.28"), Default: false, PreRelease: featuregate.Beta}, {Version: version.MustParse("1.28"), Default: false, PreRelease: featuregate.Beta},
}, },
SchedulerAsyncPreemption: {
{Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Alpha},
},
SELinuxChangePolicy: {
{Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Alpha},
},

View File

@ -52,6 +52,7 @@ var ExpandedPluginsV1 = &config.Plugins{
PreEnqueue: config.PluginSet{
Enabled: []config.Plugin{
{Name: names.SchedulingGates},
{Name: names.DefaultPreemption},
},
},
QueueSort: config.PluginSet{

View File

@ -53,6 +53,7 @@ type DefaultPreemption struct {
args config.DefaultPreemptionArgs
podLister corelisters.PodLister
pdbLister policylisters.PodDisruptionBudgetLister
Evaluator *preemption.Evaluator
}

var _ framework.PostFilterPlugin = &DefaultPreemption{}
@ -71,13 +72,19 @@ func New(_ context.Context, dpArgs runtime.Object, fh framework.Handle, fts feat
if err := validation.ValidateDefaultPreemptionArgs(nil, args); err != nil {
return nil, err
}
podLister := fh.SharedInformerFactory().Core().V1().Pods().Lister()
pdbLister := getPDBLister(fh.SharedInformerFactory())
pl := DefaultPreemption{
fh: fh,
fts: fts,
args: *args,
-podLister: fh.SharedInformerFactory().Core().V1().Pods().Lister(),
-pdbLister: getPDBLister(fh.SharedInformerFactory()),
podLister: podLister,
pdbLister: pdbLister,
}
pl.Evaluator = preemption.NewEvaluator(Name, fh, &pl, fts.EnableAsyncPreemption)
return &pl, nil
}
@ -87,16 +94,7 @@ func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.Cy
metrics.PreemptionAttempts.Inc()
}()

-pe := preemption.Evaluator{
-PluginName: names.DefaultPreemption,
-Handler: pl.fh,
-PodLister: pl.podLister,
-PdbLister: pl.pdbLister,
-State: state,
-Interface: pl,
-}
-result, status := pe.Preempt(ctx, pod, m)
result, status := pl.Evaluator.Preempt(ctx, state, pod, m)
msg := status.Message()
if len(msg) > 0 {
return result, framework.NewStatus(status.Code(), "preemption: "+msg)
@ -104,6 +102,22 @@ func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.Cy
return result, status
}
func (pl *DefaultPreemption) PreEnqueue(ctx context.Context, p *v1.Pod) *framework.Status {
if !pl.fts.EnableAsyncPreemption {
return nil
}
if pl.Evaluator.IsPodRunningPreemption(p.GetUID()) {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, "waiting for the preemption for this pod to be finished")
}
return nil
}
// EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable.
func (pl *DefaultPreemption) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
return nil, nil
}
// calculateNumCandidates returns the number of candidates the FindCandidates
// method must produce from dry running based on the constraints given by
// <minCandidateNodesPercentage> and <minCandidateNodesAbsolute>. The number of
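(Editor's note, not part of the diff: the two additions above work as a pair. PostFilter now hands the work to the shared Evaluator, and PreEnqueue keeps the preemptor Pod parked while its asynchronous API calls are still running. A minimal sketch of that contract under those assumptions; the helper name waitingOnOwnPreemption and the example package are hypothetical.)

package example

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpreemption"
)

// waitingOnOwnPreemption reports whether the scheduling queue would keep the
// pod out of the activeQ: while the pod's UID is still recorded in the
// Evaluator's preempting map, PreEnqueue returns UnschedulableAndUnresolvable.
func waitingOnOwnPreemption(ctx context.Context, pl *defaultpreemption.DefaultPreemption, pod *v1.Pod) bool {
	return pl.PreEnqueue(ctx, pod).Code() == framework.UnschedulableAndUnresolvable
}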

View File

@ -25,6 +25,7 @@ import (
"math/rand" "math/rand"
"sort" "sort"
"strings" "strings"
"sync"
"testing" "testing"
"time" "time"
@ -37,6 +38,7 @@ import (
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake" clientsetfake "k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing" clienttesting "k8s.io/client-go/testing"
@ -436,6 +438,7 @@ func TestPostFilter(t *testing.T) {
pdbLister: getPDBLister(informerFactory),
args: *getDefaultDefaultPreemptionArgs(),
}
p.Evaluator = preemption.NewEvaluator(names.DefaultPreemption, f, &p, false)
state := framework.NewCycleState()
// Ensure <state> is populated.
@ -1206,11 +1209,10 @@ func TestDryRunPreemption(t *testing.T) {
Handler: pl.fh,
PodLister: pl.podLister,
PdbLister: pl.pdbLister,
-State: state,
Interface: pl,
}
offset, numCandidates := pl.GetOffsetAndNumCandidates(int32(len(nodeInfos)))
-got, _, _ := pe.DryRunPreemption(ctx, pod, nodeInfos, tt.pdbs, offset, numCandidates)
got, _, _ := pe.DryRunPreemption(ctx, state, pod, nodeInfos, tt.pdbs, offset, numCandidates)
// Sort the values (inner victims) and the candidate itself (by its NominatedNodeName).
for i := range got {
victims := got[i].Victims().Pods
@ -1447,11 +1449,10 @@ func TestSelectBestCandidate(t *testing.T) {
Handler: pl.fh,
PodLister: pl.podLister,
PdbLister: pl.pdbLister,
-State: state,
Interface: pl,
}
offset, numCandidates := pl.GetOffsetAndNumCandidates(int32(len(nodeInfos)))
-candidates, _, _ := pe.DryRunPreemption(ctx, tt.pod, nodeInfos, nil, offset, numCandidates)
candidates, _, _ := pe.DryRunPreemption(ctx, state, tt.pod, nodeInfos, nil, offset, numCandidates)
s := pe.SelectCandidate(ctx, candidates)
if s == nil || len(s.Name()) == 0 {
return
@ -1549,6 +1550,7 @@ func TestPodEligibleToPreemptOthers(t *testing.T) {
}
}

func TestPreempt(t *testing.T) {
metrics.Register()
tests := []struct {
name string
pod *v1.Pod
@ -1713,16 +1715,29 @@ func TestPreempt(t *testing.T) {
}

labelKeys := []string{"hostname", "zone", "region"}
for _, asyncPreemptionEnabled := range []bool{true, false} {
for _, test := range tests {
-t.Run(test.name, func(t *testing.T) {
t.Run(fmt.Sprintf("%s (Async preemption enabled: %v)", test.name, asyncPreemptionEnabled), func(t *testing.T) {
client := clientsetfake.NewClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
podInformer := informerFactory.Core().V1().Pods().Informer()
-podInformer.GetStore().Add(test.pod)
testPod := test.pod.DeepCopy()
testPods := make([]*v1.Pod, len(test.pods))
for i := range test.pods {
-podInformer.GetStore().Add(test.pods[i])
testPods[i] = test.pods[i].DeepCopy()
}
if err := podInformer.GetStore().Add(testPod); err != nil {
t.Fatalf("Failed to add test pod %s: %v", testPod.Name, err)
}
for i := range testPods {
if err := podInformer.GetStore().Add(testPods[i]); err != nil {
t.Fatalf("Failed to add test pod %s: %v", testPods[i], err)
}
}
// Need to protect deletedPodNames and patchedPodNames to prevent DATA RACE panic.
var mu sync.RWMutex
deletedPodNames := sets.New[string]()
patchedPodNames := sets.New[string]()
patchedPods := []*v1.Pod{}
@ -1748,10 +1763,14 @@ func TestPreempt(t *testing.T) {
t.Fatalf("Failed to unmarshal updated pod %q: %v", updated, err) t.Fatalf("Failed to unmarshal updated pod %q: %v", updated, err)
} }
patchedPods = append(patchedPods, updatedPod) patchedPods = append(patchedPods, updatedPod)
mu.Lock()
defer mu.Unlock()
patchedPodNames.Insert(podName)
return true, nil, nil
})
client.PrependReactor("delete", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
mu.Lock()
defer mu.Unlock()
deletedPodNames.Insert(action.(clienttesting.DeleteAction).GetName())
return true, nil, nil
})
@ -1763,8 +1782,10 @@ func TestPreempt(t *testing.T) {
waitingPods := frameworkruntime.NewWaitingPodsMap()
cache := internalcache.New(ctx, time.Duration(0))
-for _, pod := range test.pods {
-cache.AddPod(logger, pod)
for _, pod := range testPods {
if err := cache.AddPod(logger, pod.DeepCopy()); err != nil {
t.Fatalf("Failed to add pod %s: %v", pod.Name, err)
}
}
cachedNodeInfoMap := map[string]*framework.NodeInfo{}
nodes := make([]*v1.Node, len(test.nodeNames))
@ -1777,6 +1798,7 @@ func TestPreempt(t *testing.T) {
node.ObjectMeta.Labels[labelKeys[i]] = label
}
node.Name = node.ObjectMeta.Labels["hostname"]
t.Logf("node is added: %v. labels: %#v", node.Name, node.ObjectMeta.Labels)
cache.AddNode(logger, node)
nodes[i] = node
@ -1803,7 +1825,7 @@ func TestPreempt(t *testing.T) {
frameworkruntime.WithEventRecorder(&events.FakeRecorder{}),
frameworkruntime.WithExtenders(extenders),
frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
-frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)),
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(testPods, nodes)),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithWaitingPods(waitingPods),
frameworkruntime.WithLogger(logger),
@ -1814,7 +1836,7 @@ func TestPreempt(t *testing.T) {
state := framework.NewCycleState()
// Some tests rely on PreFilter plugin to compute its CycleState.
-if _, s, _ := fwk.RunPreFilterPlugins(ctx, state, test.pod); !s.IsSuccess() {
if _, s, _ := fwk.RunPreFilterPlugins(ctx, state, testPod); !s.IsSuccess() {
t.Errorf("Unexpected preFilterStatus: %v", s)
}
// Call preempt and check the expected results.
@ -1825,14 +1847,7 @@ func TestPreempt(t *testing.T) {
args: *getDefaultDefaultPreemptionArgs(),
}

-pe := preemption.Evaluator{
-PluginName: names.DefaultPreemption,
-Handler: pl.fh,
-PodLister: pl.podLister,
-PdbLister: pl.pdbLister,
-State: state,
-Interface: &pl,
-}
pe := preemption.NewEvaluator(names.DefaultPreemption, pl.fh, &pl, asyncPreemptionEnabled)

// so that these nodes are eligible for preemption, we set their status
// to Unschedulable.
@ -1842,16 +1857,33 @@ func TestPreempt(t *testing.T) {
nodeToStatusMap.Set(n.Name, framework.NewStatus(framework.Unschedulable))
}

-res, status := pe.Preempt(ctx, test.pod, nodeToStatusMap)
res, status := pe.Preempt(ctx, state, testPod, nodeToStatusMap)
if !status.IsSuccess() && !status.IsRejected() {
t.Errorf("unexpected error in preemption: %v", status.AsError())
}
if diff := cmp.Diff(test.want, res); diff != "" {
t.Errorf("Unexpected status (-want, +got):\n%s", diff)
}

-if len(deletedPodNames) != len(test.expectedPods) {
-t.Errorf("expected %v pods, got %v.", len(test.expectedPods), len(deletedPodNames))
if asyncPreemptionEnabled {
// Wait for the pod to be deleted.
if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
mu.RLock()
defer mu.RUnlock()
return len(deletedPodNames) == len(test.expectedPods), nil
}); err != nil {
t.Errorf("expected %v pods to be deleted, got %v.", len(test.expectedPods), len(deletedPodNames))
} }
} else {
mu.RLock()
// If async preemption is disabled, the pod should be deleted immediately.
if len(deletedPodNames) != len(test.expectedPods) {
t.Errorf("expected %v pods to be deleted, got %v.", len(test.expectedPods), len(deletedPodNames))
}
mu.RUnlock()
}
mu.RLock()
if diff := cmp.Diff(sets.List(patchedPodNames), sets.List(deletedPodNames)); diff != "" {
t.Errorf("unexpected difference in the set of patched and deleted pods: %s", diff)
}
@ -1884,20 +1916,21 @@ func TestPreempt(t *testing.T) {
}
}
if res != nil && res.NominatingInfo != nil {
-test.pod.Status.NominatedNodeName = res.NominatedNodeName
testPod.Status.NominatedNodeName = res.NominatedNodeName
}

// Manually set the deleted Pods' deletionTimestamp to non-nil.
-for _, pod := range test.pods {
for _, pod := range testPods {
if deletedPodNames.Has(pod.Name) {
now := metav1.Now()
pod.DeletionTimestamp = &now
deletedPodNames.Delete(pod.Name)
}
}
mu.RUnlock()
// Call preempt again and make sure it doesn't preempt any more pods.
-res, status = pe.Preempt(ctx, test.pod, framework.NewDefaultNodeToStatus())
res, status = pe.Preempt(ctx, state, testPod, framework.NewDefaultNodeToStatus())
if !status.IsSuccess() && !status.IsRejected() {
t.Errorf("unexpected error in preemption: %v", status.AsError())
}
@ -1907,3 +1940,4 @@ func TestPreempt(t *testing.T) {
})
}
}
}

View File

@ -28,4 +28,5 @@ type Features struct {
EnableInPlacePodVerticalScaling bool
EnableSidecarContainers bool
EnableSchedulingQueueHint bool
EnableAsyncPreemption bool
}

View File

@ -54,6 +54,7 @@ func NewInTreeRegistry() runtime.Registry {
EnableInPlacePodVerticalScaling: feature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling),
EnableSidecarContainers: feature.DefaultFeatureGate.Enabled(features.SidecarContainers),
EnableSchedulingQueueHint: feature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints),
EnableAsyncPreemption: feature.DefaultFeatureGate.Enabled(features.SchedulerAsyncPreemption),
}

registry := runtime.Registry{

View File

@ -26,7 +26,9 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1" policy "k8s.io/api/policy/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors" utilerrors "k8s.io/apimachinery/pkg/util/errors"
corelisters "k8s.io/client-go/listers/core/v1" corelisters "k8s.io/client-go/listers/core/v1"
policylisters "k8s.io/client-go/listers/policy/v1" policylisters "k8s.io/client-go/listers/policy/v1"
@ -36,6 +38,7 @@ import (
apipod "k8s.io/kubernetes/pkg/api/v1/pod" apipod "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize" "k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/kubernetes/pkg/scheduler/util"
) )
@ -125,10 +128,48 @@ type Evaluator struct {
Handler framework.Handle
PodLister corelisters.PodLister
PdbLister policylisters.PodDisruptionBudgetLister
-State *framework.CycleState
enableAsyncPreemption bool
mu sync.RWMutex
// preempting is a map that records the pods that are currently triggering preemption asynchronously,
// which is used to prevent those pods from entering the scheduling cycle in the meantime.
preempting map[types.UID]struct{}
// PreemptPod is a function that actually makes API calls to preempt a specific Pod.
// This is exposed to be replaced during tests.
PreemptPod func(ctx context.Context, c Candidate, preemptor, victim *v1.Pod, pluginName string) error
Interface Interface
}
func NewEvaluator(pluginName string, fh framework.Handle, i Interface, enableAsyncPreemption bool) *Evaluator {
podLister := fh.SharedInformerFactory().Core().V1().Pods().Lister()
pdbLister := fh.SharedInformerFactory().Policy().V1().PodDisruptionBudgets().Lister()
e := &Evaluator{
PluginName: names.DefaultPreemption,
Handler: fh,
PodLister: podLister,
PdbLister: pdbLister,
Interface: i,
enableAsyncPreemption: enableAsyncPreemption,
preempting: make(map[types.UID]struct{}),
}
e.PreemptPod = e.preemptPod
return e
}
// IsPodRunningPreemption returns true if the pod is currently triggering preemption asynchronously.
func (ev *Evaluator) IsPodRunningPreemption(podUID types.UID) bool {
ev.mu.RLock()
defer ev.mu.RUnlock()
_, ok := ev.preempting[podUID]
return ok
}
// Preempt returns a PostFilterResult carrying suggested nominatedNodeName, along with a Status.
// The semantics of returned <PostFilterResult, Status> varies on different scenarios:
//
@ -145,7 +186,7 @@ type Evaluator struct {
//
// - <non-nil PostFilterResult, Success>. It's the regular happy path
// and the non-empty nominatedNodeName will be applied to the preemptor pod.
-func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeToStatusReader) (*framework.PostFilterResult, *framework.Status) {
func (ev *Evaluator) Preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusReader) (*framework.PostFilterResult, *framework.Status) {
logger := klog.FromContext(ctx)

// 0) Fetch the latest version of <pod>.
@ -171,7 +212,7 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT
if err != nil {
return nil, framework.AsStatus(err)
}
-candidates, nodeToStatusMap, err := ev.findCandidates(ctx, allNodes, pod, m)
candidates, nodeToStatusMap, err := ev.findCandidates(ctx, state, allNodes, pod, m)
if err != nil && len(candidates) == 0 {
return nil, framework.AsStatus(err)
}
@ -203,17 +244,25 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT
return nil, framework.NewStatus(framework.Unschedulable, "no candidate node for preemption")
}
logger.V(2).Info("the target node for the preemption is determined", "node", bestCandidate.Name(), "pod", klog.KObj(pod))
// 5) Perform preparation work before nominating the selected candidate.
if ev.enableAsyncPreemption {
if status := ev.prepareCandidateAsync(bestCandidate, pod, ev.PluginName); !status.IsSuccess() {
return nil, status
}
} else {
if status := ev.prepareCandidate(ctx, bestCandidate, pod, ev.PluginName); !status.IsSuccess() {
return nil, status
}
}
return framework.NewPostFilterResultWithNominatedNode(bestCandidate.Name()), framework.NewStatus(framework.Success)
}

// FindCandidates calculates a slice of preemption candidates.
// Each candidate is executable to make the given <pod> schedulable.
-func (ev *Evaluator) findCandidates(ctx context.Context, allNodes []*framework.NodeInfo, pod *v1.Pod, m framework.NodeToStatusReader) ([]Candidate, *framework.NodeToStatus, error) {
func (ev *Evaluator) findCandidates(ctx context.Context, state *framework.CycleState, allNodes []*framework.NodeInfo, pod *v1.Pod, m framework.NodeToStatusReader) ([]Candidate, *framework.NodeToStatus, error) {
if len(allNodes) == 0 {
return nil, nil, errors.New("no nodes available")
}
@ -239,7 +288,7 @@ func (ev *Evaluator) findCandidates(ctx context.Context, allNodes []*framework.N
}
offset, candidatesNum := ev.GetOffsetAndNumCandidates(int32(len(potentialNodes)))
-return ev.DryRunPreemption(ctx, pod, potentialNodes, pdbs, offset, candidatesNum)
return ev.DryRunPreemption(ctx, state, pod, potentialNodes, pdbs, offset, candidatesNum)
}

// callExtenders calls given <extenders> to select the list of feasible candidates.
@ -335,6 +384,46 @@ func (ev *Evaluator) SelectCandidate(ctx context.Context, candidates []Candidate
return candidates[0]
}
// preemptPod actually makes API calls to preempt a specific Pod.
func (ev *Evaluator) preemptPod(ctx context.Context, c Candidate, preemptor, victim *v1.Pod, pluginName string) error {
logger := klog.FromContext(ctx)
// If the victim is a WaitingPod, send a reject message to the PermitPlugin.
// Otherwise we should delete the victim.
if waitingPod := ev.Handler.GetWaitingPod(victim.UID); waitingPod != nil {
waitingPod.Reject(pluginName, "preempted")
logger.V(2).Info("Preemptor pod rejected a waiting pod", "preemptor", klog.KObj(preemptor), "waitingPod", klog.KObj(victim), "node", c.Name())
} else {
condition := &v1.PodCondition{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
Reason: v1.PodReasonPreemptionByScheduler,
Message: fmt.Sprintf("%s: preempting to accommodate a higher priority pod", preemptor.Spec.SchedulerName),
}
newStatus := victim.Status.DeepCopy()
updated := apipod.UpdatePodCondition(newStatus, condition)
if updated {
if err := util.PatchPodStatus(ctx, ev.Handler.ClientSet(), victim, newStatus); err != nil {
logger.Error(err, "Could not add DisruptionTarget condition due to preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(preemptor))
return err
}
}
if err := util.DeletePod(ctx, ev.Handler.ClientSet(), victim); err != nil {
if !apierrors.IsNotFound(err) {
logger.Error(err, "Preempted pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(preemptor))
return err
}
logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name())
return nil
}
logger.V(2).Info("Preemptor Pod preempted victim Pod", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name())
}
ev.Handler.EventRecorder().Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by pod %v on node %v", preemptor.UID, c.Name())
return nil
}
// prepareCandidate does some preparation work before nominating the selected candidate:
// - Evict the victim pods
// - Reject the victim pods if they are in waitingPod map
@ -347,41 +436,11 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.
defer cancel()
logger := klog.FromContext(ctx)
errCh := parallelize.NewErrorChannel()
-preemptPod := func(index int) {
-victim := c.Victims().Pods[index]
-// If the victim is a WaitingPod, send a reject message to the PermitPlugin.
-// Otherwise we should delete the victim.
-if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
-waitingPod.Reject(pluginName, "preempted")
-logger.V(2).Info("Preemptor pod rejected a waiting pod", "preemptor", klog.KObj(pod), "waitingPod", klog.KObj(victim), "node", c.Name())
-} else {
-condition := &v1.PodCondition{
-Type: v1.DisruptionTarget,
-Status: v1.ConditionTrue,
-Reason: v1.PodReasonPreemptionByScheduler,
-Message: fmt.Sprintf("%s: preempting to accommodate a higher priority pod", pod.Spec.SchedulerName),
-}
-newStatus := victim.Status.DeepCopy()
-updated := apipod.UpdatePodCondition(newStatus, condition)
-if updated {
-if err := util.PatchPodStatus(ctx, cs, victim, newStatus); err != nil {
-logger.Error(err, "Could not add DisruptionTarget condition due to preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
-errCh.SendErrorWithCancel(err, cancel)
-return
-}
-}
-if err := util.DeletePod(ctx, cs, victim); err != nil {
-logger.Error(err, "Preempted pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
-errCh.SendErrorWithCancel(err, cancel)
-return
-}
-logger.V(2).Info("Preemptor Pod preempted victim Pod", "preemptor", klog.KObj(pod), "victim", klog.KObj(victim), "node", c.Name())
-}
-fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by pod %v on node %v", pod.UID, c.Name())
-}
-fh.Parallelizer().Until(ctx, len(c.Victims().Pods), preemptPod, ev.PluginName)
fh.Parallelizer().Until(ctx, len(c.Victims().Pods), func(index int) {
if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[index], pluginName); err != nil {
errCh.SendErrorWithCancel(err, cancel)
}
}, ev.PluginName)
if err := errCh.ReceiveError(); err != nil {
return framework.AsStatus(err)
}
@ -401,6 +460,72 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.
return nil
}
// prepareCandidateAsync triggers a goroutine for some preparation work:
// - Evict the victim pods
// - Reject the victim pods if they are in waitingPod map
// - Clear the low-priority pods' nominatedNodeName status if needed
// The Pod won't be retried until the goroutine triggered here completes.
//
// See http://kep.k8s.io/4832 for how the async preemption works.
func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName string) *framework.Status {
metrics.PreemptionVictims.Observe(float64(len(c.Victims().Pods)))
// We intentionally create a new context rather than using the ctx from the scheduling cycle,
// because this process could continue even after this scheduling cycle finishes.
ctx, cancel := context.WithCancel(context.Background())
errCh := parallelize.NewErrorChannel()
preemptPod := func(index int) {
victim := c.Victims().Pods[index]
if err := ev.PreemptPod(ctx, c, pod, victim, pluginName); err != nil {
errCh.SendErrorWithCancel(err, cancel)
}
}
ev.mu.Lock()
ev.preempting[pod.UID] = struct{}{}
ev.mu.Unlock()
logger := klog.FromContext(ctx)
go func() { // TODO: use the parallelizer
defer cancel()
logger.V(2).Info("Start the preemption asynchronously", "preemptor", klog.KObj(pod), "node", c.Name(), "numVictims", len(c.Victims().Pods))
// Lower priority pods nominated to run on this node, may no longer fit on
// this node. So, we should remove their nomination. Removing their
// nomination updates these pods and moves them to the active queue. It
// lets scheduler find another place for them.
nominatedPods := getLowerPriorityNominatedPods(logger, ev.Handler, pod, c.Name())
if err := util.ClearNominatedNodeName(ctx, ev.Handler.ClientSet(), nominatedPods...); err != nil {
logger.Error(err, "Cannot clear 'NominatedNodeName' field")
// We do not return as this error is not critical.
}
// We can evict all victims in parallel, except the last one.
// We have to remove the pod from the preempting map before the last victim is evicted
// because, otherwise, the victim's removal could be notified to the scheduling queue before
// we remove this pod from the preempting map,
// and the pod could end up stuck in the unschedulable pod pool
// because all the pod removal events would be ignored.
ev.Handler.Parallelizer().Until(ctx, len(c.Victims().Pods)-1, preemptPod, ev.PluginName)
if err := errCh.ReceiveError(); err != nil {
logger.Error(err, "Error occurred during preemption")
}
ev.mu.Lock()
delete(ev.preempting, pod.UID)
ev.mu.Unlock()
if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[len(c.Victims().Pods)-1], pluginName); err != nil {
logger.Error(err, "Error occurred during preemption")
}
logger.V(2).Info("Async Preemption finished completely", "preemptor", klog.KObj(pod), "node", c.Name())
}()
return nil
}
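(Editor's note, not part of the diff: Evaluator.PreemptPod is deliberately a swappable field, which is what the integration test later in this commit relies on to pause the preemption goroutine at a known point. A hedged sketch of such a hook; the package, hookPreemptPod, and gate are illustrative names.)

package example

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/scheduler/framework/preemption"
)

// hookPreemptPod wraps Evaluator.PreemptPod so that every victim eviction
// blocks until the test closes gate; the real API calls then proceed unchanged.
func hookPreemptPod(ev *preemption.Evaluator, gate <-chan struct{}) {
	realPreemptPod := ev.PreemptPod
	ev.PreemptPod = func(ctx context.Context, c preemption.Candidate, preemptor, victim *v1.Pod, pluginName string) error {
		<-gate
		return realPreemptPod(ctx, c, preemptor, victim, pluginName)
	}
}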
func getPodDisruptionBudgets(pdbLister policylisters.PodDisruptionBudgetLister) ([]*policy.PodDisruptionBudget, error) {
if pdbLister != nil {
return pdbLister.List(labels.Everything())
@ -538,7 +663,7 @@ func getLowerPriorityNominatedPods(logger klog.Logger, pn framework.PodNominator
// The number of candidates depends on the constraints defined in the plugin's args. In the returned list of
// candidates, ones that do not violate PDB are preferred over ones that do.
// NOTE: This method is exported for easier testing in default preemption.
-func (ev *Evaluator) DryRunPreemption(ctx context.Context, pod *v1.Pod, potentialNodes []*framework.NodeInfo,
func (ev *Evaluator) DryRunPreemption(ctx context.Context, state *framework.CycleState, pod *v1.Pod, potentialNodes []*framework.NodeInfo,
pdbs []*policy.PodDisruptionBudget, offset int32, candidatesNum int32) ([]Candidate, *framework.NodeToStatus, error) {
fh := ev.Handler
@ -557,7 +682,7 @@ func (ev *Evaluator) DryRunPreemption(ctx context.Context, pod *v1.Pod, potentia
nodeInfoCopy := potentialNodes[(int(offset)+i)%len(potentialNodes)].Snapshot()
logger.V(5).Info("Check the potential node for preemption", "node", nodeInfoCopy.Node().Name)
-stateCopy := ev.State.Clone()
stateCopy := state.Clone()
pods, numPDBViolations, status := ev.SelectVictimsOnNode(ctx, stateCopy, pod, nodeInfoCopy, pdbs)
if status.IsSuccess() && len(pods) != 0 {
victims := extenderv1.Victims{

View File

@ -243,9 +243,8 @@ func TestDryRunPreemption(t *testing.T) {
PluginName: "FakePostFilter", PluginName: "FakePostFilter",
Handler: fwk, Handler: fwk,
Interface: fakePostPlugin, Interface: fakePostPlugin,
State: state,
} }
got, _, _ := pe.DryRunPreemption(ctx, pod, nodeInfos, nil, 0, int32(len(nodeInfos))) got, _, _ := pe.DryRunPreemption(ctx, state, pod, nodeInfos, nil, 0, int32(len(nodeInfos)))
// Sort the values (inner victims) and the candidate itself (by its NominatedNodeName). // Sort the values (inner victims) and the candidate itself (by its NominatedNodeName).
for i := range got { for i := range got {
victims := got[i].Victims().Pods victims := got[i].Victims().Pods
@ -344,9 +343,8 @@ func TestSelectCandidate(t *testing.T) {
PluginName: "FakePreemptionScorePostFilter", PluginName: "FakePreemptionScorePostFilter",
Handler: fwk, Handler: fwk,
Interface: fakePreemptionScorePostFilterPlugin, Interface: fakePreemptionScorePostFilterPlugin,
State: state,
} }
candidates, _, _ := pe.DryRunPreemption(ctx, pod, nodeInfos, nil, 0, int32(len(nodeInfos))) candidates, _, _ := pe.DryRunPreemption(ctx, state, pod, nodeInfos, nil, 0, int32(len(nodeInfos)))
s := pe.SelectCandidate(ctx, candidates) s := pe.SelectCandidate(ctx, candidates)
if s == nil || len(s.Name()) == 0 { if s == nil || len(s.Name()) == 0 {
t.Errorf("expect any node in %v, but no candidate selected", tt.expected) t.Errorf("expect any node in %v, but no candidate selected", tt.expected)

View File

@ -310,6 +310,12 @@ func (p *PodWrapper) Name(s string) *PodWrapper {
return p
}

// GenerateName sets `s` as the generateName of the inner pod.
func (p *PodWrapper) GenerateName(s string) *PodWrapper {
p.SetGenerateName(s)
return p
}

// UID sets `s` as the UID of the inner pod.
func (p *PodWrapper) UID(s string) *PodWrapper {
p.SetUID(types.UID(s))

View File

@ -1060,6 +1060,12 @@
lockToDefault: false
preRelease: Alpha
version: "1.29"
- name: SchedulerAsyncPreemption
versionedSpecs:
- default: false
lockToDefault: false
preRelease: Alpha
version: "1.32"
- name: SchedulerQueueingHints
versionedSpecs:
- default: false

View File

@ -33,23 +33,34 @@ import (
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/component-helpers/storage/volume" "k8s.io/component-helpers/storage/volume"
"k8s.io/klog/v2" "k8s.io/klog/v2"
configv1 "k8s.io/kube-scheduler/config/v1" configv1 "k8s.io/kube-scheduler/config/v1"
podutil "k8s.io/kubernetes/pkg/api/v1/pod" podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/apis/scheduling" "k8s.io/kubernetes/pkg/apis/scheduling"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler" "k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing" configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
"k8s.io/kubernetes/pkg/scheduler/backend/queue"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpreemption"
plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions"
"k8s.io/kubernetes/pkg/scheduler/framework/preemption"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
st "k8s.io/kubernetes/pkg/scheduler/testing" st "k8s.io/kubernetes/pkg/scheduler/testing"
"k8s.io/kubernetes/plugin/pkg/admission/priority" "k8s.io/kubernetes/plugin/pkg/admission/priority"
testutils "k8s.io/kubernetes/test/integration/util" testutils "k8s.io/kubernetes/test/integration/util"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/pointer" "k8s.io/utils/pointer"
"k8s.io/utils/ptr"
) )
// imported from testutils // imported from testutils
@ -452,8 +463,11 @@ func TestPreemption(t *testing.T) {
t.Fatalf("Error creating node: %v", err) t.Fatalf("Error creating node: %v", err)
} }
for _, asyncPreemptionEnabled := range []bool{true, false} {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(fmt.Sprintf("%s (Async preemption enabled: %v)", test.name, asyncPreemptionEnabled), func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerAsyncPreemption, asyncPreemptionEnabled)
filter.Tokens = test.initTokens filter.Tokens = test.initTokens
filter.EnablePreFilter = test.enablePreFilter filter.EnablePreFilter = test.enablePreFilter
filter.Unresolvable = test.unresolvable filter.Unresolvable = test.unresolvable
@ -485,12 +499,10 @@ func TestPreemption(t *testing.T) {
if cond == nil {
t.Errorf("Pod %q does not have the expected condition: %q", klog.KObj(pod), v1.DisruptionTarget)
}
-} else {
-if p.DeletionTimestamp != nil {
} else if p.DeletionTimestamp != nil {
t.Errorf("Didn't expect pod %v to get preempted.", p.Name)
}
}
-}
// Also check that the preemptor pod gets the NominatedNodeName field set.
if len(test.preemptedPodIndexes) > 0 {
if err := waitForNominatedNodeName(cs, preemptor); err != nil {
@ -504,6 +516,507 @@ func TestPreemption(t *testing.T) {
})
}
}
}
func TestAsyncPreemption(t *testing.T) {
type createPod struct {
pod *v1.Pod
// count is the number of times the pod should be created by this action.
// i.e., if you use it, you have to use GenerateName.
// By default, it's 1.
count *int
}
type schedulePod struct {
podName string
expectSuccess bool
}
type scenario struct {
// name is this step's name, just for the debugging purpose.
name string
// Only one of the following actions should be set.
// createPod creates a Pod.
createPod *createPod
// schedulePod schedules one Pod that is at the top of the activeQ.
// You should give a Pod name that is supposed to be scheduled.
schedulePod *schedulePod
// completePreemption completes the preemption that is currently on-going.
// You should give a Pod name.
completePreemption string
// podGatedInQueue checks if the given Pod is in the scheduling queue and gated by the preemption.
// You should give a Pod name.
podGatedInQueue string
// podRunningPreemption checks if the given Pod is running preemption.
// You should give a Pod index representing the order of Pod creation.
// e.g., if you want to check the Pod created first in the test case, you should give 0.
podRunningPreemption *int
}
tests := []struct {
name string
// scenarios after the first attempt of scheduling the pod.
scenarios []scenario
}{
{
// Very basic test case: if it fails, the basic scenario is broken somewhere.
name: "basic: async preemption happens expectedly",
scenarios: []scenario{
{
name: "create scheduled Pod",
createPod: &createPod{
pod: st.MakePod().GenerateName("victim-").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Node("node").Container("image").ZeroTerminationGracePeriod().Priority(1).Obj(),
count: ptr.To(2),
},
},
{
name: "create a preemptor Pod",
createPod: &createPod{
pod: st.MakePod().Name("preemptor").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Priority(100).Obj(),
},
},
{
name: "schedule the preemptor Pod",
schedulePod: &schedulePod{
podName: "preemptor",
},
},
{
name: "check the pod is in the queue and gated",
podGatedInQueue: "preemptor",
},
{
name: "check the preemptor Pod making the preemption API calls",
podRunningPreemption: ptr.To(2),
},
{
name: "complete the preemption API calls",
completePreemption: "preemptor",
},
{
name: "schedule the preemptor Pod after the preemption",
schedulePod: &schedulePod{
podName: "preemptor",
expectSuccess: true,
},
},
},
},
{
name: "Lower priority Pod doesn't take over the place for higher priority Pod that is running the preemption",
scenarios: []scenario{
{
name: "create scheduled Pod",
createPod: &createPod{
pod: st.MakePod().GenerateName("victim-").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Node("node").Container("image").ZeroTerminationGracePeriod().Priority(1).Obj(),
count: ptr.To(2),
},
},
{
name: "create a preemptor Pod",
createPod: &createPod{
pod: st.MakePod().Name("preemptor-high-priority").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Priority(100).Obj(),
},
},
{
name: "schedule the preemptor Pod",
schedulePod: &schedulePod{
podName: "preemptor-high-priority",
},
},
{
name: "check the pod is in the queue and gated",
podGatedInQueue: "preemptor-high-priority",
},
{
name: "check the preemptor Pod making the preemption API calls",
podRunningPreemption: ptr.To(2),
},
{
// This Pod is lower priority than the preemptor Pod.
// Given the preemptor Pod is nominated to the node, this Pod should be unschedulable.
name: "create a second Pod that is lower priority than the first preemptor Pod",
createPod: &createPod{
pod: st.MakePod().Name("pod-mid-priority").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Priority(50).Obj(),
},
},
{
name: "schedule the mid-priority Pod",
schedulePod: &schedulePod{
podName: "pod-mid-priority",
},
},
{
name: "complete the preemption API calls",
completePreemption: "preemptor-high-priority",
},
{
// the preemptor pod should be popped from the queue before the mid-priority pod.
name: "schedule the preemptor Pod again",
schedulePod: &schedulePod{
podName: "preemptor-high-priority",
expectSuccess: true,
},
},
{
name: "schedule the mid-priority Pod again",
schedulePod: &schedulePod{
podName: "pod-mid-priority",
},
},
},
},
{
name: "Higher priority Pod takes over the place for higher priority Pod that is running the preemption",
scenarios: []scenario{
{
name: "create scheduled Pod",
createPod: &createPod{
pod: st.MakePod().GenerateName("victim-").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Node("node").Container("image").ZeroTerminationGracePeriod().Priority(1).Obj(),
count: ptr.To(4),
},
},
{
name: "create a preemptor Pod",
createPod: &createPod{
pod: st.MakePod().Name("preemptor-high-priority").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Priority(100).Obj(),
},
},
{
name: "schedule the preemptor Pod",
schedulePod: &schedulePod{
podName: "preemptor-high-priority",
},
},
{
name: "check the pod is in the queue and gated",
podGatedInQueue: "preemptor-high-priority",
},
{
name: "check the preemptor Pod making the preemption API calls",
podRunningPreemption: ptr.To(4),
},
{
// This Pod is higher priority than the preemptor Pod.
// Even though the preemptor Pod is nominated to the node, this Pod can take over the place.
name: "create a second Pod that is higher priority than the first preemptor Pod",
createPod: &createPod{
pod: st.MakePod().Name("preemptor-super-high-priority").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Priority(200).Obj(),
},
},
{
name: "schedule the super-high-priority Pod",
schedulePod: &schedulePod{
podName: "preemptor-super-high-priority",
},
},
{
name: "check the super-high-priority Pod making the preemption API calls",
podRunningPreemption: ptr.To(5),
},
{
// the super-high-priority preemptor should enter the preemption
// and select the place where the preemptor-high-priority selected.
// So, basically both goroutines are preempting the same Pods.
name: "check the super-high-priority pod is in the queue and gated",
podGatedInQueue: "preemptor-super-high-priority",
},
{
name: "complete the preemption API calls of super-high-priority",
completePreemption: "preemptor-super-high-priority",
},
{
name: "complete the preemption API calls of high-priority",
completePreemption: "preemptor-high-priority",
},
{
name: "schedule the super-high-priority Pod",
schedulePod: &schedulePod{
podName: "preemptor-super-high-priority",
expectSuccess: true,
},
},
{
name: "schedule the high-priority Pod",
schedulePod: &schedulePod{
podName: "preemptor-high-priority",
},
},
},
},
{
name: "Lower priority Pod can select the same place where the higher priority Pod is preempting if the node is big enough",
scenarios: []scenario{
{
name: "create scheduled Pod",
createPod: &createPod{
pod: st.MakePod().GenerateName("victim-").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Node("node").Container("image").ZeroTerminationGracePeriod().Priority(1).Obj(),
count: ptr.To(4),
},
},
{
// It will preempt two victims.
name: "create a preemptor Pod",
createPod: &createPod{
pod: st.MakePod().Name("preemptor-high-priority").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Container("image").Priority(100).Obj(),
},
},
{
name: "schedule the preemptor Pod",
schedulePod: &schedulePod{
podName: "preemptor-high-priority",
},
},
{
name: "check the pod is in the queue and gated",
podGatedInQueue: "preemptor-high-priority",
},
{
name: "check the preemptor Pod making the preemption API calls",
podRunningPreemption: ptr.To(4),
},
{
// This Pod is lower priority than the preemptor Pod.
// Given the preemptor Pod is nominated to the node, this Pod should be unschedulable.
// This Pod will trigger the preemption to target the two victims that the first Pod doesn't target.
name: "create a second Pod that is lower priority than the first preemptor Pod",
createPod: &createPod{
pod: st.MakePod().Name("preemptor-mid-priority").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Container("image").Priority(50).Obj(),
},
},
{
name: "schedule the mid-priority Pod",
schedulePod: &schedulePod{
podName: "preemptor-mid-priority",
},
},
{
name: "check the mid-priority pod is in the queue and gated",
podGatedInQueue: "preemptor-mid-priority",
},
{
name: "check the mid-priority Pod making the preemption API calls",
podRunningPreemption: ptr.To(5),
},
{
name: "complete the preemption API calls",
completePreemption: "preemptor-mid-priority",
},
{
name: "complete the preemption API calls",
completePreemption: "preemptor-high-priority",
},
{
// the preemptor pod should be popped from the queue before the mid-priority pod.
name: "schedule the preemptor Pod again",
schedulePod: &schedulePod{
podName: "preemptor-high-priority",
expectSuccess: true,
},
},
{
name: "schedule the mid-priority Pod again",
schedulePod: &schedulePod{
podName: "preemptor-mid-priority",
expectSuccess: true,
},
},
},
},
}
// All test cases have the same node.
node := st.MakeNode().Name("node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj()
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// We need to use a custom preemption plugin to test async preemption behavior
delayedPreemptionPluginName := "delay-preemption"
// keyed by the pod name
preemptionDoneChannels := make(map[string]chan struct{})
defer func() {
for _, ch := range preemptionDoneChannels {
close(ch)
}
}()
registry := make(frameworkruntime.Registry)
var preemptionPlugin *defaultpreemption.DefaultPreemption
err := registry.Register(delayedPreemptionPluginName, func(c context.Context, r runtime.Object, fh framework.Handle) (framework.Plugin, error) {
p, err := frameworkruntime.FactoryAdapter(plfeature.Features{EnableAsyncPreemption: true}, defaultpreemption.New)(c, &config.DefaultPreemptionArgs{
// Set default values to pass the validation at the initialization, not related to the test.
MinCandidateNodesPercentage: 10,
MinCandidateNodesAbsolute: 100,
}, fh)
if err != nil {
return nil, fmt.Errorf("error creating default preemption plugin: %w", err)
}
var ok bool
preemptionPlugin, ok = p.(*defaultpreemption.DefaultPreemption)
if !ok {
return nil, fmt.Errorf("unexpected plugin type %T", p)
}
preemptPodFn := preemptionPlugin.Evaluator.PreemptPod
preemptionPlugin.Evaluator.PreemptPod = func(ctx context.Context, c preemption.Candidate, preemptor, victim *v1.Pod, pluginName string) error {
// block the preemption goroutine to complete until the test case allows it to proceed.
if ch, ok := preemptionDoneChannels[preemptor.Name]; ok {
<-ch
}
err := preemptPodFn(ctx, c, preemptor, victim, pluginName)
if err != nil {
return err
}
return nil
}
return preemptionPlugin, nil
})
if err != nil {
t.Fatalf("Error registering a filter: %v", err)
}
cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{
Profiles: []configv1.KubeSchedulerProfile{{
SchedulerName: pointer.String(v1.DefaultSchedulerName),
Plugins: &configv1.Plugins{
MultiPoint: configv1.PluginSet{
Enabled: []configv1.Plugin{
{Name: delayedPreemptionPluginName},
},
Disabled: []configv1.Plugin{
{Name: names.DefaultPreemption},
},
},
},
}},
})
// It initializes the scheduler, but doesn't start.
// We manually trigger the scheduling cycle.
testCtx := testutils.InitTestSchedulerWithOptions(t,
testutils.InitTestAPIServer(t, "preemption", nil),
0,
scheduler.WithProfiles(cfg.Profiles...),
scheduler.WithFrameworkOutOfTreeRegistry(registry),
// disable backoff
scheduler.WithPodMaxBackoffSeconds(0),
scheduler.WithPodInitialBackoffSeconds(0),
)
testutils.SyncSchedulerInformerFactory(testCtx)
cs := testCtx.ClientSet
if preemptionPlugin == nil {
t.Fatalf("the preemption plugin should be initialized")
}
logger, _ := ktesting.NewTestContext(t)
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerAsyncPreemption, true)
createdPods := []*v1.Pod{}
defer testutils.CleanupPods(testCtx.Ctx, cs, t, createdPods)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if _, err := cs.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create an initial Node %q: %v", node.Name, err)
}
defer func() {
if err := cs.CoreV1().Nodes().Delete(ctx, node.Name, metav1.DeleteOptions{}); err != nil {
t.Fatalf("Failed to delete the Node %q: %v", node.Name, err)
}
}()
for _, scenario := range test.scenarios {
t.Logf("Running scenario: %s", scenario.name)
switch {
case scenario.createPod != nil:
if scenario.createPod.count == nil {
scenario.createPod.count = ptr.To(1)
}
for i := 0; i < *scenario.createPod.count; i++ {
pod, err := cs.CoreV1().Pods(testCtx.NS.Name).Create(ctx, scenario.createPod.pod, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create a Pod %q: %v", pod.Name, err)
}
createdPods = append(createdPods, pod)
}
case scenario.schedulePod != nil:
if err := wait.PollUntilContextTimeout(testCtx.Ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
activePods := testCtx.Scheduler.SchedulingQueue.PodsInActiveQ()
return len(activePods) != 0, nil
}); err != nil {
t.Fatalf("Expected the pod %s to be scheduled, but no pod arrives at the activeQ", scenario.schedulePod.podName)
}
if err := wait.PollUntilContextTimeout(testCtx.Ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
return testCtx.Scheduler.SchedulingQueue.PodsInActiveQ()[0].Name == scenario.schedulePod.podName, nil
}); err != nil {
t.Fatalf("The pod %s is expected to be scheduled, but the top Pod is %s", scenario.schedulePod.podName, testCtx.Scheduler.SchedulingQueue.PodsInActiveQ()[0].Name)
}
preemptionDoneChannels[scenario.schedulePod.podName] = make(chan struct{})
testCtx.Scheduler.ScheduleOne(testCtx.Ctx)
if scenario.schedulePod.expectSuccess {
if err := wait.PollUntilContextTimeout(testCtx.Ctx, 200*time.Millisecond, wait.ForeverTestTimeout, false, testutils.PodScheduled(cs, testCtx.NS.Name, scenario.schedulePod.podName)); err != nil {
t.Fatalf("Expected the pod %s to be scheduled", scenario.schedulePod.podName)
}
} else {
if !podInUnschedulablePodPool(t, testCtx.Scheduler.SchedulingQueue, scenario.schedulePod.podName) {
t.Fatalf("Expected the pod %s to be in the queue after the scheduling attempt", scenario.schedulePod.podName)
}
}
case scenario.completePreemption != "":
if _, ok := preemptionDoneChannels[scenario.completePreemption]; !ok {
t.Fatalf("The preemptor Pod %q is not running preemption", scenario.completePreemption)
}
close(preemptionDoneChannels[scenario.completePreemption])
delete(preemptionDoneChannels, scenario.completePreemption)
case scenario.podGatedInQueue != "":
// make sure the Pod is in the queue in the first place.
if !podInUnschedulablePodPool(t, testCtx.Scheduler.SchedulingQueue, scenario.podGatedInQueue) {
t.Fatalf("Expected the pod %s to be in the queue", scenario.podGatedInQueue)
}
// Make sure this Pod is gated by the preemption at PreEnqueue extension point
// by activating the Pod and see if it's still in the unsched pod pool.
testCtx.Scheduler.SchedulingQueue.Activate(logger, map[string]*v1.Pod{scenario.podGatedInQueue: st.MakePod().Namespace(testCtx.NS.Name).Name(scenario.podGatedInQueue).Obj()})
if !podInUnschedulablePodPool(t, testCtx.Scheduler.SchedulingQueue, scenario.podGatedInQueue) {
t.Fatalf("Expected the pod %s to be in the queue even after the activation", scenario.podGatedInQueue)
}
case scenario.podRunningPreemption != nil:
if err := wait.PollUntilContextTimeout(testCtx.Ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
return preemptionPlugin.Evaluator.IsPodRunningPreemption(createdPods[*scenario.podRunningPreemption].GetUID()), nil
}); err != nil {
t.Fatalf("Expected the pod %s to be running preemption", createdPods[*scenario.podRunningPreemption].Name)
}
}
}
})
}
}
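// For reference, each scenario entry consumed by the loop above is a small union-style
// step: exactly one of its fields is set, and the steps run strictly in order. The exact
// type is defined earlier in this file; judging only from how the fields are used here,
// it looks roughly like the hypothetical sketch below (illustrative, not the verbatim
// definition; the *Step names are invented for this sketch).
type scenarioStepSketch struct {
	name string
	// createPod creates the given pod (count times, defaulting to 1).
	createPod *createPodStep
	// schedulePod waits for the named pod to reach the top of the activeQ and runs one
	// scheduling cycle for it, expecting either success or an unschedulable result.
	schedulePod *schedulePodStep
	// completePreemption unblocks the in-flight, test-controlled preemption of the named preemptor.
	completePreemption string
	// podGatedInQueue asserts that the named pod stays gated at PreEnqueue.
	podGatedInQueue string
	// podRunningPreemption asserts that createdPods[*podRunningPreemption] has an in-flight preemption.
	podRunningPreemption *int
}

type createPodStep struct {
	pod   *v1.Pod
	count *int
}

type schedulePodStep struct {
	podName       string
	expectSuccess bool
}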
// podInUnschedulablePodPool checks if the given Pod is in the unschedulable pod pool.
func podInUnschedulablePodPool(t *testing.T, queue queue.SchedulingQueue, podName string) bool {
t.Helper()
// First, look for the pod in the activeQ.
for _, pod := range queue.PodsInActiveQ() {
if pod.Name == podName {
return false
}
}
// Then, look for the pod among all pending pods; the activeQ was already checked above,
// so a match here means the pod is in the backoffQ or in the unschedulable pod pool.
pendingPods, _ := queue.PendingPods()
for _, pod := range pendingPods {
if pod.Name == podName {
return true
}
}
return false
}
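// The podGatedInQueue assertions above depend on the DefaultPreemption plugin now also
// being registered at PreEnqueue (see the expanded default plugin set in this commit),
// so that a preemptor whose victims are still being deleted asynchronously is not
// re-queued into the activeQ. A minimal sketch of what such a gate could look like,
// assuming the plugin consults its evaluator's IsPodRunningPreemption (illustrative
// only, not the code added by this commit):
func (pl *DefaultPreemption) PreEnqueue(ctx context.Context, p *v1.Pod) *framework.Status {
	if !pl.fts.EnableAsyncPreemption {
		return nil
	}
	if pl.Evaluator.IsPodRunningPreemption(p.GetUID()) {
		// Keep the pod out of the activeQ until its asynchronous preemption finishes,
		// so the scheduler does not retry it while victim deletions are still in flight.
		return framework.NewStatus(framework.UnschedulableAndUnresolvable, "waiting for the asynchronous preemption to finish")
	}
	return nil
}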
// TestNonPreemption tests NonPreempt option of PriorityClass of scheduler works as expected. // TestNonPreemption tests NonPreempt option of PriorityClass of scheduler works as expected.
func TestNonPreemption(t *testing.T) { func TestNonPreemption(t *testing.T) {
@ -554,8 +1067,10 @@ func TestNonPreemption(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("Error creating nodes: %v", err) t.Fatalf("Error creating nodes: %v", err)
} }
for _, asyncPreemptionEnabled := range []bool{true, false} {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(fmt.Sprintf("%s (Async preemption enabled: %v)", test.name, asyncPreemptionEnabled), func(t *testing.T) {
defer testutils.CleanupPods(testCtx.Ctx, cs, t, []*v1.Pod{preemptor, victim}) defer testutils.CleanupPods(testCtx.Ctx, cs, t, []*v1.Pod{preemptor, victim})
preemptor.Spec.PreemptionPolicy = test.PreemptionPolicy preemptor.Spec.PreemptionPolicy = test.PreemptionPolicy
victimPod, err := createPausePod(cs, victim) victimPod, err := createPausePod(cs, victim)
@ -582,6 +1097,7 @@ func TestNonPreemption(t *testing.T) {
}) })
} }
} }
}
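// NOTE (illustrative, not part of this commit): the asyncPreemptionEnabled loop added to
// the tests above and below presumably flips the SchedulerAsyncPreemption feature gate
// inside each subtest with the standard component-base testing helper, roughly like the
// hypothetical wrapper below (imports assumed: utilfeature "k8s.io/apiserver/pkg/util/feature",
// featuregatetesting "k8s.io/component-base/featuregate/testing", "k8s.io/kubernetes/pkg/features").
func setAsyncPreemptionGateForTest(t *testing.T, enabled bool) {
	t.Helper()
	// SetFeatureGateDuringTest overrides the gate and restores the previous value when the test ends.
	featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerAsyncPreemption, enabled)
}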
// TestDisablePreemption tests disable pod preemption of scheduler works as expected. // TestDisablePreemption tests disable pod preemption of scheduler works as expected.
func TestDisablePreemption(t *testing.T) { func TestDisablePreemption(t *testing.T) {
@ -630,8 +1146,9 @@ func TestDisablePreemption(t *testing.T) {
t.Fatalf("Error creating nodes: %v", err) t.Fatalf("Error creating nodes: %v", err)
} }
for _, asyncPreemptionEnabled := range []bool{true, false} {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(fmt.Sprintf("%s (Async preemption enabled: %v)", test.name, asyncPreemptionEnabled), func(t *testing.T) {
pods := make([]*v1.Pod, len(test.existingPods)) pods := make([]*v1.Pod, len(test.existingPods))
// Create and run existingPods. // Create and run existingPods.
for i, p := range test.existingPods { for i, p := range test.existingPods {
@ -661,6 +1178,7 @@ func TestDisablePreemption(t *testing.T) {
}) })
} }
} }
}
// This test verifies that system critical priorities are created automatically and resolved properly. // This test verifies that system critical priorities are created automatically and resolved properly.
func TestPodPriorityResolution(t *testing.T) { func TestPodPriorityResolution(t *testing.T) {
@ -736,9 +1254,9 @@ func TestPodPriorityResolution(t *testing.T) {
} }
pods := make([]*v1.Pod, 0, len(tests)) pods := make([]*v1.Pod, 0, len(tests))
for _, asyncPreemptionEnabled := range []bool{true, false} {
for _, test := range tests { for _, test := range tests {
t.Run(test.Name, func(t *testing.T) { t.Run(fmt.Sprintf("%s (Async preemption enabled: %v)", test.Name, asyncPreemptionEnabled), func(t *testing.T) {
t.Run(test.Name, func(t *testing.T) {
pod, err := runPausePod(cs, test.Pod) pod, err := runPausePod(cs, test.Pod)
if err != nil { if err != nil {
if test.ExpectedError == nil { if test.ExpectedError == nil {
@ -757,10 +1275,10 @@ func TestPodPriorityResolution(t *testing.T) {
} else { } else {
t.Errorf("Expected pod %v to have priority %v but was nil", pod.Name, test.PriorityClass) t.Errorf("Expected pod %v to have priority %v but was nil", pod.Name, test.PriorityClass)
} }
}) testutils.CleanupPods(testCtx.Ctx, cs, t, pods)
}) })
} }
testutils.CleanupPods(testCtx.Ctx, cs, t, pods) }
testutils.CleanupNodes(cs, t) testutils.CleanupNodes(cs, t)
} }
@ -824,8 +1342,9 @@ func TestPreemptionStarvation(t *testing.T) {
t.Fatalf("Error creating nodes: %v", err) t.Fatalf("Error creating nodes: %v", err)
} }
for _, asyncPreemptionEnabled := range []bool{true, false} {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(fmt.Sprintf("%s (Async preemption enabled: %v)", test.name, asyncPreemptionEnabled), func(t *testing.T) {
pendingPods := make([]*v1.Pod, test.numExpectedPending) pendingPods := make([]*v1.Pod, test.numExpectedPending)
numRunningPods := test.numExistingPod - test.numExpectedPending numRunningPods := test.numExistingPod - test.numExpectedPending
runningPods := make([]*v1.Pod, numRunningPods) runningPods := make([]*v1.Pod, numRunningPods)
@ -878,6 +1397,7 @@ func TestPreemptionStarvation(t *testing.T) {
}) })
} }
} }
}
// TestPreemptionRaces tests that other scheduling events and operations do not // TestPreemptionRaces tests that other scheduling events and operations do not
// race with the preemption process. // race with the preemption process.
@ -924,8 +1444,9 @@ func TestPreemptionRaces(t *testing.T) {
t.Fatalf("Error creating nodes: %v", err) t.Fatalf("Error creating nodes: %v", err)
} }
for _, asyncPreemptionEnabled := range []bool{true, false} {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(fmt.Sprintf("%s (Async preemption enabled: %v)", test.name, asyncPreemptionEnabled), func(t *testing.T) {
if test.numRepetitions <= 0 { if test.numRepetitions <= 0 {
test.numRepetitions = 1 test.numRepetitions = 1
} }
@ -992,6 +1513,7 @@ func TestPreemptionRaces(t *testing.T) {
}) })
} }
} }
}
const ( const (
alwaysFailPlugin = "alwaysFailPlugin" alwaysFailPlugin = "alwaysFailPlugin"
@ -1126,8 +1648,9 @@ func TestNominatedNodeCleanUp(t *testing.T) {
}, },
} }
for _, asyncPreemptionEnabled := range []bool{true, false} {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(fmt.Sprintf("%s (Async preemption enabled: %v)", tt.name, asyncPreemptionEnabled), func(t *testing.T) {
cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{ cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{
Profiles: []configv1.KubeSchedulerProfile{{ Profiles: []configv1.KubeSchedulerProfile{{
SchedulerName: pointer.String(v1.DefaultSchedulerName), SchedulerName: pointer.String(v1.DefaultSchedulerName),
@ -1197,6 +1720,7 @@ func TestNominatedNodeCleanUp(t *testing.T) {
}) })
} }
} }
}
func mkMinAvailablePDB(name, namespace string, uid types.UID, minAvailable int, matchLabels map[string]string) *policy.PodDisruptionBudget { func mkMinAvailablePDB(name, namespace string, uid types.UID, minAvailable int, matchLabels map[string]string) *policy.PodDisruptionBudget {
intMinAvailable := intstr.FromInt32(int32(minAvailable)) intMinAvailable := intstr.FromInt32(int32(minAvailable))
@ -1405,8 +1929,9 @@ func TestPDBInPreemption(t *testing.T) {
}, },
} }
for _, asyncPreemptionEnabled := range []bool{true, false} {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(fmt.Sprintf("%s (Async preemption enabled: %v)", test.name, asyncPreemptionEnabled), func(t *testing.T) {
for i := 1; i <= test.nodeCnt; i++ { for i := 1; i <= test.nodeCnt; i++ {
nodeName := fmt.Sprintf("node-%v", i) nodeName := fmt.Sprintf("node-%v", i)
_, err := createNode(cs, st.MakeNode().Name(nodeName).Capacity(defaultNodeRes).Obj()) _, err := createNode(cs, st.MakeNode().Name(nodeName).Capacity(defaultNodeRes).Obj())
@ -1482,6 +2007,7 @@ func TestPDBInPreemption(t *testing.T) {
}) })
} }
} }
}
func initTestPreferNominatedNode(t *testing.T, nsPrefix string, opts ...scheduler.Option) *testutils.TestContext { func initTestPreferNominatedNode(t *testing.T, nsPrefix string, opts ...scheduler.Option) *testutils.TestContext {
testCtx := testutils.InitTestSchedulerWithOptions(t, testutils.InitTestAPIServer(t, nsPrefix, nil), 0, opts...) testCtx := testutils.InitTestSchedulerWithOptions(t, testutils.InitTestAPIServer(t, nsPrefix, nil), 0, opts...)
@ -1563,8 +2089,9 @@ func TestPreferNominatedNode(t *testing.T) {
}, },
} }
for _, asyncPreemptionEnabled := range []bool{true, false} {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(fmt.Sprintf("%s (Async preemption enabled: %v)", test.name, asyncPreemptionEnabled), func(t *testing.T) {
testCtx := initTestPreferNominatedNode(t, "perfer-nominated-node") testCtx := initTestPreferNominatedNode(t, "perfer-nominated-node")
cs := testCtx.ClientSet cs := testCtx.ClientSet
nsName := testCtx.NS.Name nsName := testCtx.NS.Name
@ -1611,6 +2138,7 @@ func TestPreferNominatedNode(t *testing.T) {
}) })
} }
} }
}
// TestReadWriteOncePodPreemption tests preemption scenarios for pods with // TestReadWriteOncePodPreemption tests preemption scenarios for pods with
// ReadWriteOncePod PVCs. // ReadWriteOncePod PVCs.
@ -1912,8 +2440,9 @@ func TestReadWriteOncePodPreemption(t *testing.T) {
t.Fatalf("Error creating node: %v", err) t.Fatalf("Error creating node: %v", err)
} }
for _, asyncPreemptionEnabled := range []bool{true, false} {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(fmt.Sprintf("%s (Async preemption enabled: %v)", test.name, asyncPreemptionEnabled), func(t *testing.T) {
if err := test.init(); err != nil { if err := test.init(); err != nil {
t.Fatalf("Error while initializing test: %v", err) t.Fatalf("Error while initializing test: %v", err)
} }
@ -1961,3 +2490,4 @@ func TestReadWriteOncePodPreemption(t *testing.T) {
}) })
} }
} }
}

View File

@ -358,6 +358,16 @@
SchedulerQueueingHints: false SchedulerQueueingHints: false
labels: [performance] labels: [performance]
threshold: 200 threshold: 200
featureGates:
SchedulerAsyncPreemption: false
params:
initNodes: 5000
initPods: 20000
measurePods: 5000
- name: 5000Nodes_AsyncPreemption
labels: [performance]
featureGates:
SchedulerAsyncPreemption: true
params: params:
initNodes: 5000 initNodes: 5000
initPods: 20000 initPods: 20000

View File

@ -1,7 +1,7 @@
apiVersion: v1 apiVersion: v1
kind: Pod kind: Pod
metadata: metadata:
generateName: pod- generateName: pod-high-priority-
spec: spec:
priority: 10 priority: 10
containers: containers: