Merge pull request #128170 from sanposhiho/async-preemption

feature(KEP-4832): asynchronous preemption
Kubernetes Prow Robot 2024-11-07 19:44:54 +00:00 committed by GitHub
commit fb033826a8
24 changed files with 2048 additions and 765 deletions


@ -580,6 +580,14 @@ const (
// which helps to reduce useless requeueing.
SchedulerQueueingHints featuregate.Feature = "SchedulerQueueingHints"
// owner: @sanposhiho
// kep: http://kep.k8s.io/4832
// alpha: v1.32
//
// Running some expensive operations within the scheduler's preemption asynchronously,
// which improves the scheduling latency when preemption is involved.
SchedulerAsyncPreemption featuregate.Feature = "SchedulerAsyncPreemption"
// owner: @atosatto @yuanchen8911
// kep: http://kep.k8s.io/3902
//


@ -633,6 +633,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
{Version: version.MustParse("1.29"), Default: false, PreRelease: featuregate.Alpha},
},
SchedulerAsyncPreemption: {
{Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Alpha},
},
SchedulerQueueingHints: {
{Version: version.MustParse("1.28"), Default: false, PreRelease: featuregate.Beta},
{Version: version.MustParse("1.32"), Default: true, PreRelease: featuregate.Beta},


@ -52,6 +52,7 @@ var ExpandedPluginsV1 = &config.Plugins{
PreEnqueue: config.PluginSet{
Enabled: []config.Plugin{
{Name: names.SchedulingGates},
{Name: names.DefaultPreemption},
},
},
QueueSort: config.PluginSet{


@ -92,9 +92,11 @@ type PreEnqueueCheck func(pod *v1.Pod) bool
type SchedulingQueue interface {
framework.PodNominator
Add(logger klog.Logger, pod *v1.Pod)
// Activate moves the given pods to activeQ iff they're in unschedulablePods or backoffQ.
// The passed-in pods are originally compiled from plugins that want to activate Pods,
// by injecting the pods through a reserved CycleState struct (PodsToActivate).
// Activate moves the given pods to activeQ.
// If a pod isn't found in unschedulablePods or backoffQ and it's in-flight,
// the wildcard event is registered so that the pod will be requeued when it comes back.
// But if a pod isn't found in unschedulablePods or backoffQ and it's not in-flight (i.e., a completely unknown pod),
// Activate would ignore the pod.
Activate(logger klog.Logger, pods map[string]*v1.Pod)
// AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue.
// The podSchedulingCycle represents the current scheduling cycle number which can be
@ -411,9 +413,22 @@ func (p *PriorityQueue) isPodWorthRequeuing(logger klog.Logger, pInfo *framework
}
if event.IsWildCard() {
// If the wildcard event has a Pod in newObj,
// that indicates that the event wants to be effective for the Pod only.
// Specifically, EventForceActivate could have a target Pod in newObj.
if newObj != nil {
if pod, ok := newObj.(*v1.Pod); !ok || pod.UID != pInfo.Pod.UID {
// This wildcard event is not for this Pod.
if ok {
logger.V(6).Info("Not worth requeuing because the event is wildcard, but for another pod", "pod", klog.KObj(pInfo.Pod), "event", event.Label(), "newObj", klog.KObj(pod))
}
return queueSkip
}
}
// If the wildcard event is a special one that forces all Pods to move to activeQ/backoffQ,
// we return queueAfterBackoff in this case, while resetting all blocked plugins.
logger.V(6).Info("Worth requeuing because the event is wildcard", "pod", klog.KObj(pInfo.Pod))
logger.V(6).Info("Worth requeuing because the event is wildcard", "pod", klog.KObj(pInfo.Pod), "event", event.Label())
return queueAfterBackoff
}
@ -590,7 +605,11 @@ func (p *PriorityQueue) Add(logger klog.Logger, pod *v1.Pod) {
}
}
// Activate moves the given pods to activeQ iff they're in unschedulablePods or backoffQ.
// Activate moves the given pods to activeQ.
// If a pod isn't found in unschedulablePods or backoffQ and it's in-flight,
// the wildcard event is registered so that the pod will be requeued when it comes back.
// But if a pod isn't found in unschedulablePods or backoffQ and it's not in-flight (i.e., a completely unknown pod),
// Activate would ignore the pod.
func (p *PriorityQueue) Activate(logger klog.Logger, pods map[string]*v1.Pod) {
p.lock.Lock()
defer p.lock.Unlock()
@ -599,7 +618,15 @@ func (p *PriorityQueue) Activate(logger klog.Logger, pods map[string]*v1.Pod) {
for _, pod := range pods {
if p.activate(logger, pod) {
activated = true
continue
}
// If this pod is in-flight, register the activation event (for when QHints are enabled) or update moveRequestCycle (for when QHints are disabled)
// so that the pod will be requeued when it comes back.
// Specifically in the in-tree plugins, this is for the scenario with the preemption plugin
// where the async preemption API calls are all done or fail at some point before the Pod comes back to the queue.
p.activeQ.addEventsIfPodInFlight(nil, pod, []framework.ClusterEvent{framework.EventForceActivate})
p.moveRequestCycle = p.activeQ.schedulingCycle()
}
if activated {
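The queue changes above combine two pieces: Activate records a ForceActivate wildcard event for a pod that is currently in-flight, and isPodWorthRequeuing later honors that event only for the pod whose UID matches the event's newObj, skipping everyone else. A compact standard-library sketch of that matching rule (wildcardEvent, worthRequeuing, and the decision values are hypothetical stand-ins for the scheduler's types):

package main

import "fmt"

type decision string

const (
	queueSkip         decision = "skip"
	queueAfterBackoff decision = "afterBackoff"
)

// wildcardEvent is a simplified stand-in for a ClusterEvent with the
// WildCard resource; targetUID is non-empty when the event was recorded
// for one specific in-flight pod (as Activate does for ForceActivate).
type wildcardEvent struct {
	label     string
	targetUID string // empty means "applies to every pod"
}

// worthRequeuing mirrors the rule in isPodWorthRequeuing: a targeted
// wildcard event requeues only the pod it was recorded for; an
// untargeted one requeues any pod after backoff.
func worthRequeuing(ev wildcardEvent, podUID string) decision {
	if ev.targetUID != "" && ev.targetUID != podUID {
		return queueSkip
	}
	return queueAfterBackoff
}

func main() {
	forceActivate := wildcardEvent{label: "ForceActivate", targetUID: "pod-1"}
	timeout := wildcardEvent{label: "UnschedulableTimeout"} // untargeted

	fmt.Println(worthRequeuing(forceActivate, "pod-1")) // afterBackoff
	fmt.Println(worthRequeuing(forceActivate, "pod-2")) // skip
	fmt.Println(worthRequeuing(timeout, "pod-2"))       // afterBackoff
}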


@ -1294,13 +1294,17 @@ func TestPriorityQueue_Delete(t *testing.T) {
}
func TestPriorityQueue_Activate(t *testing.T) {
metrics.Register()
tests := []struct {
name string
qPodInfoInUnschedulablePods []*framework.QueuedPodInfo
qPodInfoInPodBackoffQ []*framework.QueuedPodInfo
qPodInActiveQ []*v1.Pod
qPodInfoToActivate *framework.QueuedPodInfo
qPodInInFlightPod *v1.Pod
expectedInFlightEvent *clusterEvent
want []*framework.QueuedPodInfo
qHintEnabled bool
}{
{
name: "pod already in activeQ",
@ -1313,6 +1317,21 @@ func TestPriorityQueue_Activate(t *testing.T) {
qPodInfoToActivate: &framework.QueuedPodInfo{PodInfo: highPriNominatedPodInfo},
want: []*framework.QueuedPodInfo{},
},
{
name: "[QHint] pod not in unschedulablePods/podBackoffQ but in-flight",
qPodInfoToActivate: &framework.QueuedPodInfo{PodInfo: highPriNominatedPodInfo},
qPodInInFlightPod: highPriNominatedPodInfo.Pod,
expectedInFlightEvent: &clusterEvent{oldObj: (*v1.Pod)(nil), newObj: highPriNominatedPodInfo.Pod, event: framework.EventForceActivate},
want: []*framework.QueuedPodInfo{},
qHintEnabled: true,
},
{
name: "[QHint] pod not in unschedulablePods/podBackoffQ and not in-flight",
qPodInfoToActivate: &framework.QueuedPodInfo{PodInfo: highPriNominatedPodInfo},
qPodInInFlightPod: medPriorityPodInfo.Pod, // different pod is in-flight
want: []*framework.QueuedPodInfo{},
qHintEnabled: true,
},
{
name: "pod in unschedulablePods",
qPodInfoInUnschedulablePods: []*framework.QueuedPodInfo{{PodInfo: highPriNominatedPodInfo}},
@ -1329,12 +1348,30 @@ func TestPriorityQueue_Activate(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, tt.qHintEnabled)
var objs []runtime.Object
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs)
if tt.qPodInInFlightPod != nil {
// Put -> Pop the Pod to make it registered in inFlightPods.
q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
unlockedActiveQ.AddOrUpdate(newQueuedPodInfoForLookup(tt.qPodInInFlightPod))
})
p, err := q.activeQ.pop(logger)
if err != nil {
t.Fatalf("Pop failed: %v", err)
}
if p.Pod.Name != tt.qPodInInFlightPod.Name {
t.Errorf("Unexpected popped pod: %v", p.Pod.Name)
}
if len(q.activeQ.listInFlightEvents()) != 1 {
t.Fatal("Expected the pod to be recorded in in-flight events, but it doesn't")
}
}
// Prepare activeQ/unschedulablePods/podBackoffQ according to the table
for _, qPod := range tt.qPodInActiveQ {
q.Add(logger, qPod)
@ -1353,7 +1390,29 @@ func TestPriorityQueue_Activate(t *testing.T) {
// Check the result after activation by the length of activeQ
if wantLen := len(tt.want); q.activeQ.len() != wantLen {
t.Errorf("length compare: want %v, got %v", wantLen, q.activeQ.len())
t.Fatalf("length compare: want %v, got %v", wantLen, q.activeQ.len())
}
if tt.expectedInFlightEvent != nil {
if len(q.activeQ.listInFlightEvents()) != 2 {
t.Fatalf("Expected two in-flight event to be recorded, but got %v events", len(q.activeQ.listInFlightEvents()))
}
found := false
for _, e := range q.activeQ.listInFlightEvents() {
event, ok := e.(*clusterEvent)
if !ok {
continue
}
if d := cmp.Diff(tt.expectedInFlightEvent, event, cmpopts.EquateComparable(clusterEvent{})); d != "" {
t.Fatalf("Unexpected in-flight event (-want, +got):\n%s", d)
}
found = true
}
if !found {
t.Fatalf("Expected in-flight event to be recorded, but it wasn't.")
}
}
// Check if the specific pod exists in activeQ
@ -3779,6 +3838,7 @@ func mustNewPodInfo(pod *v1.Pod) *framework.PodInfo {
// Test_isPodWorthRequeuing tests isPodWorthRequeuing function.
func Test_isPodWorthRequeuing(t *testing.T) {
metrics.Register()
count := 0
queueHintReturnQueue := func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
count++
@ -3857,11 +3917,37 @@ func Test_isPodWorthRequeuing(t *testing.T) {
},
event: framework.EventUnschedulableTimeout,
oldObj: nil,
newObj: st.MakeNode().Obj(),
newObj: nil,
expected: queueAfterBackoff,
expectedExecutionCount: 0,
queueingHintMap: QueueingHintMapPerProfile{},
},
{
name: "return Queue when the event is wildcard and the wildcard targets the pod to be requeued right now",
podInfo: &framework.QueuedPodInfo{
UnschedulablePlugins: sets.New("fooPlugin1"),
PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()),
},
event: framework.EventForceActivate,
oldObj: nil,
newObj: st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj(),
expected: queueAfterBackoff,
expectedExecutionCount: 0,
queueingHintMap: QueueingHintMapPerProfile{},
},
{
name: "return Skip when the event is wildcard, but the wildcard targets a different pod",
podInfo: &framework.QueuedPodInfo{
UnschedulablePlugins: sets.New("fooPlugin1"),
PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()),
},
event: framework.EventForceActivate,
oldObj: nil,
newObj: st.MakePod().Name("pod-different").Namespace("ns2").UID("2").Obj(),
expected: queueSkip,
expectedExecutionCount: 0,
queueingHintMap: QueueingHintMapPerProfile{},
},
{
name: "interprets Queue from the Pending plugin as queueImmediately",
podInfo: &framework.QueuedPodInfo{


@ -34,6 +34,9 @@ const (
// ForceActivate is the event when a pod is moved from unschedulablePods/backoffQ
// to activeQ. Usually it's triggered by plugin implementations.
ForceActivate = "ForceActivate"
// UnschedulableTimeout is the event when a pod is moved from unschedulablePods
// due to the timeout specified by pod-max-in-unschedulable-pods-duration.
UnschedulableTimeout = "UnschedulableTimeout"
)
var (
@ -50,7 +53,9 @@ var (
// EventUnscheduledPodDelete is the event when an unscheduled pod is deleted.
EventUnscheduledPodDelete = ClusterEvent{Resource: unschedulablePod, ActionType: Delete}
// EventUnschedulableTimeout is the event when a pod stays in unschedulable for longer than timeout.
EventUnschedulableTimeout = ClusterEvent{Resource: WildCard, ActionType: All, label: "UnschedulableTimeout"}
EventUnschedulableTimeout = ClusterEvent{Resource: WildCard, ActionType: All, label: UnschedulableTimeout}
// EventForceActivate is the event when a pod is moved from unschedulablePods/backoffQ to activeQ.
EventForceActivate = ClusterEvent{Resource: WildCard, ActionType: All, label: ForceActivate}
)
// PodSchedulingPropertiesChange interprets the update of a pod and returns corresponding UpdatePodXYZ event(s).


@ -770,6 +770,8 @@ type Framework interface {
// SetPodNominator sets the PodNominator
SetPodNominator(nominator PodNominator)
// SetPodActivator sets the PodActivator
SetPodActivator(activator PodActivator)
// Close calls Close method of each plugin.
Close() error
@ -783,6 +785,8 @@ type Handle interface {
PodNominator
// PluginsRunner abstracts operations to run some plugins.
PluginsRunner
// PodActivator abstracts operations in the scheduling queue.
PodActivator
// SnapshotSharedLister returns listers from the latest NodeInfo Snapshot. The snapshot
// is taken at the beginning of a scheduling cycle and remains unchanged until
// a pod finishes "Permit" point.
@ -896,6 +900,16 @@ func (ni *NominatingInfo) Mode() NominatingMode {
return ni.NominatingMode
}
// PodActivator abstracts operations in the scheduling queue.
type PodActivator interface {
// Activate moves the given pods to activeQ.
// If a pod isn't found in unschedulablePods or backoffQ and it's in-flight,
// the wildcard event is registered so that the pod will be requeued when it comes back.
// But if a pod isn't found in unschedulablePods or backoffQ and it's not in-flight (i.e., a completely unknown pod),
// Activate would ignore the pod.
Activate(logger klog.Logger, pods map[string]*v1.Pod)
}
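PodActivator is the narrow hook the preemption plugin needs from the queue: a way to push specific pods back toward activeQ once asynchronous work finishes or fails. A small standard-library sketch of a consumer of such an interface (pod, podActivator, queueStub, and retryOnFailure are hypothetical names):

package main

import (
	"fmt"
	"sync"
)

// pod is a minimal stand-in for *v1.Pod.
type pod struct{ name string }

// podActivator mirrors the shape of the PodActivator interface: the caller
// hands over pods keyed by name and the queue decides whether to move them
// to activeQ or register a requeue event.
type podActivator interface {
	Activate(pods map[string]*pod)
}

// queueStub is a toy activator that just records what was activated.
type queueStub struct {
	mu        sync.Mutex
	activated []string
}

func (q *queueStub) Activate(pods map[string]*pod) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for name := range pods {
		q.activated = append(q.activated, name)
	}
}

// retryOnFailure shows the pattern used by the async preemption goroutine:
// if the background work fails, the pod is explicitly re-activated so it
// does not sit in the unschedulable pool waiting for an event.
func retryOnFailure(activator podActivator, p *pod, work func() error) {
	if err := work(); err != nil {
		activator.Activate(map[string]*pod{p.name: p})
	}
}

func main() {
	q := &queueStub{}
	retryOnFailure(q, &pod{name: "preemptor"}, func() error { return fmt.Errorf("API call failed") })
	fmt.Println(q.activated) // [preemptor]
}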
// PodNominator abstracts operations to maintain nominated Pods.
type PodNominator interface {
// AddNominatedPod adds the given pod to the nominator or


@ -53,9 +53,11 @@ type DefaultPreemption struct {
args config.DefaultPreemptionArgs
podLister corelisters.PodLister
pdbLister policylisters.PodDisruptionBudgetLister
Evaluator *preemption.Evaluator
}
var _ framework.PostFilterPlugin = &DefaultPreemption{}
var _ framework.PreEnqueuePlugin = &DefaultPreemption{}
// Name returns name of the plugin. It is used in logs, etc.
func (pl *DefaultPreemption) Name() string {
@ -71,13 +73,19 @@ func New(_ context.Context, dpArgs runtime.Object, fh framework.Handle, fts feat
if err := validation.ValidateDefaultPreemptionArgs(nil, args); err != nil {
return nil, err
}
podLister := fh.SharedInformerFactory().Core().V1().Pods().Lister()
pdbLister := getPDBLister(fh.SharedInformerFactory())
pl := DefaultPreemption{
fh: fh,
fts: fts,
args: *args,
podLister: fh.SharedInformerFactory().Core().V1().Pods().Lister(),
pdbLister: getPDBLister(fh.SharedInformerFactory()),
podLister: podLister,
pdbLister: pdbLister,
}
pl.Evaluator = preemption.NewEvaluator(Name, fh, &pl, fts.EnableAsyncPreemption)
return &pl, nil
}
@ -87,16 +95,7 @@ func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.Cy
metrics.PreemptionAttempts.Inc()
}()
pe := preemption.Evaluator{
PluginName: names.DefaultPreemption,
Handler: pl.fh,
PodLister: pl.podLister,
PdbLister: pl.pdbLister,
State: state,
Interface: pl,
}
result, status := pe.Preempt(ctx, pod, m)
result, status := pl.Evaluator.Preempt(ctx, state, pod, m)
msg := status.Message()
if len(msg) > 0 {
return result, framework.NewStatus(status.Code(), "preemption: "+msg)
@ -104,6 +103,24 @@ func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.Cy
return result, status
}
func (pl *DefaultPreemption) PreEnqueue(ctx context.Context, p *v1.Pod) *framework.Status {
if !pl.fts.EnableAsyncPreemption {
return nil
}
if pl.Evaluator.IsPodRunningPreemption(p.GetUID()) {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, "waiting for the preemption for this pod to finish")
}
return nil
}
// EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable.
func (pl *DefaultPreemption) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
// The plugin moves the preemptor Pod to activeQ/backoffQ once the preemption API calls are all done,
// so we don't need to move the Pod with any events.
return nil, nil
}
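Taken together, PreEnqueue and EventsToRegister form the gate described in the KEP: while the Evaluator still lists a pod as preempting, the pod is held back as UnschedulableAndUnresolvable, and no cluster event is registered because the preemption goroutine itself re-activates the pod when it is done. A self-contained standard-library sketch of that gate (preemptionTracker, preEnqueue, and the gateStatus values are hypothetical names):

package main

import (
	"fmt"
	"sync"
)

type gateStatus string

const (
	gateAllow  gateStatus = "allow"
	gateReject gateStatus = "unschedulable-and-unresolvable"
)

// preemptionTracker plays the role of the Evaluator's preempting set:
// pod UIDs with in-flight preemption API calls.
type preemptionTracker struct {
	mu         sync.RWMutex
	preempting map[string]struct{}
}

func newPreemptionTracker() *preemptionTracker {
	return &preemptionTracker{preempting: map[string]struct{}{}}
}

func (t *preemptionTracker) start(uid string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.preempting[uid] = struct{}{}
}

func (t *preemptionTracker) finish(uid string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.preempting, uid)
}

// isRunning mirrors IsPodRunningPreemption.
func (t *preemptionTracker) isRunning(uid string) bool {
	t.mu.RLock()
	defer t.mu.RUnlock()
	_, ok := t.preempting[uid]
	return ok
}

// preEnqueue mirrors the PreEnqueue check: hold the pod back while its
// asynchronous preemption is still running.
func preEnqueue(t *preemptionTracker, uid string) gateStatus {
	if t.isRunning(uid) {
		return gateReject
	}
	return gateAllow
}

func main() {
	t := newPreemptionTracker()
	t.start("pod-1")
	fmt.Println(preEnqueue(t, "pod-1")) // unschedulable-and-unresolvable
	t.finish("pod-1")
	fmt.Println(preEnqueue(t, "pod-1")) // allow
}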
// calculateNumCandidates returns the number of candidates the FindCandidates
// method must produce from dry running based on the constraints given by
// <minCandidateNodesPercentage> and <minCandidateNodesAbsolute>. The number of


@ -25,6 +25,7 @@ import (
"math/rand"
"sort"
"strings"
"sync"
"testing"
"time"
@ -37,10 +38,12 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
kubeschedulerconfigv1 "k8s.io/kube-scheduler/config/v1"
extenderv1 "k8s.io/kube-scheduler/extender/v1"
@ -436,6 +439,7 @@ func TestPostFilter(t *testing.T) {
pdbLister: getPDBLister(informerFactory),
args: *getDefaultDefaultPreemptionArgs(),
}
p.Evaluator = preemption.NewEvaluator(names.DefaultPreemption, f, &p, false)
state := framework.NewCycleState()
// Ensure <state> is populated.
@ -1206,11 +1210,10 @@ func TestDryRunPreemption(t *testing.T) {
Handler: pl.fh,
PodLister: pl.podLister,
PdbLister: pl.pdbLister,
State: state,
Interface: pl,
}
offset, numCandidates := pl.GetOffsetAndNumCandidates(int32(len(nodeInfos)))
got, _, _ := pe.DryRunPreemption(ctx, pod, nodeInfos, tt.pdbs, offset, numCandidates)
got, _, _ := pe.DryRunPreemption(ctx, state, pod, nodeInfos, tt.pdbs, offset, numCandidates)
// Sort the values (inner victims) and the candidate itself (by its NominatedNodeName).
for i := range got {
victims := got[i].Victims().Pods
@ -1447,11 +1450,10 @@ func TestSelectBestCandidate(t *testing.T) {
Handler: pl.fh,
PodLister: pl.podLister,
PdbLister: pl.pdbLister,
State: state,
Interface: pl,
}
offset, numCandidates := pl.GetOffsetAndNumCandidates(int32(len(nodeInfos)))
candidates, _, _ := pe.DryRunPreemption(ctx, tt.pod, nodeInfos, nil, offset, numCandidates)
candidates, _, _ := pe.DryRunPreemption(ctx, state, tt.pod, nodeInfos, nil, offset, numCandidates)
s := pe.SelectCandidate(ctx, candidates)
if s == nil || len(s.Name()) == 0 {
return
@ -1548,7 +1550,9 @@ func TestPodEligibleToPreemptOthers(t *testing.T) {
})
}
}
func TestPreempt(t *testing.T) {
metrics.Register()
tests := []struct {
name string
pod *v1.Pod
@ -1713,197 +1717,235 @@ func TestPreempt(t *testing.T) {
}
labelKeys := []string{"hostname", "zone", "region"}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
client := clientsetfake.NewClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
podInformer := informerFactory.Core().V1().Pods().Informer()
podInformer.GetStore().Add(test.pod)
for i := range test.pods {
podInformer.GetStore().Add(test.pods[i])
}
deletedPodNames := sets.New[string]()
patchedPodNames := sets.New[string]()
patchedPods := []*v1.Pod{}
client.PrependReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
patchAction := action.(clienttesting.PatchAction)
podName := patchAction.GetName()
namespace := patchAction.GetNamespace()
patch := patchAction.GetPatch()
pod, err := informerFactory.Core().V1().Pods().Lister().Pods(namespace).Get(podName)
if err != nil {
t.Fatalf("Failed to get the original pod %s/%s before patching: %v\n", namespace, podName, err)
}
marshalledPod, err := json.Marshal(pod)
if err != nil {
t.Fatalf("Failed to marshal the original pod %s/%s: %v", namespace, podName, err)
}
updated, err := strategicpatch.StrategicMergePatch(marshalledPod, patch, v1.Pod{})
if err != nil {
t.Fatalf("Failed to apply strategic merge patch %q on pod %#v: %v", patch, marshalledPod, err)
}
updatedPod := &v1.Pod{}
if err := json.Unmarshal(updated, updatedPod); err != nil {
t.Fatalf("Failed to unmarshal updated pod %q: %v", updated, err)
}
patchedPods = append(patchedPods, updatedPod)
patchedPodNames.Insert(podName)
return true, nil, nil
})
client.PrependReactor("delete", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
deletedPodNames.Insert(action.(clienttesting.DeleteAction).GetName())
return true, nil, nil
})
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
waitingPods := frameworkruntime.NewWaitingPodsMap()
cache := internalcache.New(ctx, time.Duration(0))
for _, pod := range test.pods {
cache.AddPod(logger, pod)
}
cachedNodeInfoMap := map[string]*framework.NodeInfo{}
nodes := make([]*v1.Node, len(test.nodeNames))
for i, name := range test.nodeNames {
node := st.MakeNode().Name(name).Capacity(veryLargeRes).Obj()
// Split node name by '/' to form labels in a format of
// {"hostname": node.Name[0], "zone": node.Name[1], "region": node.Name[2]}
node.ObjectMeta.Labels = make(map[string]string)
for i, label := range strings.Split(node.Name, "/") {
node.ObjectMeta.Labels[labelKeys[i]] = label
}
node.Name = node.ObjectMeta.Labels["hostname"]
cache.AddNode(logger, node)
nodes[i] = node
// Set nodeInfo to extenders to mock extenders' cache for preemption.
cachedNodeInfo := framework.NewNodeInfo()
cachedNodeInfo.SetNode(node)
cachedNodeInfoMap[node.Name] = cachedNodeInfo
}
var extenders []framework.Extender
for _, extender := range test.extenders {
// Set nodeInfoMap as extenders cached node information.
extender.CachedNodeNameToInfo = cachedNodeInfoMap
extenders = append(extenders, extender)
}
fwk, err := tf.NewFramework(
ctx,
[]tf.RegisterPluginFunc{
test.registerPlugin,
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
"",
frameworkruntime.WithClientSet(client),
frameworkruntime.WithEventRecorder(&events.FakeRecorder{}),
frameworkruntime.WithExtenders(extenders),
frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithWaitingPods(waitingPods),
frameworkruntime.WithLogger(logger),
)
if err != nil {
t.Fatal(err)
}
state := framework.NewCycleState()
// Some tests rely on PreFilter plugin to compute its CycleState.
if _, s, _ := fwk.RunPreFilterPlugins(ctx, state, test.pod); !s.IsSuccess() {
t.Errorf("Unexpected preFilterStatus: %v", s)
}
// Call preempt and check the expected results.
pl := DefaultPreemption{
fh: fwk,
podLister: informerFactory.Core().V1().Pods().Lister(),
pdbLister: getPDBLister(informerFactory),
args: *getDefaultDefaultPreemptionArgs(),
}
pe := preemption.Evaluator{
PluginName: names.DefaultPreemption,
Handler: pl.fh,
PodLister: pl.podLister,
PdbLister: pl.pdbLister,
State: state,
Interface: &pl,
}
// so that these nodes are eligible for preemption, we set their status
// to Unschedulable.
nodeToStatusMap := framework.NewDefaultNodeToStatus()
for _, n := range nodes {
nodeToStatusMap.Set(n.Name, framework.NewStatus(framework.Unschedulable))
}
res, status := pe.Preempt(ctx, test.pod, nodeToStatusMap)
if !status.IsSuccess() && !status.IsRejected() {
t.Errorf("unexpected error in preemption: %v", status.AsError())
}
if diff := cmp.Diff(test.want, res); diff != "" {
t.Errorf("Unexpected status (-want, +got):\n%s", diff)
}
if len(deletedPodNames) != len(test.expectedPods) {
t.Errorf("expected %v pods, got %v.", len(test.expectedPods), len(deletedPodNames))
}
if diff := cmp.Diff(sets.List(patchedPodNames), sets.List(deletedPodNames)); diff != "" {
t.Errorf("unexpected difference in the set of patched and deleted pods: %s", diff)
}
// Make sure that the DisruptionTarget condition has been added to the pod status
for _, patchedPod := range patchedPods {
expectedPodCondition := &v1.PodCondition{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
Reason: v1.PodReasonPreemptionByScheduler,
Message: fmt.Sprintf("%s: preempting to accommodate a higher priority pod", patchedPod.Spec.SchedulerName),
for _, asyncPreemptionEnabled := range []bool{true, false} {
for _, test := range tests {
t.Run(fmt.Sprintf("%s (Async preemption enabled: %v)", test.name, asyncPreemptionEnabled), func(t *testing.T) {
client := clientsetfake.NewClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
podInformer := informerFactory.Core().V1().Pods().Informer()
testPod := test.pod.DeepCopy()
testPods := make([]*v1.Pod, len(test.pods))
for i := range test.pods {
testPods[i] = test.pods[i].DeepCopy()
}
_, condition := apipod.GetPodCondition(&patchedPod.Status, v1.DisruptionTarget)
if diff := cmp.Diff(condition, expectedPodCondition, cmpopts.IgnoreFields(v1.PodCondition{}, "LastTransitionTime")); diff != "" {
t.Fatalf("unexpected difference in the pod %q DisruptionTarget condition: %s", patchedPod.Name, diff)
if err := podInformer.GetStore().Add(testPod); err != nil {
t.Fatalf("Failed to add test pod %s: %v", testPod.Name, err)
}
}
for victimName := range deletedPodNames {
found := false
for _, expPod := range test.expectedPods {
if expPod == victimName {
found = true
break
for i := range testPods {
if err := podInformer.GetStore().Add(testPods[i]); err != nil {
t.Fatalf("Failed to add test pod %s: %v", testPods[i], err)
}
}
if !found {
t.Errorf("pod %v is not expected to be a victim.", victimName)
}
}
if res != nil && res.NominatingInfo != nil {
test.pod.Status.NominatedNodeName = res.NominatedNodeName
}
// Manually set the deleted Pods' deletionTimestamp to non-nil.
for _, pod := range test.pods {
if deletedPodNames.Has(pod.Name) {
now := metav1.Now()
pod.DeletionTimestamp = &now
deletedPodNames.Delete(pod.Name)
}
}
// Need to protect deletedPodNames and patchedPodNames to prevent a data race.
var mu sync.RWMutex
deletedPodNames := sets.New[string]()
patchedPodNames := sets.New[string]()
patchedPods := []*v1.Pod{}
client.PrependReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
patchAction := action.(clienttesting.PatchAction)
podName := patchAction.GetName()
namespace := patchAction.GetNamespace()
patch := patchAction.GetPatch()
pod, err := informerFactory.Core().V1().Pods().Lister().Pods(namespace).Get(podName)
if err != nil {
t.Fatalf("Failed to get the original pod %s/%s before patching: %v\n", namespace, podName, err)
}
marshalledPod, err := json.Marshal(pod)
if err != nil {
t.Fatalf("Failed to marshal the original pod %s/%s: %v", namespace, podName, err)
}
updated, err := strategicpatch.StrategicMergePatch(marshalledPod, patch, v1.Pod{})
if err != nil {
t.Fatalf("Failed to apply strategic merge patch %q on pod %#v: %v", patch, marshalledPod, err)
}
updatedPod := &v1.Pod{}
if err := json.Unmarshal(updated, updatedPod); err != nil {
t.Fatalf("Failed to unmarshal updated pod %q: %v", updated, err)
}
patchedPods = append(patchedPods, updatedPod)
mu.Lock()
defer mu.Unlock()
patchedPodNames.Insert(podName)
return true, nil, nil
})
client.PrependReactor("delete", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
mu.Lock()
defer mu.Unlock()
deletedPodNames.Insert(action.(clienttesting.DeleteAction).GetName())
return true, nil, nil
})
// Call preempt again and make sure it doesn't preempt any more pods.
res, status = pe.Preempt(ctx, test.pod, framework.NewDefaultNodeToStatus())
if !status.IsSuccess() && !status.IsRejected() {
t.Errorf("unexpected error in preemption: %v", status.AsError())
}
if res != nil && res.NominatingInfo != nil && len(deletedPodNames) > 0 {
t.Errorf("didn't expect any more preemption. Node %v is selected for preemption.", res.NominatedNodeName)
}
})
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
waitingPods := frameworkruntime.NewWaitingPodsMap()
cache := internalcache.New(ctx, time.Duration(0))
for _, pod := range testPods {
if err := cache.AddPod(logger, pod.DeepCopy()); err != nil {
t.Fatalf("Failed to add pod %s: %v", pod.Name, err)
}
}
cachedNodeInfoMap := map[string]*framework.NodeInfo{}
nodes := make([]*v1.Node, len(test.nodeNames))
for i, name := range test.nodeNames {
node := st.MakeNode().Name(name).Capacity(veryLargeRes).Obj()
// Split node name by '/' to form labels in a format of
// {"hostname": node.Name[0], "zone": node.Name[1], "region": node.Name[2]}
node.ObjectMeta.Labels = make(map[string]string)
for i, label := range strings.Split(node.Name, "/") {
node.ObjectMeta.Labels[labelKeys[i]] = label
}
node.Name = node.ObjectMeta.Labels["hostname"]
t.Logf("node is added: %v. labels: %#v", node.Name, node.ObjectMeta.Labels)
cache.AddNode(logger, node)
nodes[i] = node
// Set nodeInfo to extenders to mock extenders' cache for preemption.
cachedNodeInfo := framework.NewNodeInfo()
cachedNodeInfo.SetNode(node)
cachedNodeInfoMap[node.Name] = cachedNodeInfo
}
var extenders []framework.Extender
for _, extender := range test.extenders {
// Set nodeInfoMap as extenders cached node information.
extender.CachedNodeNameToInfo = cachedNodeInfoMap
extenders = append(extenders, extender)
}
fwk, err := tf.NewFramework(
ctx,
[]tf.RegisterPluginFunc{
test.registerPlugin,
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
"",
frameworkruntime.WithClientSet(client),
frameworkruntime.WithEventRecorder(&events.FakeRecorder{}),
frameworkruntime.WithExtenders(extenders),
frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(testPods, nodes)),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithWaitingPods(waitingPods),
frameworkruntime.WithLogger(logger),
frameworkruntime.WithPodActivator(&fakePodActivator{}),
)
if err != nil {
t.Fatal(err)
}
state := framework.NewCycleState()
// Some tests rely on PreFilter plugin to compute its CycleState.
if _, s, _ := fwk.RunPreFilterPlugins(ctx, state, testPod); !s.IsSuccess() {
t.Errorf("Unexpected preFilterStatus: %v", s)
}
// Call preempt and check the expected results.
pl := DefaultPreemption{
fh: fwk,
podLister: informerFactory.Core().V1().Pods().Lister(),
pdbLister: getPDBLister(informerFactory),
args: *getDefaultDefaultPreemptionArgs(),
}
pe := preemption.NewEvaluator(names.DefaultPreemption, pl.fh, &pl, asyncPreemptionEnabled)
// so that these nodes are eligible for preemption, we set their status
// to Unschedulable.
nodeToStatusMap := framework.NewDefaultNodeToStatus()
for _, n := range nodes {
nodeToStatusMap.Set(n.Name, framework.NewStatus(framework.Unschedulable))
}
res, status := pe.Preempt(ctx, state, testPod, nodeToStatusMap)
if !status.IsSuccess() && !status.IsRejected() {
t.Errorf("unexpected error in preemption: %v", status.AsError())
}
if diff := cmp.Diff(test.want, res); diff != "" {
t.Errorf("Unexpected status (-want, +got):\n%s", diff)
}
if asyncPreemptionEnabled {
// Wait for the pod to be deleted.
if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
mu.RLock()
defer mu.RUnlock()
return len(deletedPodNames) == len(test.expectedPods), nil
}); err != nil {
t.Errorf("expected %v pods to be deleted, got %v.", len(test.expectedPods), len(deletedPodNames))
}
} else {
mu.RLock()
// If async preemption is disabled, the pod should be deleted immediately.
if len(deletedPodNames) != len(test.expectedPods) {
t.Errorf("expected %v pods to be deleted, got %v.", len(test.expectedPods), len(deletedPodNames))
}
mu.RUnlock()
}
mu.RLock()
if diff := cmp.Diff(sets.List(patchedPodNames), sets.List(deletedPodNames)); diff != "" {
t.Errorf("unexpected difference in the set of patched and deleted pods: %s", diff)
}
// Make sure that the DisruptionTarget condition has been added to the pod status
for _, patchedPod := range patchedPods {
expectedPodCondition := &v1.PodCondition{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
Reason: v1.PodReasonPreemptionByScheduler,
Message: fmt.Sprintf("%s: preempting to accommodate a higher priority pod", patchedPod.Spec.SchedulerName),
}
_, condition := apipod.GetPodCondition(&patchedPod.Status, v1.DisruptionTarget)
if diff := cmp.Diff(condition, expectedPodCondition, cmpopts.IgnoreFields(v1.PodCondition{}, "LastTransitionTime")); diff != "" {
t.Fatalf("unexpected difference in the pod %q DisruptionTarget condition: %s", patchedPod.Name, diff)
}
}
for victimName := range deletedPodNames {
found := false
for _, expPod := range test.expectedPods {
if expPod == victimName {
found = true
break
}
}
if !found {
t.Errorf("pod %v is not expected to be a victim.", victimName)
}
}
if res != nil && res.NominatingInfo != nil {
testPod.Status.NominatedNodeName = res.NominatedNodeName
}
// Manually set the deleted Pods' deletionTimestamp to non-nil.
for _, pod := range testPods {
if deletedPodNames.Has(pod.Name) {
now := metav1.Now()
pod.DeletionTimestamp = &now
deletedPodNames.Delete(pod.Name)
}
}
mu.RUnlock()
// Call preempt again and make sure it doesn't preempt any more pods.
res, status = pe.Preempt(ctx, state, testPod, framework.NewDefaultNodeToStatus())
if !status.IsSuccess() && !status.IsRejected() {
t.Errorf("unexpected error in preemption: %v", status.AsError())
}
if res != nil && res.NominatingInfo != nil && len(deletedPodNames) > 0 {
t.Errorf("didn't expect any more preemption. Node %v is selected for preemption.", res.NominatedNodeName)
}
})
}
}
}
type fakePodActivator struct {
}
func (f *fakePodActivator) Activate(logger klog.Logger, pods map[string]*v1.Pod) {}


@ -28,4 +28,5 @@ type Features struct {
EnableInPlacePodVerticalScaling bool
EnableSidecarContainers bool
EnableSchedulingQueueHint bool
EnableAsyncPreemption bool
}


@ -54,6 +54,7 @@ func NewInTreeRegistry() runtime.Registry {
EnableInPlacePodVerticalScaling: feature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling),
EnableSidecarContainers: feature.DefaultFeatureGate.Enabled(features.SidecarContainers),
EnableSchedulingQueueHint: feature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints),
EnableAsyncPreemption: feature.DefaultFeatureGate.Enabled(features.SchedulerAsyncPreemption),
}
registry := runtime.Registry{


@ -23,11 +23,15 @@ import (
"math"
"sync"
"sync/atomic"
"time"
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
corelisters "k8s.io/client-go/listers/core/v1"
policylisters "k8s.io/client-go/listers/policy/v1"
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
@ -36,6 +40,7 @@ import (
apipod "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util"
)
@ -125,10 +130,88 @@ type Evaluator struct {
Handler framework.Handle
PodLister corelisters.PodLister
PdbLister policylisters.PodDisruptionBudgetLister
State *framework.CycleState
enableAsyncPreemption bool
mu sync.RWMutex
// preempting is a set that records the pods that are currently triggering preemption asynchronously,
// which is used to prevent those pods from entering the scheduling cycle in the meantime.
preempting sets.Set[types.UID]
// PreemptPod is a function that actually makes API calls to preempt a specific Pod.
// This is exposed to be replaced during tests.
PreemptPod func(ctx context.Context, c Candidate, preemptor, victim *v1.Pod, pluginName string) error
Interface
}
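Exposing PreemptPod as a function field rather than a method is what lets tests replace the API-calling step while keeping the surrounding orchestration intact. A small standard-library sketch of that pattern (evaluator, preemptVictim, and evictAll are hypothetical names):

package main

import (
	"context"
	"errors"
	"fmt"
)

// evaluator is a stripped-down analogue of Evaluator: the real API calls
// live behind a function field that the constructor fills in and tests
// may replace.
type evaluator struct {
	// preemptVictim deletes (or rejects) one victim; exposed as a field
	// so tests can stub it without a fake clientset.
	preemptVictim func(ctx context.Context, victim string) error
}

func newEvaluator() *evaluator {
	ev := &evaluator{}
	ev.preemptVictim = func(ctx context.Context, victim string) error {
		// The production version would patch the DisruptionTarget
		// condition and issue the delete call here.
		fmt.Println("deleting victim", victim)
		return nil
	}
	return ev
}

// evictAll is the orchestration that stays the same in tests.
func (ev *evaluator) evictAll(ctx context.Context, victims []string) error {
	for _, v := range victims {
		if err := ev.preemptVictim(ctx, v); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	ev := newEvaluator()
	_ = ev.evictAll(context.Background(), []string{"victim-1", "victim-2"})

	// In a test, the field is overridden to observe calls or inject failures.
	ev.preemptVictim = func(ctx context.Context, victim string) error {
		return errors.New("injected failure for " + victim)
	}
	fmt.Println(ev.evictAll(context.Background(), []string{"victim-1"}))
}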
func NewEvaluator(pluginName string, fh framework.Handle, i Interface, enableAsyncPreemption bool) *Evaluator {
podLister := fh.SharedInformerFactory().Core().V1().Pods().Lister()
pdbLister := fh.SharedInformerFactory().Policy().V1().PodDisruptionBudgets().Lister()
ev := &Evaluator{
PluginName: names.DefaultPreemption,
Handler: fh,
PodLister: podLister,
PdbLister: pdbLister,
Interface: i,
enableAsyncPreemption: enableAsyncPreemption,
preempting: sets.New[types.UID](),
}
// PreemptPod actually makes API calls to preempt a specific Pod.
//
// We implement it here directly, rather than creating a separate method like ev.preemptPod(...)
// to prevent the misuse of the PreemptPod function.
ev.PreemptPod = func(ctx context.Context, c Candidate, preemptor, victim *v1.Pod, pluginName string) error {
logger := klog.FromContext(ctx)
// If the victim is a WaitingPod, send a reject message to the PermitPlugin.
// Otherwise we should delete the victim.
if waitingPod := ev.Handler.GetWaitingPod(victim.UID); waitingPod != nil {
waitingPod.Reject(pluginName, "preempted")
logger.V(2).Info("Preemptor pod rejected a waiting pod", "preemptor", klog.KObj(preemptor), "waitingPod", klog.KObj(victim), "node", c.Name())
} else {
condition := &v1.PodCondition{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
Reason: v1.PodReasonPreemptionByScheduler,
Message: fmt.Sprintf("%s: preempting to accommodate a higher priority pod", preemptor.Spec.SchedulerName),
}
newStatus := victim.Status.DeepCopy()
updated := apipod.UpdatePodCondition(newStatus, condition)
if updated {
if err := util.PatchPodStatus(ctx, ev.Handler.ClientSet(), victim, newStatus); err != nil {
logger.Error(err, "Could not add DisruptionTarget condition due to preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(preemptor))
return err
}
}
if err := util.DeletePod(ctx, ev.Handler.ClientSet(), victim); err != nil {
if !apierrors.IsNotFound(err) {
logger.Error(err, "Tried to preempted pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(preemptor))
return err
}
logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name())
return nil
}
logger.V(2).Info("Preemptor Pod preempted victim Pod", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name())
}
ev.Handler.EventRecorder().Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by pod %v on node %v", preemptor.UID, c.Name())
return nil
}
return ev
}
// IsPodRunningPreemption returns true if the pod is currently triggering preemption asynchronously.
func (ev *Evaluator) IsPodRunningPreemption(podUID types.UID) bool {
ev.mu.RLock()
defer ev.mu.RUnlock()
return ev.preempting.Has(podUID)
}
// Preempt returns a PostFilterResult carrying suggested nominatedNodeName, along with a Status.
// The semantics of returned <PostFilterResult, Status> varies on different scenarios:
//
@ -145,7 +228,7 @@ type Evaluator struct {
//
// - <non-nil PostFilterResult, Success>. It's the regular happy path
// and the non-empty nominatedNodeName will be applied to the preemptor pod.
func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeToStatusReader) (*framework.PostFilterResult, *framework.Status) {
func (ev *Evaluator) Preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusReader) (*framework.PostFilterResult, *framework.Status) {
logger := klog.FromContext(ctx)
// 0) Fetch the latest version of <pod>.
@ -171,7 +254,7 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT
if err != nil {
return nil, framework.AsStatus(err)
}
candidates, nodeToStatusMap, err := ev.findCandidates(ctx, allNodes, pod, m)
candidates, nodeToStatusMap, err := ev.findCandidates(ctx, state, allNodes, pod, m)
if err != nil && len(candidates) == 0 {
return nil, framework.AsStatus(err)
}
@ -203,9 +286,15 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT
return nil, framework.NewStatus(framework.Unschedulable, "no candidate node for preemption")
}
logger.V(2).Info("the target node for the preemption is determined", "node", bestCandidate.Name(), "pod", klog.KObj(pod))
// 5) Perform preparation work before nominating the selected candidate.
if status := ev.prepareCandidate(ctx, bestCandidate, pod, ev.PluginName); !status.IsSuccess() {
return nil, status
if ev.enableAsyncPreemption {
ev.prepareCandidateAsync(bestCandidate, pod, ev.PluginName)
} else {
if status := ev.prepareCandidate(ctx, bestCandidate, pod, ev.PluginName); !status.IsSuccess() {
return nil, status
}
}
return framework.NewPostFilterResultWithNominatedNode(bestCandidate.Name()), framework.NewStatus(framework.Success)
@ -213,7 +302,7 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT
// FindCandidates calculates a slice of preemption candidates.
// Each candidate is executable to make the given <pod> schedulable.
func (ev *Evaluator) findCandidates(ctx context.Context, allNodes []*framework.NodeInfo, pod *v1.Pod, m framework.NodeToStatusReader) ([]Candidate, *framework.NodeToStatus, error) {
func (ev *Evaluator) findCandidates(ctx context.Context, state *framework.CycleState, allNodes []*framework.NodeInfo, pod *v1.Pod, m framework.NodeToStatusReader) ([]Candidate, *framework.NodeToStatus, error) {
if len(allNodes) == 0 {
return nil, nil, errors.New("no nodes available")
}
@ -239,7 +328,7 @@ func (ev *Evaluator) findCandidates(ctx context.Context, allNodes []*framework.N
}
offset, candidatesNum := ev.GetOffsetAndNumCandidates(int32(len(potentialNodes)))
return ev.DryRunPreemption(ctx, pod, potentialNodes, pdbs, offset, candidatesNum)
return ev.DryRunPreemption(ctx, state, pod, potentialNodes, pdbs, offset, candidatesNum)
}
// callExtenders calls given <extenders> to select the list of feasible candidates.
@ -347,41 +436,11 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.
defer cancel()
logger := klog.FromContext(ctx)
errCh := parallelize.NewErrorChannel()
preemptPod := func(index int) {
victim := c.Victims().Pods[index]
// If the victim is a WaitingPod, send a reject message to the PermitPlugin.
// Otherwise we should delete the victim.
if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
waitingPod.Reject(pluginName, "preempted")
logger.V(2).Info("Preemptor pod rejected a waiting pod", "preemptor", klog.KObj(pod), "waitingPod", klog.KObj(victim), "node", c.Name())
} else {
condition := &v1.PodCondition{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
Reason: v1.PodReasonPreemptionByScheduler,
Message: fmt.Sprintf("%s: preempting to accommodate a higher priority pod", pod.Spec.SchedulerName),
}
newStatus := victim.Status.DeepCopy()
updated := apipod.UpdatePodCondition(newStatus, condition)
if updated {
if err := util.PatchPodStatus(ctx, cs, victim, newStatus); err != nil {
logger.Error(err, "Could not add DisruptionTarget condition due to preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
errCh.SendErrorWithCancel(err, cancel)
return
}
}
if err := util.DeletePod(ctx, cs, victim); err != nil {
logger.Error(err, "Preempted pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
errCh.SendErrorWithCancel(err, cancel)
return
}
logger.V(2).Info("Preemptor Pod preempted victim Pod", "preemptor", klog.KObj(pod), "victim", klog.KObj(victim), "node", c.Name())
fh.Parallelizer().Until(ctx, len(c.Victims().Pods), func(index int) {
if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[index], pluginName); err != nil {
errCh.SendErrorWithCancel(err, cancel)
}
fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by pod %v on node %v", pod.UID, c.Name())
}
fh.Parallelizer().Until(ctx, len(c.Victims().Pods), preemptPod, ev.PluginName)
}, ev.PluginName)
if err := errCh.ReceiveError(); err != nil {
return framework.AsStatus(err)
}
@ -401,6 +460,91 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.
return nil
}
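In the synchronous path, prepareCandidate fans the victim evictions out through the framework's Parallelizer and uses an error channel whose first error also cancels the remaining work. A rough standard-library approximation of that fan-out (runParallel and the in-line eviction closure are hypothetical, not the Parallelizer API):

package main

import (
	"context"
	"fmt"
	"sync"
)

// runParallel runs work(i) for i in [0, n) concurrently; the first error
// cancels the shared context so the remaining workers can bail out early,
// roughly what Parallelizer().Until plus an ErrorChannel achieve.
func runParallel(ctx context.Context, n int, work func(ctx context.Context, i int) error) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	errCh := make(chan error, n) // buffered so workers never block on send
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if err := work(ctx, i); err != nil {
				errCh <- err
				cancel()
			}
		}(i)
	}
	wg.Wait()

	select {
	case err := <-errCh:
		return err // first error observed
	default:
		return nil
	}
}

func main() {
	victims := []string{"victim-1", "victim-2", "victim-3"}
	err := runParallel(context.Background(), len(victims), func(ctx context.Context, i int) error {
		if ctx.Err() != nil {
			return nil // another eviction already failed; skip
		}
		fmt.Println("evicting", victims[i])
		return nil
	})
	fmt.Println("first error:", err)
}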
// prepareCandidateAsync triggers a goroutine for some preparation work:
// - Evict the victim pods
// - Reject the victim pods if they are in waitingPod map
// - Clear the low-priority pods' nominatedNodeName status if needed
// The Pod won't be retried until the goroutine triggered here completes.
//
// See http://kep.k8s.io/4832 for how the async preemption works.
func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName string) {
metrics.PreemptionVictims.Observe(float64(len(c.Victims().Pods)))
// Intentionally create a new context, not using the ctx from the scheduling cycle,
// because this process could continue even after this scheduling cycle finishes.
ctx, cancel := context.WithCancel(context.Background())
errCh := parallelize.NewErrorChannel()
preemptPod := func(index int) {
victim := c.Victims().Pods[index]
if err := ev.PreemptPod(ctx, c, pod, victim, pluginName); err != nil {
errCh.SendErrorWithCancel(err, cancel)
}
}
ev.mu.Lock()
ev.preempting.Insert(pod.UID)
ev.mu.Unlock()
logger := klog.FromContext(ctx)
go func() {
startTime := time.Now()
result := metrics.GoroutineResultSuccess
defer metrics.PreemptionGoroutinesDuration.WithLabelValues(result).Observe(metrics.SinceInSeconds(startTime))
defer metrics.PreemptionGoroutinesExecutionTotal.WithLabelValues(result).Inc()
defer func() {
if result == metrics.GoroutineResultError {
// When an API call isn't successful, the Pod may get stuck in the unschedulable pod pool in the worst case.
// So, we should move the Pod to the activeQ.
ev.Handler.Activate(logger, map[string]*v1.Pod{pod.Name: pod})
}
}()
defer cancel()
logger.V(2).Info("Start the preemption asynchronously", "preemptor", klog.KObj(pod), "node", c.Name(), "numVictims", len(c.Victims().Pods))
// Lower priority pods nominated to run on this node, may no longer fit on
// this node. So, we should remove their nomination. Removing their
// nomination updates these pods and moves them to the active queue. It
// lets scheduler find another place for them.
nominatedPods := getLowerPriorityNominatedPods(logger, ev.Handler, pod, c.Name())
if err := util.ClearNominatedNodeName(ctx, ev.Handler.ClientSet(), nominatedPods...); err != nil {
logger.Error(err, "Cannot clear 'NominatedNodeName' field from lower priority pods on the same target node", "node", c.Name())
result = metrics.GoroutineResultError
// We do not return as this error is not critical.
}
if len(c.Victims().Pods) == 0 {
ev.mu.Lock()
delete(ev.preempting, pod.UID)
ev.mu.Unlock()
return
}
// We can evict all victims in parallel, except the last one.
// We have to remove the pod from the preempting map before the last victim is evicted
// because, otherwise, the victim's removal might be notified to the scheduling queue before
// we remove this pod from the preempting map,
// and the pod could end up stuck in the unschedulable pod pool
// because all the pod removal events would be ignored.
ev.Handler.Parallelizer().Until(ctx, len(c.Victims().Pods)-1, preemptPod, ev.PluginName)
if err := errCh.ReceiveError(); err != nil {
logger.Error(err, "Error occurred during async preemption")
result = metrics.GoroutineResultError
}
ev.mu.Lock()
delete(ev.preempting, pod.UID)
ev.mu.Unlock()
if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[len(c.Victims().Pods)-1], pluginName); err != nil {
logger.Error(err, "Error occurred during async preemption")
result = metrics.GoroutineResultError
}
logger.V(2).Info("Async Preemption finished completely", "preemptor", klog.KObj(pod), "node", c.Name(), "result", result)
}()
}
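prepareCandidateAsync encodes several ordering constraints in one goroutine: use a fresh context because the scheduling cycle may already be over, evict all but the last victim in parallel, drop the preemptor from the preempting set before the final eviction so the last deletion event arrives after the gate is lifted, and re-activate the preemptor if anything failed. A compressed standard-library sketch of that ordering (asyncPreempt and tracker are hypothetical names; parallelism, metrics, and error details trimmed):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type tracker struct {
	mu         sync.Mutex
	preempting map[string]struct{}
}

func (t *tracker) add(uid string)    { t.mu.Lock(); t.preempting[uid] = struct{}{}; t.mu.Unlock() }
func (t *tracker) remove(uid string) { t.mu.Lock(); delete(t.preempting, uid); t.mu.Unlock() }

// asyncPreempt mirrors the shape of prepareCandidateAsync: mark the
// preemptor as preempting, then do the API calls in a goroutine with a
// fresh context (the scheduling cycle may already be over), and only
// clear the marker right before the last eviction so the final delete
// event is guaranteed to arrive after the marker is gone.
func asyncPreempt(t *tracker, preemptorUID string, victims []string,
	evict func(ctx context.Context, victim string) error,
	activatePreemptor func()) {

	t.add(preemptorUID)
	ctx, cancel := context.WithCancel(context.Background())

	go func() {
		defer cancel()
		failed := false

		if len(victims) == 0 {
			t.remove(preemptorUID)
			return
		}

		// Evict everything but the last victim (done in parallel in the
		// real code; sequential here for brevity).
		for _, v := range victims[:len(victims)-1] {
			if err := evict(ctx, v); err != nil {
				failed = true
			}
		}

		// Remove the marker before the last eviction so the pod isn't
		// gated anymore when the final victim-deletion event fires.
		t.remove(preemptorUID)

		if err := evict(ctx, victims[len(victims)-1]); err != nil {
			failed = true
		}
		if failed {
			// Without this, the preemptor could wait in the
			// unschedulable pool with no event to wake it up.
			activatePreemptor()
		}
	}()
}

func main() {
	t := &tracker{preempting: map[string]struct{}{}}
	asyncPreempt(t, "preemptor", []string{"victim-1", "victim-2"},
		func(ctx context.Context, v string) error { fmt.Println("evicting", v); return nil },
		func() { fmt.Println("re-activating preemptor") })
	time.Sleep(100 * time.Millisecond) // let the goroutine finish in this toy example
}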
func getPodDisruptionBudgets(pdbLister policylisters.PodDisruptionBudgetLister) ([]*policy.PodDisruptionBudget, error) {
if pdbLister != nil {
return pdbLister.List(labels.Everything())
@ -538,7 +682,7 @@ func getLowerPriorityNominatedPods(logger klog.Logger, pn framework.PodNominator
// The number of candidates depends on the constraints defined in the plugin's args. In the returned list of
// candidates, ones that do not violate PDB are preferred over ones that do.
// NOTE: This method is exported for easier testing in default preemption.
func (ev *Evaluator) DryRunPreemption(ctx context.Context, pod *v1.Pod, potentialNodes []*framework.NodeInfo,
func (ev *Evaluator) DryRunPreemption(ctx context.Context, state *framework.CycleState, pod *v1.Pod, potentialNodes []*framework.NodeInfo,
pdbs []*policy.PodDisruptionBudget, offset int32, candidatesNum int32) ([]Candidate, *framework.NodeToStatus, error) {
fh := ev.Handler
@ -557,7 +701,7 @@ func (ev *Evaluator) DryRunPreemption(ctx context.Context, pod *v1.Pod, potentia
nodeInfoCopy := potentialNodes[(int(offset)+i)%len(potentialNodes)].Snapshot()
logger.V(5).Info("Check the potential node for preemption", "node", nodeInfoCopy.Node().Name)
stateCopy := ev.State.Clone()
stateCopy := state.Clone()
pods, numPDBViolations, status := ev.SelectVictimsOnNode(ctx, stateCopy, pod, nodeInfoCopy, pdbs)
if status.IsSuccess() && len(pods) != 0 {
victims := extenderv1.Victims{


@ -20,18 +20,26 @@ import (
"context"
"errors"
"fmt"
"reflect"
"sort"
"sync"
"testing"
"time"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
extenderv1 "k8s.io/kube-scheduler/extender/v1"
internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache"
@ -81,6 +89,19 @@ func (pl *FakePostFilterPlugin) OrderedScoreFuncs(ctx context.Context, nodesToVi
return nil
}
type fakePodActivator struct {
activatedPods map[string]*v1.Pod
mu *sync.RWMutex
}
func (f *fakePodActivator) Activate(logger klog.Logger, pods map[string]*v1.Pod) {
f.mu.Lock()
defer f.mu.Unlock()
for name, pod := range pods {
f.activatedPods[name] = pod
}
}
type FakePreemptionScorePostFilterPlugin struct{}
func (pl *FakePreemptionScorePostFilterPlugin) SelectVictimsOnNode(
@ -243,9 +264,8 @@ func TestDryRunPreemption(t *testing.T) {
PluginName: "FakePostFilter",
Handler: fwk,
Interface: fakePostPlugin,
State: state,
}
got, _, _ := pe.DryRunPreemption(ctx, pod, nodeInfos, nil, 0, int32(len(nodeInfos)))
got, _, _ := pe.DryRunPreemption(ctx, state, pod, nodeInfos, nil, 0, int32(len(nodeInfos)))
// Sort the values (inner victims) and the candidate itself (by its NominatedNodeName).
for i := range got {
victims := got[i].Victims().Pods
@ -344,9 +364,8 @@ func TestSelectCandidate(t *testing.T) {
PluginName: "FakePreemptionScorePostFilter",
Handler: fwk,
Interface: fakePreemptionScorePostFilterPlugin,
State: state,
}
candidates, _, _ := pe.DryRunPreemption(ctx, pod, nodeInfos, nil, 0, int32(len(nodeInfos)))
candidates, _, _ := pe.DryRunPreemption(ctx, state, pod, nodeInfos, nil, 0, int32(len(nodeInfos)))
s := pe.SelectCandidate(ctx, candidates)
if s == nil || len(s.Name()) == 0 {
t.Errorf("expect any node in %v, but no candidate selected", tt.expected)
@ -393,6 +412,11 @@ func TestPrepareCandidate(t *testing.T) {
Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}).
Obj()
failVictim = st.MakePod().Name("fail-victim").UID("victim1").
Node(node1Name).SchedulerName(defaultSchedulerName).Priority(midPriority).
Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}).
Obj()
victim2 = st.MakePod().Name("victim2").UID("victim2").
Node(node1Name).SchedulerName(defaultSchedulerName).Priority(50000).
Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}).
@ -404,6 +428,12 @@ func TestPrepareCandidate(t *testing.T) {
Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}).
Obj()
failVictim1WithMatchingCondition = st.MakePod().Name("fail-victim").UID("victim1").
Node(node1Name).SchedulerName(defaultSchedulerName).Priority(midPriority).
Conditions([]v1.PodCondition{condition}).
Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}).
Obj()
preemptor = st.MakePod().Name("preemptor").UID("preemptor").
SchedulerName(defaultSchedulerName).Priority(highPriority).
Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}).
@ -411,15 +441,23 @@ func TestPrepareCandidate(t *testing.T) {
)
tests := []struct {
name string
nodeNames []string
candidate *fakeCandidate
preemptor *v1.Pod
testPods []*v1.Pod
name string
nodeNames []string
candidate *fakeCandidate
preemptor *v1.Pod
testPods []*v1.Pod
expectedDeletedPods []string
expectedDeletionError bool
expectedPatchError bool
// Only compared when async preemption is disabled.
expectedStatus *framework.Status
// Only compared when async preemption is enabled.
expectedPreemptingMap sets.Set[types.UID]
expectedActivatedPods map[string]*v1.Pod
}{
{
name: "no victims",
candidate: &fakeCandidate{
victims: &extenderv1.Victims{},
},
@ -427,11 +465,13 @@ func TestPrepareCandidate(t *testing.T) {
testPods: []*v1.Pod{
victim1,
},
nodeNames: []string{node1Name},
expectedStatus: nil,
nodeNames: []string{node1Name},
expectedStatus: nil,
expectedPreemptingMap: sets.New(types.UID("preemptor")),
},
{
name: "one victim without condition",
candidate: &fakeCandidate{
name: node1Name,
victims: &extenderv1.Victims{
@ -444,11 +484,14 @@ func TestPrepareCandidate(t *testing.T) {
testPods: []*v1.Pod{
victim1,
},
nodeNames: []string{node1Name},
expectedStatus: nil,
nodeNames: []string{node1Name},
expectedDeletedPods: []string{"victim1"},
expectedStatus: nil,
expectedPreemptingMap: sets.New(types.UID("preemptor")),
},
{
name: "one victim with same condition",
candidate: &fakeCandidate{
name: node1Name,
victims: &extenderv1.Victims{
@ -461,11 +504,14 @@ func TestPrepareCandidate(t *testing.T) {
testPods: []*v1.Pod{
victim1WithMatchingCondition,
},
nodeNames: []string{node1Name},
expectedStatus: nil,
nodeNames: []string{node1Name},
expectedDeletedPods: []string{"victim1"},
expectedStatus: nil,
expectedPreemptingMap: sets.New(types.UID("preemptor")),
},
{
name: "one victim, but patch pod failed (not found victim1 pod)",
name: "one victim, not-found victim error is ignored when patching",
candidate: &fakeCandidate{
name: node1Name,
victims: &extenderv1.Victims{
@ -474,13 +520,35 @@ func TestPrepareCandidate(t *testing.T) {
},
},
},
preemptor: preemptor,
testPods: []*v1.Pod{},
nodeNames: []string{node1Name},
expectedStatus: framework.AsStatus(errors.New("patch pod status failed")),
preemptor: preemptor,
testPods: []*v1.Pod{},
nodeNames: []string{node1Name},
expectedDeletedPods: []string{"victim1"},
expectedStatus: nil,
expectedPreemptingMap: sets.New(types.UID("preemptor")),
},
{
name: "one victim, but delete pod failed (not found victim1 pod)",
name: "one victim, but pod deletion failed",
candidate: &fakeCandidate{
name: node1Name,
victims: &extenderv1.Victims{
Pods: []*v1.Pod{
failVictim1WithMatchingCondition,
},
},
},
preemptor: preemptor,
testPods: []*v1.Pod{},
expectedDeletionError: true,
nodeNames: []string{node1Name},
expectedStatus: framework.AsStatus(errors.New("delete pod failed")),
expectedPreemptingMap: sets.New(types.UID("preemptor")),
expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor},
},
{
name: "one victim, not-found victim error is ignored when deleting",
candidate: &fakeCandidate{
name: node1Name,
victims: &extenderv1.Victims{
@ -489,18 +557,40 @@ func TestPrepareCandidate(t *testing.T) {
},
},
},
preemptor: preemptor,
testPods: []*v1.Pod{},
nodeNames: []string{node1Name},
expectedStatus: framework.AsStatus(errors.New("delete pod failed")),
preemptor: preemptor,
testPods: []*v1.Pod{},
nodeNames: []string{node1Name},
expectedDeletedPods: []string{"victim1"},
expectedStatus: nil,
expectedPreemptingMap: sets.New(types.UID("preemptor")),
},
{
name: "two victims without condition, one passes successfully and the second fails (not found victim2 pod)",
name: "one victim, but patch pod failed",
candidate: &fakeCandidate{
name: node1Name,
victims: &extenderv1.Victims{
Pods: []*v1.Pod{
victim1,
failVictim,
},
},
},
preemptor: preemptor,
testPods: []*v1.Pod{},
expectedPatchError: true,
nodeNames: []string{node1Name},
expectedStatus: framework.AsStatus(errors.New("patch pod status failed")),
expectedPreemptingMap: sets.New(types.UID("preemptor")),
expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor},
},
{
name: "two victims without condition, one passes successfully and the second fails",
candidate: &fakeCandidate{
name: node1Name,
victims: &extenderv1.Victims{
Pods: []*v1.Pod{
failVictim,
victim2,
},
},
@ -509,70 +599,157 @@ func TestPrepareCandidate(t *testing.T) {
testPods: []*v1.Pod{
victim1,
},
nodeNames: []string{node1Name},
expectedStatus: framework.AsStatus(errors.New("patch pod status failed")),
nodeNames: []string{node1Name},
expectedPatchError: true,
expectedDeletedPods: []string{"victim2"},
expectedStatus: framework.AsStatus(errors.New("patch pod status failed")),
expectedPreemptingMap: sets.New(types.UID("preemptor")),
expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
metrics.Register()
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
for _, asyncPreemptionEnabled := range []bool{true, false} {
for _, tt := range tests {
t.Run(fmt.Sprintf("%v (Async preemption enabled: %v)", tt.name, asyncPreemptionEnabled), func(t *testing.T) {
metrics.Register()
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
nodes := make([]*v1.Node, len(tt.nodeNames))
for i, nodeName := range tt.nodeNames {
nodes[i] = st.MakeNode().Name(nodeName).Capacity(veryLargeRes).Obj()
}
registeredPlugins := append([]tf.RegisterPluginFunc{
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New)},
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
)
var objs []runtime.Object
for _, pod := range tt.testPods {
objs = append(objs, pod)
}
cs := clientsetfake.NewClientset(objs...)
informerFactory := informers.NewSharedInformerFactory(cs, 0)
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: cs.EventsV1()})
fwk, err := tf.NewFramework(
ctx,
registeredPlugins, "",
frameworkruntime.WithClientSet(cs),
frameworkruntime.WithLogger(logger),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()),
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.testPods, nodes)),
frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, "test-scheduler")),
)
if err != nil {
t.Fatal(err)
}
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
fakePreemptionScorePostFilterPlugin := &FakePreemptionScorePostFilterPlugin{}
pe := Evaluator{
PluginName: "FakePreemptionScorePostFilter",
Handler: fwk,
Interface: fakePreemptionScorePostFilterPlugin,
State: framework.NewCycleState(),
}
nodes := make([]*v1.Node, len(tt.nodeNames))
for i, nodeName := range tt.nodeNames {
nodes[i] = st.MakeNode().Name(nodeName).Capacity(veryLargeRes).Obj()
}
registeredPlugins := append([]tf.RegisterPluginFunc{
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New)},
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
)
var objs []runtime.Object
for _, pod := range tt.testPods {
objs = append(objs, pod)
}
status := pe.prepareCandidate(ctx, tt.candidate, tt.preemptor, "test-plugin")
if tt.expectedStatus == nil {
if status != nil {
t.Errorf("expect nil status, but got %v", status)
requestStopper := make(chan struct{})
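// requestStopper blocks the fake client's delete/patch reactors below until it is closed,
// so the asynchronous preemption goroutine cannot finish its API calls before the test
// has inspected pe.preempting.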
mu := &sync.RWMutex{}
deletedPods := sets.New[string]()
deletionFailure := false // whether any request to delete pod failed
patchFailure := false // whether any request to patch pod status failed
cs := clientsetfake.NewClientset(objs...)
cs.PrependReactor("delete", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
<-requestStopper
mu.Lock()
defer mu.Unlock()
name := action.(clienttesting.DeleteAction).GetName()
if name == "fail-victim" {
deletionFailure = true
return true, nil, fmt.Errorf("delete pod failed")
}
deletedPods.Insert(name)
return true, nil, nil
})
cs.PrependReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
<-requestStopper
mu.Lock()
defer mu.Unlock()
if action.(clienttesting.PatchAction).GetName() == "fail-victim" {
patchFailure = true
return true, nil, fmt.Errorf("patch pod status failed")
}
return true, nil, nil
})
informerFactory := informers.NewSharedInformerFactory(cs, 0)
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: cs.EventsV1()})
fakeActivator := &fakePodActivator{activatedPods: make(map[string]*v1.Pod), mu: mu}
fwk, err := tf.NewFramework(
ctx,
registeredPlugins, "",
frameworkruntime.WithClientSet(cs),
frameworkruntime.WithLogger(logger),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()),
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.testPods, nodes)),
frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, "test-scheduler")),
frameworkruntime.WithPodActivator(fakeActivator),
)
if err != nil {
t.Fatal(err)
}
} else {
if status == nil {
t.Errorf("expect status %v, but got nil", tt.expectedStatus)
} else if status.Code() != tt.expectedStatus.Code() {
t.Errorf("expect status code %v, but got %v", tt.expectedStatus.Code(), status.Code())
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
fakePreemptionScorePostFilterPlugin := &FakePreemptionScorePostFilterPlugin{}
pe := NewEvaluator("FakePreemptionScorePostFilter", fwk, fakePreemptionScorePostFilterPlugin, asyncPreemptionEnabled)
if asyncPreemptionEnabled {
pe.prepareCandidateAsync(tt.candidate, tt.preemptor, "test-plugin")
pe.mu.Lock()
// The preempting map should be registered synchronously
// so we don't need wait.Poll.
if !tt.expectedPreemptingMap.Equal(pe.preempting) {
t.Errorf("expected preempting map %v, got %v", tt.expectedPreemptingMap, pe.preempting)
close(requestStopper)
pe.mu.Unlock()
return
}
pe.mu.Unlock()
// make the requests complete
close(requestStopper)
} else {
close(requestStopper) // no need to stop requests
status := pe.prepareCandidate(ctx, tt.candidate, tt.preemptor, "test-plugin")
if tt.expectedStatus == nil {
if status != nil {
t.Errorf("expect nil status, but got %v", status)
}
} else {
if status == nil {
t.Errorf("expect status %v, but got nil", tt.expectedStatus)
} else if status.Code() != tt.expectedStatus.Code() {
t.Errorf("expect status code %v, but got %v", tt.expectedStatus.Code(), status.Code())
} else if status.Message() != tt.expectedStatus.Message() {
t.Errorf("expect status message %v, but got %v", tt.expectedStatus.Message(), status.Message())
}
}
}
}
})
var lastErrMsg string
if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
mu.RLock()
defer mu.RUnlock()
if !deletedPods.Equal(sets.New(tt.expectedDeletedPods...)) {
lastErrMsg = fmt.Sprintf("expected deleted pods %v, got %v", tt.expectedDeletedPods, deletedPods.UnsortedList())
return false, nil
}
if tt.expectedDeletionError != deletionFailure {
lastErrMsg = fmt.Sprintf("expected deletion error %v, got %v", tt.expectedDeletionError, deletionFailure)
return false, nil
}
if tt.expectedPatchError != patchFailure {
lastErrMsg = fmt.Sprintf("expected patch error %v, got %v", tt.expectedPatchError, patchFailure)
return false, nil
}
if asyncPreemptionEnabled {
if tt.expectedActivatedPods != nil && !reflect.DeepEqual(tt.expectedActivatedPods, fakeActivator.activatedPods) {
lastErrMsg = fmt.Sprintf("expected activated pods %v, got %v", tt.expectedActivatedPods, fakeActivator.activatedPods)
return false, nil
}
if tt.expectedActivatedPods == nil && len(fakeActivator.activatedPods) != 0 {
lastErrMsg = fmt.Sprintf("expected no activated pods, got %v", fakeActivator.activatedPods)
return false, nil
}
}
return true, nil
}); err != nil {
t.Fatal(lastErrMsg)
}
})
}
}
}
@ -812,7 +989,6 @@ func TestCallExtenders(t *testing.T) {
PluginName: "FakePreemptionScorePostFilter",
Handler: fwk,
Interface: fakePreemptionScorePostFilterPlugin,
State: framework.NewCycleState(),
}
gotCandidates, status := pe.callExtenders(logger, preemptor, tt.candidates)
if (tt.wantStatus == nil) != (status == nil) || status.Code() != tt.wantStatus.Code() {

View File

@ -84,6 +84,7 @@ type frameworkImpl struct {
extenders []framework.Extender
framework.PodNominator
framework.PodActivator
parallelizer parallelize.Parallelizer
}
@ -131,6 +132,7 @@ type frameworkOptions struct {
snapshotSharedLister framework.SharedLister
metricsRecorder *metrics.MetricAsyncRecorder
podNominator framework.PodNominator
podActivator framework.PodActivator
extenders []framework.Extender
captureProfile CaptureProfile
parallelizer parallelize.Parallelizer
@ -200,6 +202,12 @@ func WithPodNominator(nominator framework.PodNominator) Option {
}
}
func WithPodActivator(activator framework.PodActivator) Option {
return func(o *frameworkOptions) {
o.podActivator = activator
}
}
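// Illustrative sketch (the recordingPodActivator type and its fields are hypothetical;
// the Activate signature is assumed to mirror framework.PodActivator): the activator passed
// to WithPodActivator exposes a single Activate method, which the scheduling queue implements
// by moving the given pods back to activeQ. A minimal recording stub, similar in spirit to the
// fakePodActivator used in the tests above, could look like this (assuming the usual imports:
// sync, v1 "k8s.io/api/core/v1", "k8s.io/klog/v2"):
//
//	type recordingPodActivator struct {
//		mu            sync.Mutex
//		activatedPods map[string]*v1.Pod
//	}
//
//	func (a *recordingPodActivator) Activate(logger klog.Logger, pods map[string]*v1.Pod) {
//		a.mu.Lock()
//		defer a.mu.Unlock()
//		for name, pod := range pods {
//			a.activatedPods[name] = pod
//		}
//	}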
// WithExtenders sets extenders for the scheduling frameworkImpl.
func WithExtenders(extenders []framework.Extender) Option {
return func(o *frameworkOptions) {
@ -279,6 +287,7 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler
metricsRecorder: options.metricsRecorder,
extenders: options.extenders,
PodNominator: options.podNominator,
PodActivator: options.podActivator,
parallelizer: options.parallelizer,
logger: logger,
}
@ -427,6 +436,10 @@ func (f *frameworkImpl) SetPodNominator(n framework.PodNominator) {
f.PodNominator = n
}
func (f *frameworkImpl) SetPodActivator(a framework.PodActivator) {
f.PodActivator = a
}
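// Note: the scheduler wires this up after constructing the scheduling queue
// (see the New() change further below, where SetPodActivator(podQueue) is called on each
// profile's framework), since the queue itself provides the Activate implementation.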
// Close closes each plugin, when they implement io.Closer interface.
func (f *frameworkImpl) Close() error {
var errs []error

View File

@ -296,7 +296,7 @@ func (ce ClusterEvent) Label() string {
// AllClusterEventLabels returns all possible cluster event labels given to the metrics.
func AllClusterEventLabels() []string {
labels := []string{EventUnschedulableTimeout.Label()}
labels := []string{UnschedulableTimeout, ForceActivate}
for _, r := range allResources {
for _, a := range basicActionTypes {
labels = append(labels, ClusterEvent{Resource: r, ActionType: a}.Label())

View File

@ -40,6 +40,11 @@ const (
Binding = "binding"
)
const (
GoroutineResultSuccess = "success"
GoroutineResultError = "error"
)
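// GoroutineResultSuccess and GoroutineResultError are the values expected under the
// "result" label of the preemption goroutine metrics defined further below.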
// ExtentionPoints is a list of possible values for the extension_point label.
var ExtentionPoints = []string{
PreFilter,
@ -105,14 +110,21 @@ var (
FrameworkExtensionPointDuration *metrics.HistogramVec
PluginExecutionDuration *metrics.HistogramVec
// This is only available when the QHint feature gate is enabled.
PermitWaitDuration *metrics.HistogramVec
CacheSize *metrics.GaugeVec
unschedulableReasons *metrics.GaugeVec
PluginEvaluationTotal *metrics.CounterVec
// The below two are only available when the QHint feature gate is enabled.
queueingHintExecutionDuration *metrics.HistogramVec
SchedulerQueueIncomingPods *metrics.CounterVec
PermitWaitDuration *metrics.HistogramVec
CacheSize *metrics.GaugeVec
unschedulableReasons *metrics.GaugeVec
PluginEvaluationTotal *metrics.CounterVec
metricsList []metrics.Registerable
// The below two are only available when the async-preemption feature gate is enabled.
PreemptionGoroutinesDuration *metrics.HistogramVec
PreemptionGoroutinesExecutionTotal *metrics.CounterVec
// metricsList is a list of all metrics that should always be registered, regardless of any feature gate's value.
metricsList []metrics.Registerable
)
var registerMetrics sync.Once
@ -123,11 +135,14 @@ func Register() {
registerMetrics.Do(func() {
InitMetrics()
RegisterMetrics(metricsList...)
if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) {
RegisterMetrics(queueingHintExecutionDuration)
RegisterMetrics(InFlightEvents)
}
volumebindingmetrics.RegisterVolumeSchedulingMetrics()
if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) {
RegisterMetrics(queueingHintExecutionDuration, InFlightEvents)
}
if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerAsyncPreemption) {
RegisterMetrics(PreemptionGoroutinesDuration, PreemptionGoroutinesExecutionTotal)
}
})
}
@ -317,6 +332,25 @@ func InitMetrics() {
StabilityLevel: metrics.ALPHA,
}, []string{"plugin", "extension_point", "profile"})
PreemptionGoroutinesDuration = metrics.NewHistogramVec(
&metrics.HistogramOpts{
Subsystem: SchedulerSubsystem,
Name: "preemption_goroutines_duration_seconds",
Help: "Duration in seconds for running goroutines for the preemption.",
Buckets: metrics.ExponentialBuckets(0.01, 2, 20),
StabilityLevel: metrics.ALPHA,
},
[]string{"result"})
PreemptionGoroutinesExecutionTotal = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: SchedulerSubsystem,
Name: "preemption_goroutines_execution_total",
Help: "Number of preemption goroutines executed.",
StabilityLevel: metrics.ALPHA,
},
[]string{"result"})
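// Illustrative sketch of how a preemption goroutine would be expected to record these two
// metrics with a shared result label (doPreemptionAPICalls is hypothetical):
//
//	startTime := time.Now()
//	result := GoroutineResultSuccess
//	if err := doPreemptionAPICalls(); err != nil {
//		result = GoroutineResultError
//	}
//	PreemptionGoroutinesDuration.WithLabelValues(result).Observe(time.Since(startTime).Seconds())
//	PreemptionGoroutinesExecutionTotal.WithLabelValues(result).Inc()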
metricsList = []metrics.Registerable{
scheduleAttempts,
schedulingLatency,

View File

@ -355,6 +355,7 @@ func New(ctx context.Context,
for _, fwk := range profiles {
fwk.SetPodNominator(podQueue)
fwk.SetPodActivator(podQueue)
}
schedulerCache := internalcache.New(ctx, durationToExpireAssumedPod)

View File

@ -310,6 +310,12 @@ func (p *PodWrapper) Name(s string) *PodWrapper {
return p
}
// GenerateName sets `s` as the generateName of the inner pod.
func (p *PodWrapper) GenerateName(s string) *PodWrapper {
p.SetGenerateName(s)
return p
}
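// For example (illustrative), a test importing this package as st can now build a pod
// with only a name prefix:
//
//	pod := st.MakePod().GenerateName("victim-").UID("uid1").Obj()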
// UID sets `s` as the UID of the inner pod.
func (p *PodWrapper) UID(s string) *PodWrapper {
p.SetUID(types.UID(s))

View File

@ -312,6 +312,10 @@ var (
// TODO: document the feature (owning SIG, when to use this feature for a test)
RegularResourceUsageTracking = framework.WithFeature(framework.ValidFeatures.Add("RegularResourceUsageTracking"))
// Owner: sig-scheduling
// Marks tests of asynchronous preemption (KEP-4832) that require the `SchedulerAsyncPreemption` feature gate.
SchedulerAsyncPreemption = framework.WithFeature(framework.ValidFeatures.Add("SchedulerAsyncPreemption"))
// Owner: sig-network
// Marks tests that require a pod networking implementation that supports SCTP
// traffic between pods.

View File

@ -43,6 +43,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/apis/scheduling"
"k8s.io/kubernetes/test/e2e/feature"
"k8s.io/kubernetes/test/e2e/framework"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
@ -305,6 +306,157 @@ var _ = SIGDescribe("SchedulerPreemption", framework.WithSerial(), func() {
}
})
/*
Release: v1.32
Testname: Scheduler runs preemption with various priority classes as expected
Description: When Pods with various priority classes are involved in preemption,
the scheduler must prioritize the Pods with the higher priority class.
*/
framework.It("validates various priority Pods preempt expectedly with the async preemption", feature.SchedulerAsyncPreemption, func(ctx context.Context) {
var podRes v1.ResourceList
// Create 10 pods per node that will eat up all the node's resources.
ginkgo.By("Create 10 low-priority pods on each node.")
lowPriorityPods := make([]*v1.Pod, 0, 10*len(nodeList.Items))
// Create pods in the cluster.
for i, node := range nodeList.Items {
// Update each node to advertise 10 units of the available extended resource.
e2enode.AddExtendedResource(ctx, cs, node.Name, testExtendedResource, resource.MustParse("10"))
// Create 10 low priority pods on each node, which will use up 10/10 of the node's resources.
for j := 0; j < 10; j++ {
// Request 1 of the available resources for the victim pods
podRes = v1.ResourceList{}
podRes[testExtendedResource] = resource.MustParse("1")
pausePod := createPausePod(ctx, f, pausePodConfig{
Name: fmt.Sprintf("pod%d-%d-%v", i, j, lowPriorityClassName),
PriorityClassName: lowPriorityClassName,
// This victim pod will be preempted by the high priority pod,
// but its deletion will be blocked by the finalizer.
//
// The finalizer is needed so that, regardless of when the scheduler notices all the high Pods we create,
// the medium Pods cannot be scheduled in place of the high Pods.
Finalizers: []string{testFinalizer},
Resources: &v1.ResourceRequirements{
Requests: podRes,
Limits: podRes,
},
Affinity: &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchFields: []v1.NodeSelectorRequirement{
{Key: "metadata.name", Operator: v1.NodeSelectorOpIn, Values: []string{node.Name}},
},
},
},
},
},
},
})
lowPriorityPods = append(lowPriorityPods, pausePod)
framework.Logf("Created pod: %v", pausePod.Name)
}
}
ginkgo.By("Wait for lower priority pods to be scheduled.")
for _, pod := range lowPriorityPods {
framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, cs, pod))
}
highPriorityPods := make([]*v1.Pod, 0, 5*len(nodeList.Items))
mediumPriorityPods := make([]*v1.Pod, 0, 10*len(nodeList.Items))
ginkgo.By("Run high/medium priority pods that have same requirements as that of lower priority pod")
for i := range nodeList.Items {
// Create medium priority pods first
// to confirm the scheduler finally prioritize the high priority pods, ignoring the medium priority pods.
for j := 0; j < 10; j++ {
// 5 of these 10 medium priority pods per node will be unschedulable
// because each node has only 10 units of the resource, and the high priority pods will use 5 of them.
p := createPausePod(ctx, f, pausePodConfig{
Name: fmt.Sprintf("pod%d-%d-%v", i, j, mediumPriorityClassName),
PriorityClassName: mediumPriorityClassName,
Resources: &v1.ResourceRequirements{
// Set the pod request to the low priority pod's resources
Requests: lowPriorityPods[0].Spec.Containers[0].Resources.Requests,
Limits: lowPriorityPods[0].Spec.Containers[0].Resources.Requests,
},
})
mediumPriorityPods = append(mediumPriorityPods, p)
}
for j := 0; j < 5; j++ {
p := createPausePod(ctx, f, pausePodConfig{
Name: fmt.Sprintf("pod%d-%d-%v", i, j, highPriorityClassName),
PriorityClassName: highPriorityClassName,
Resources: &v1.ResourceRequirements{
// Set the pod request to the low priority pod's resources
Requests: lowPriorityPods[0].Spec.Containers[0].Resources.Requests,
Limits: lowPriorityPods[0].Spec.Containers[0].Resources.Requests,
},
})
highPriorityPods = append(highPriorityPods, p)
}
}
// All low priority Pods should be the targets of preemption.
// They have a finalizer and hence should not be deleted yet at this point.
ginkgo.By("Check that all low priority pods are about to be preempted.")
for _, pod := range lowPriorityPods {
framework.ExpectNoError(wait.PollUntilContextTimeout(ctx, time.Second, framework.PodStartTimeout, false, func(ctx context.Context) (bool, error) {
preemptedPod, err := cs.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
return preemptedPod.DeletionTimestamp != nil, nil
}))
}
// All high priority Pods should become schedulable once the low priority Pods are removed.
ginkgo.By("Wait for high priority pods to have a node nominated by the preemption.")
for _, pod := range highPriorityPods {
framework.ExpectNoError(wait.PollUntilContextTimeout(ctx, time.Second, framework.PodStartTimeout, false, func(ctx context.Context) (bool, error) {
highPod, err := cs.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
return highPod.Status.NominatedNodeName != "", nil
}))
}
ginkgo.By("Remove the finalizer from all low priority pods to proceed the preemption.")
for _, pod := range lowPriorityPods {
// Remove the finalizer so that the pod can be deleted by GC
e2epod.NewPodClient(f).RemoveFinalizer(ctx, pod.Name, testFinalizer)
}
ginkgo.By("Wait for high priority pods to be scheduled.")
for _, pod := range highPriorityPods {
framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, cs, pod))
}
ginkgo.By("Wait for 5 medium priority pods to be scheduled.")
framework.ExpectNoError(wait.PollUntilContextTimeout(ctx, time.Second, framework.PodStartTimeout, false, func(ctx context.Context) (bool, error) {
scheduled := 0
for _, pod := range mediumPriorityPods {
medPod, err := cs.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
if medPod.Spec.NodeName != "" {
scheduled++
}
}
if scheduled > 5 {
return false, fmt.Errorf("expected 5 medium priority pods to be scheduled, but got %d", scheduled)
}
return scheduled == 5, nil
}))
})
/*
Release: v1.31
Testname: Verify the DisruptionTarget condition is added to the preempted pod

View File

@ -1058,6 +1058,12 @@
lockToDefault: false
preRelease: Alpha
version: "1.29"
- name: SchedulerAsyncPreemption
versionedSpecs:
- default: false
lockToDefault: false
preRelease: Alpha
version: "1.32"
- name: SchedulerQueueingHints
versionedSpecs:
- default: false
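# Note: as an Alpha, default-off gate, SchedulerAsyncPreemption has to be enabled explicitly
# for testing, e.g. via kube-scheduler --feature-gates=SchedulerAsyncPreemption=true.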

File diff suppressed because it is too large

View File

@ -354,10 +354,20 @@
initPods: 2000
measurePods: 500
- name: 5000Nodes
featureGates:
SchedulerQueueingHints: false
labels: [performance]
threshold: 200
featureGates:
SchedulerQueueingHints: false
SchedulerAsyncPreemption: false
params:
initNodes: 5000
initPods: 20000
measurePods: 5000
- name: 5000Nodes_AsyncPreemptionEnabled
threshold: 200
labels: [performance]
featureGates:
SchedulerAsyncPreemption: true
params:
initNodes: 5000
initPods: 20000

View File

@ -1,7 +1,7 @@
apiVersion: v1
kind: Pod
metadata:
generateName: pod-
generateName: pod-high-priority-
spec:
priority: 10
containers: