fix: use Activate always

This commit is contained in:
Kensei Nakada 2024-11-05 22:21:26 +09:00
parent 02459ca59c
commit a95b8b5085
6 changed files with 31 additions and 29 deletions

View File

@ -34,6 +34,9 @@ const (
// ForceActivate is the event when a pod is moved from unschedulablePods/backoffQ // ForceActivate is the event when a pod is moved from unschedulablePods/backoffQ
// to activeQ. Usually it's triggered by plugin implementations. // to activeQ. Usually it's triggered by plugin implementations.
ForceActivate = "ForceActivate" ForceActivate = "ForceActivate"
// UnschedulableTimeout is the event when a pod is moved from unschedulablePods
// due to the timeout specified at pod-max-in-unschedulable-pods-duration.
UnschedulableTimeout = "UnschedulableTimeout"
) )
var ( var (
@ -50,7 +53,9 @@ var (
// EventUnscheduledPodDelete is the event when an unscheduled pod is deleted. // EventUnscheduledPodDelete is the event when an unscheduled pod is deleted.
EventUnscheduledPodDelete = ClusterEvent{Resource: unschedulablePod, ActionType: Delete} EventUnscheduledPodDelete = ClusterEvent{Resource: unschedulablePod, ActionType: Delete}
// EventUnschedulableTimeout is the event when a pod stays in unschedulable for longer than timeout. // EventUnschedulableTimeout is the event when a pod stays in unschedulable for longer than timeout.
EventUnschedulableTimeout = ClusterEvent{Resource: WildCard, ActionType: All, label: "UnschedulableTimeout"} EventUnschedulableTimeout = ClusterEvent{Resource: WildCard, ActionType: All, label: UnschedulableTimeout}
// EventForceActivate is the event when a pod is moved from unschedulablePods/backoffQ to activeQ.
EventForceActivate = ClusterEvent{Resource: WildCard, ActionType: All, label: ForceActivate}
) )
// PodSchedulingPropertiesChange interprets the update of a pod and returns corresponding UpdatePodXYZ event(s). // PodSchedulingPropertiesChange interprets the update of a pod and returns corresponding UpdatePodXYZ event(s).

View File

@ -902,7 +902,11 @@ func (ni *NominatingInfo) Mode() NominatingMode {
// PodActivator abstracts operations in the scheduling queue. // PodActivator abstracts operations in the scheduling queue.
type PodActivator interface { type PodActivator interface {
// Activate moves the given pods to activeQ iff they're in unschedulablePods or backoffQ. // Activate moves the given pods to activeQ.
// If a pod isn't found in unschedulablePods or backoffQ and it's in-flight,
// the wildcard event is registered so that the pod will be requeued when it comes back.
// But, if a pod isn't found in unschedulablePods or backoffQ and it's not in-flight (i.e., completely unknown pod),
// Activate would ignore the pod.
Activate(logger klog.Logger, pods map[string]*v1.Pod) Activate(logger klog.Logger, pods map[string]*v1.Pod)
} }

View File

@ -43,6 +43,7 @@ import (
clientsetfake "k8s.io/client-go/kubernetes/fake" clientsetfake "k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing" clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/events" "k8s.io/client-go/tools/events"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting" "k8s.io/klog/v2/ktesting"
kubeschedulerconfigv1 "k8s.io/kube-scheduler/config/v1" kubeschedulerconfigv1 "k8s.io/kube-scheduler/config/v1"
extenderv1 "k8s.io/kube-scheduler/extender/v1" extenderv1 "k8s.io/kube-scheduler/extender/v1"
@ -1549,6 +1550,7 @@ func TestPodEligibleToPreemptOthers(t *testing.T) {
}) })
} }
} }
func TestPreempt(t *testing.T) { func TestPreempt(t *testing.T) {
metrics.Register() metrics.Register()
tests := []struct { tests := []struct {
@ -1829,6 +1831,7 @@ func TestPreempt(t *testing.T) {
frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithWaitingPods(waitingPods), frameworkruntime.WithWaitingPods(waitingPods),
frameworkruntime.WithLogger(logger), frameworkruntime.WithLogger(logger),
frameworkruntime.WithPodActivator(&fakePodActivator{}),
) )
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -1941,3 +1944,8 @@ func TestPreempt(t *testing.T) {
} }
} }
} }
// fakePodActivator is a no-op implementation of the PodActivator interface,
// used only to satisfy the framework's WithPodActivator dependency in tests
// (see its injection via frameworkruntime.WithPodActivator in TestPreempt).
type fakePodActivator struct {
}

// Activate implements PodActivator; it intentionally does nothing, since the
// tests using this fake do not assert on pod re-activation behavior.
func (f *fakePodActivator) Activate(logger klog.Logger, pods map[string]*v1.Pod) {}

View File

@ -493,11 +493,10 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
defer metrics.PreemptionGoroutinesDuration.WithLabelValues(result).Observe(metrics.SinceInSeconds(startTime)) defer metrics.PreemptionGoroutinesDuration.WithLabelValues(result).Observe(metrics.SinceInSeconds(startTime))
defer metrics.PreemptionGoroutinesExecutionTotal.WithLabelValues(result).Inc() defer metrics.PreemptionGoroutinesExecutionTotal.WithLabelValues(result).Inc()
defer func() { defer func() {
if result == metrics.GoroutineResultError { ev.mu.Lock()
// When API call isn't successful, the Pod may get stuck in the unschedulable pod pool in the worst case. delete(ev.preempting, pod.UID)
// So, we should move the Pod to the activeQ anyways. ev.mu.Unlock()
ev.Handler.Activate(logger, map[string]*v1.Pod{pod.Name: pod}) ev.Handler.Activate(logger, map[string]*v1.Pod{pod.Name: pod})
}
}() }()
defer cancel() defer cancel()
logger.V(2).Info("Start the preemption asynchronously", "preemptor", klog.KObj(pod), "node", c.Name(), "numVictims", len(c.Victims().Pods)) logger.V(2).Info("Start the preemption asynchronously", "preemptor", klog.KObj(pod), "node", c.Name(), "numVictims", len(c.Victims().Pods))
@ -508,40 +507,21 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
// lets scheduler find another place for them. // lets scheduler find another place for them.
nominatedPods := getLowerPriorityNominatedPods(logger, ev.Handler, pod, c.Name()) nominatedPods := getLowerPriorityNominatedPods(logger, ev.Handler, pod, c.Name())
if err := util.ClearNominatedNodeName(ctx, ev.Handler.ClientSet(), nominatedPods...); err != nil { if err := util.ClearNominatedNodeName(ctx, ev.Handler.ClientSet(), nominatedPods...); err != nil {
logger.Error(err, "Cannot clear 'NominatedNodeName' field") logger.Error(err, "Cannot clear 'NominatedNodeName' field from lower priority pods on the same target node", "node", c.Name())
result = metrics.GoroutineResultError result = metrics.GoroutineResultError
// We do not return as this error is not critical. // We do not return as this error is not critical.
} }
if len(c.Victims().Pods) == 0 { if len(c.Victims().Pods) == 0 {
ev.mu.Lock()
delete(ev.preempting, pod.UID)
ev.mu.Unlock()
return return
} }
// We can evict all victims in parallel, but the last one. ev.Handler.Parallelizer().Until(ctx, len(c.Victims().Pods), preemptPod, ev.PluginName)
// We have to remove the pod from the preempting map before the last one is evicted
// because, otherwise, the pod removal might be notified to the scheduling queue before
// we remove this pod from the preempting map,
// and the pod could end up stucking at the unschedulable pod pool
// by all the pod removal events being ignored.
ev.Handler.Parallelizer().Until(ctx, len(c.Victims().Pods)-1, preemptPod, ev.PluginName)
if err := errCh.ReceiveError(); err != nil { if err := errCh.ReceiveError(); err != nil {
logger.Error(err, "Error occurred during async preemption") logger.Error(err, "Error occurred during async preemption")
result = metrics.GoroutineResultError result = metrics.GoroutineResultError
} }
ev.mu.Lock()
delete(ev.preempting, pod.UID)
ev.mu.Unlock()
if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[len(c.Victims().Pods)-1], pluginName); err != nil {
logger.Error(err, "Error occurred during async preemption")
result = metrics.GoroutineResultError
}
logger.V(2).Info("Async Preemption finished completely", "preemptor", klog.KObj(pod), "node", c.Name(), "result", result) logger.V(2).Info("Async Preemption finished completely", "preemptor", klog.KObj(pod), "node", c.Name(), "result", result)
}() }()
} }

View File

@ -468,6 +468,7 @@ func TestPrepareCandidate(t *testing.T) {
nodeNames: []string{node1Name}, nodeNames: []string{node1Name},
expectedStatus: nil, expectedStatus: nil,
expectedPreemptingMap: sets.New(types.UID("preemptor")), expectedPreemptingMap: sets.New(types.UID("preemptor")),
expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor},
}, },
{ {
name: "one victim without condition", name: "one victim without condition",
@ -488,6 +489,7 @@ func TestPrepareCandidate(t *testing.T) {
expectedDeletedPods: []string{"victim1"}, expectedDeletedPods: []string{"victim1"},
expectedStatus: nil, expectedStatus: nil,
expectedPreemptingMap: sets.New(types.UID("preemptor")), expectedPreemptingMap: sets.New(types.UID("preemptor")),
expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor},
}, },
{ {
name: "one victim with same condition", name: "one victim with same condition",
@ -508,6 +510,7 @@ func TestPrepareCandidate(t *testing.T) {
expectedDeletedPods: []string{"victim1"}, expectedDeletedPods: []string{"victim1"},
expectedStatus: nil, expectedStatus: nil,
expectedPreemptingMap: sets.New(types.UID("preemptor")), expectedPreemptingMap: sets.New(types.UID("preemptor")),
expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor},
}, },
{ {
name: "one victim, not-found victim error is ignored when patching", name: "one victim, not-found victim error is ignored when patching",
@ -526,6 +529,7 @@ func TestPrepareCandidate(t *testing.T) {
expectedDeletedPods: []string{"victim1"}, expectedDeletedPods: []string{"victim1"},
expectedStatus: nil, expectedStatus: nil,
expectedPreemptingMap: sets.New(types.UID("preemptor")), expectedPreemptingMap: sets.New(types.UID("preemptor")),
expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor},
}, },
{ {
name: "one victim, but pod deletion failed", name: "one victim, but pod deletion failed",
@ -563,6 +567,7 @@ func TestPrepareCandidate(t *testing.T) {
expectedDeletedPods: []string{"victim1"}, expectedDeletedPods: []string{"victim1"},
expectedStatus: nil, expectedStatus: nil,
expectedPreemptingMap: sets.New(types.UID("preemptor")), expectedPreemptingMap: sets.New(types.UID("preemptor")),
expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor},
}, },
{ {
name: "one victim, but patch pod failed", name: "one victim, but patch pod failed",

View File

@ -296,7 +296,7 @@ func (ce ClusterEvent) Label() string {
// AllClusterEventLabels returns all possible cluster event labels given to the metrics. // AllClusterEventLabels returns all possible cluster event labels given to the metrics.
func AllClusterEventLabels() []string { func AllClusterEventLabels() []string {
labels := []string{EventUnschedulableTimeout.Label()} labels := []string{UnschedulableTimeout, ForceActivate}
for _, r := range allResources { for _, r := range allResources {
for _, a := range basicActionTypes { for _, a := range basicActionTypes {
labels = append(labels, ClusterEvent{Resource: r, ActionType: a}.Label()) labels = append(labels, ClusterEvent{Resource: r, ActionType: a}.Label())