implement PodActivator to activate the preemptor Pod when async preemption fails

Kensei Nakada 2024-11-05 00:14:54 +09:00
parent 8f2243fe74
commit c322294883
5 changed files with 69 additions and 11 deletions

pkg/scheduler/framework/interface.go

@@ -770,6 +770,8 @@ type Framework interface {
// SetPodNominator sets the PodNominator
SetPodNominator(nominator PodNominator)
// SetPodActivator sets the PodActivator
SetPodActivator(activator PodActivator)
// Close calls Close method of each plugin.
Close() error
@@ -783,6 +785,8 @@ type Handle interface {
PodNominator
// PluginsRunner abstracts operations to run some plugins.
PluginsRunner
// PodActivator abstracts operations in the scheduling queue.
PodActivator
// SnapshotSharedLister returns listers from the latest NodeInfo Snapshot. The snapshot
// is taken at the beginning of a scheduling cycle and remains unchanged until
// a pod finishes "Permit" point.
@@ -896,6 +900,12 @@ func (ni *NominatingInfo) Mode() NominatingMode {
return ni.NominatingMode
}
// PodActivator abstracts operations in the scheduling queue.
type PodActivator interface {
// Activate moves the given pods to activeQ iff they're in unschedulablePods or backoffQ.
Activate(logger klog.Logger, pods map[string]*v1.Pod)
}
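
As an aside for readers new to the framework plumbing: a plugin receives a framework.Handle at construction time, and Handle now embeds PodActivator. A minimal sketch (not part of this commit; the plugin type and method name are hypothetical) of calling it:

package example

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// examplePlugin is a hypothetical plugin that keeps the framework.Handle
// handed to its factory function.
type examplePlugin struct {
	handle framework.Handle
}

// reactivate asks the queue to move the pod to activeQ; Activate is a no-op
// unless the pod currently sits in unschedulablePods or backoffQ.
func (pl *examplePlugin) reactivate(logger klog.Logger, pod *v1.Pod) {
	pl.handle.Activate(logger, map[string]*v1.Pod{pod.Name: pod})
}
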
// PodNominator abstracts operations to maintain nominated Pods.
type PodNominator interface {
// AddNominatedPod adds the given pod to the nominator or

pkg/scheduler/framework/preemption/preemption.go

@@ -492,6 +492,13 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
result := metrics.GoroutineResultSuccess
defer func() {
// Wrap the metric calls in a closure: in a direct "defer x.WithLabelValues(result).Observe(...)",
// the receiver and arguments are evaluated at the defer statement itself, so the label would
// always record the initial GoroutineResultSuccess even when result is later set to an error.
metrics.PreemptionGoroutinesDuration.WithLabelValues(result).Observe(metrics.SinceInSeconds(startTime))
metrics.PreemptionGoroutinesExecutionTotal.WithLabelValues(result).Inc()
}()
defer func() {
if result == metrics.GoroutineResultError {
// When the API call fails, the Pod may get stuck in the unschedulable pod pool in the worst case.
// So, we should move the Pod to the activeQ anyway.
ev.Handler.Activate(logger, map[string]*v1.Pod{pod.Name: pod})
}
}()
defer cancel()
logger.V(2).Info("Start the preemption asynchronously", "preemptor", klog.KObj(pod), "node", c.Name(), "numVictims", len(c.Victims().Pods))
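
Why the metric calls above are wrapped in a closure: Go evaluates the receiver and arguments of a deferred call at the defer statement, so deferring WithLabelValues(result) directly would pin the label to the initial success value. A self-contained sketch of the pitfall and the fix, with a toy counter standing in for the Prometheus metric vectors:

package main

import "fmt"

// counter mimics a metric vector: With selects a label, Inc records it.
type counter struct{ label string }

func (c counter) With(label string) counter { return counter{label: label} }
func (c counter) Inc()                      { fmt.Println("recorded label:", c.label) }

func main() {
	var total counter
	result := "success"

	// Pitfall: the receiver total.With(result) is evaluated here, at the
	// defer statement, so this call records "success" no matter what
	// result becomes later.
	defer total.With(result).Inc()

	// Fix: a closure re-reads result only when the defer actually runs.
	defer func() { total.With(result).Inc() }()

	result = "error"
	// Defers run LIFO, so the output is:
	//   recorded label: error    (closure; sees the final value)
	//   recorded label: success  (direct defer; label captured too early)
}
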
@@ -506,13 +513,6 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
// We do not return as this error is not critical.
}
// We can evict all victims in parallel, except the last one.
// We have to remove the pod from the preempting map before the last victim is evicted
// because, otherwise, the pod removal might be notified to the scheduling queue before
// we remove this pod from the preempting map,
// and the pod could end up stuck in the unschedulable pod pool
// because all the pod removal events would be ignored.
if len(c.Victims().Pods) == 0 {
ev.mu.Lock()
delete(ev.preempting, pod.UID)
@@ -521,9 +521,15 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
return
}
// We can evict all victims in parallel, except the last one.
// We have to remove the pod from the preempting map before the last victim is evicted
// because, otherwise, the pod removal might be notified to the scheduling queue before
// we remove this pod from the preempting map,
// and the pod could end up stuck in the unschedulable pod pool
// because all the pod removal events would be ignored.
ev.Handler.Parallelizer().Until(ctx, len(c.Victims().Pods)-1, preemptPod, ev.PluginName)
if err := errCh.ReceiveError(); err != nil {
logger.Error(err, "Error occurred during preemption")
logger.Error(err, "Error occurred during async preemption")
result = metrics.GoroutineResultError
}
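
The moved comment encodes a strict ordering: evict all but the last victim in parallel, drop the preemptor from the preempting map, and only then evict the last victim, because that final deletion fires the queue event that must not be ignored. A standalone sketch of the same ordering, with plain goroutines standing in for the framework's Parallelizer and print statements for the API calls:

package main

import (
	"fmt"
	"sync"
)

// evictAll mirrors the ordering above. Callers guarantee len(victims) >= 1,
// matching the early return for the zero-victim case in the hunk above.
func evictAll(victims []string, clearPreempting func(), evict func(string)) {
	var wg sync.WaitGroup
	for _, v := range victims[:len(victims)-1] {
		wg.Add(1)
		go func(v string) { defer wg.Done(); evict(v) }(v)
	}
	wg.Wait()
	// The preemptor must leave the preempting map before the last eviction
	// event can reach the scheduling queue.
	clearPreempting()
	evict(victims[len(victims)-1])
}

func main() {
	evictAll(
		[]string{"victim1", "victim2", "victim3"},
		func() { fmt.Println("preemptor removed from preempting map") },
		func(v string) { fmt.Println("evicted", v) },
	)
}
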
@@ -532,7 +538,7 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
ev.mu.Unlock()
if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[len(c.Victims().Pods)-1], pluginName); err != nil {
logger.Error(err, "Error occurred during preemption")
logger.Error(err, "Error occurred during async preemption")
result = metrics.GoroutineResultError
}

pkg/scheduler/framework/preemption/preemption_test.go

@@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"reflect"
"sort"
"testing"
"time"
@@ -37,6 +38,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
extenderv1 "k8s.io/kube-scheduler/extender/v1"
internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache"
@@ -86,6 +88,16 @@ func (pl *FakePostFilterPlugin) OrderedScoreFuncs(ctx context.Context, nodesToVi
return nil
}
type fakePodActivator struct {
activatedPods map[string]*v1.Pod
}
func (f *fakePodActivator) Activate(logger klog.Logger, pods map[string]*v1.Pod) {
for name, pod := range pods {
f.activatedPods[name] = pod
}
}
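
Two hypothetical hardenings for this fake, not part of the commit: a compile-time assertion that it keeps satisfying framework.PodActivator, and a lock-guarded variant, since Activate is invoked from the async-preemption goroutine while the test goroutine reads activatedPods (sync would need to be imported):

// Compile-time check: the build breaks if the fake drifts from the interface.
var _ framework.PodActivator = &fakePodActivator{}

// lockedPodActivator is a hypothetical concurrency-safe variant.
type lockedPodActivator struct {
	mu            sync.Mutex
	activatedPods map[string]*v1.Pod
}

func (f *lockedPodActivator) Activate(logger klog.Logger, pods map[string]*v1.Pod) {
	f.mu.Lock()
	defer f.mu.Unlock()
	for name, pod := range pods {
		f.activatedPods[name] = pod
	}
}
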
type FakePreemptionScorePostFilterPlugin struct{}
func (pl *FakePreemptionScorePostFilterPlugin) SelectVictimsOnNode(
@@ -437,6 +449,7 @@ func TestPrepareCandidate(t *testing.T) {
expectedStatus *framework.Status
// Only compared when async preemption is enabled.
expectedPreemptingMap sets.Set[types.UID]
expectedActivatedPods map[string]*v1.Pod
}{
{
name: "no victims",
@@ -527,6 +540,7 @@ func TestPrepareCandidate(t *testing.T) {
nodeNames: []string{node1Name},
expectedStatus: framework.AsStatus(errors.New("delete pod failed")),
expectedPreemptingMap: sets.New(types.UID("preemptor")),
expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor},
},
{
name: "one victim, not-found victim error is ignored when deleting",
@@ -563,6 +577,7 @@ func TestPrepareCandidate(t *testing.T) {
nodeNames: []string{node1Name},
expectedStatus: framework.AsStatus(errors.New("patch pod status failed")),
expectedPreemptingMap: sets.New(types.UID("preemptor")),
expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor},
},
{
name: "two victims without condition, one passes successfully and the second fails",
@@ -585,6 +600,7 @@ func TestPrepareCandidate(t *testing.T) {
expectedDeletedPods: []string{"victim2"},
expectedStatus: framework.AsStatus(errors.New("patch pod status failed")),
expectedPreemptingMap: sets.New(types.UID("preemptor")),
expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor},
},
}
@@ -638,6 +654,7 @@ func TestPrepareCandidate(t *testing.T) {
informerFactory := informers.NewSharedInformerFactory(cs, 0)
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: cs.EventsV1()})
fakeActivator := &fakePodActivator{activatedPods: make(map[string]*v1.Pod)}
fwk, err := tf.NewFramework(
ctx,
registeredPlugins, "",
@@ -648,6 +665,7 @@ func TestPrepareCandidate(t *testing.T) {
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.testPods, nodes)),
frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, "test-scheduler")),
frameworkruntime.WithPodActivator(fakeActivator),
)
if err != nil {
t.Fatal(err)
@@ -671,8 +689,6 @@ func TestPrepareCandidate(t *testing.T) {
pe.mu.Unlock()
// make the requests complete
close(requestStopper)
return
} else {
close(requestStopper) // no need to stop requests
status := pe.prepareCandidate(ctx, tt.candidate, tt.preemptor, "test-plugin")
@@ -705,6 +721,18 @@ func TestPrepareCandidate(t *testing.T) {
lastErrMsg = fmt.Sprintf("expected patch error %v, got %v", tt.expectedPatchError, patchFailure)
return false, nil
}
if asyncPreemptionEnabled {
if tt.expectedActivatedPods != nil && !reflect.DeepEqual(tt.expectedActivatedPods, fakeActivator.activatedPods) {
lastErrMsg = fmt.Sprintf("expected activated pods %v, got %v", tt.expectedActivatedPods, fakeActivator.activatedPods)
return false, nil
}
if tt.expectedActivatedPods == nil && len(fakeActivator.activatedPods) != 0 {
lastErrMsg = fmt.Sprintf("expected no activated pods, got %v", fakeActivator.activatedPods)
return false, nil
}
}
return true, nil
}); err != nil {
t.Fatal(lastErrMsg)
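
The lastErrMsg / return false, nil shape above is the standard way to assert on asynchronous completion: re-evaluate the whole set of expectations until they all hold or a timeout fires. A generic helper in the same style, a sketch assuming the test file's existing imports plus k8s.io/apimachinery/pkg/util/wait:

// eventually polls check until it succeeds, keeping the most recent failure
// message so the test can fail with a concrete explanation on timeout.
func eventually(ctx context.Context, t *testing.T, check func() (lastErr string, ok bool)) {
	t.Helper()
	var lastErrMsg string
	if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true,
		func(ctx context.Context) (bool, error) {
			msg, ok := check()
			if !ok {
				lastErrMsg = msg
				return false, nil // not an error: the async goroutine may simply not be done yet
			}
			return true, nil
		}); err != nil {
		t.Fatal(lastErrMsg)
	}
}
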

pkg/scheduler/framework/runtime/framework.go

@@ -84,6 +84,7 @@ type frameworkImpl struct {
extenders []framework.Extender
framework.PodNominator
framework.PodActivator
parallelizer parallelize.Parallelizer
}
@@ -131,6 +132,7 @@ type frameworkOptions struct {
snapshotSharedLister framework.SharedLister
metricsRecorder *metrics.MetricAsyncRecorder
podNominator framework.PodNominator
podActivator framework.PodActivator
extenders []framework.Extender
captureProfile CaptureProfile
parallelizer parallelize.Parallelizer
@@ -200,6 +202,12 @@ func WithPodNominator(nominator framework.PodNominator) Option {
}
}
func WithPodActivator(activator framework.PodActivator) Option {
return func(o *frameworkOptions) {
o.podActivator = activator
}
}
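
WithPodActivator follows the functional-options pattern this package already uses for WithPodNominator and the other knobs. For readers new to the idiom, a self-contained toy version (names are illustrative, not the framework's):

package main

import "fmt"

// options stays private; each Option mutates it before construction finishes.
type options struct{ activator string }

type Option func(*options)

func WithActivator(a string) Option {
	return func(o *options) { o.activator = a }
}

func New(opts ...Option) options {
	o := options{activator: "none"} // defaults live here
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	fmt.Println(New())                          // {none}
	fmt.Println(New(WithActivator("podQueue"))) // {podQueue}
}
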
// WithExtenders sets extenders for the scheduling frameworkImpl.
func WithExtenders(extenders []framework.Extender) Option {
return func(o *frameworkOptions) {
@@ -279,6 +287,7 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler
metricsRecorder: options.metricsRecorder,
extenders: options.extenders,
PodNominator: options.podNominator,
PodActivator: options.podActivator,
parallelizer: options.parallelizer,
logger: logger,
}
@@ -427,6 +436,10 @@ func (f *frameworkImpl) SetPodNominator(n framework.PodNominator) {
f.PodNominator = n
}
func (f *frameworkImpl) SetPodActivator(a framework.PodActivator) {
f.PodActivator = a
}
// Close closes each plugin, when they implement io.Closer interface.
func (f *frameworkImpl) Close() error {
var errs []error

pkg/scheduler/scheduler.go

@@ -355,6 +355,7 @@ func New(ctx context.Context,
for _, fwk := range profiles {
fwk.SetPodNominator(podQueue)
fwk.SetPodActivator(podQueue)
}
schedulerCache := internalcache.New(ctx, durationToExpireAssumedPod)
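
The wiring works because the scheduling queue is a single object that satisfies both small interfaces, so SetPodNominator and SetPodActivator hand each framework two views of the same value. A reduced sketch with simplified method signatures (the real interfaces take *v1.Pod and a logger):

package main

import "fmt"

type PodNominator interface{ AddNominatedPod(pod string) }
type PodActivator interface{ Activate(pod string) }

// schedulingQueue stands in for the real priority queue, which implements
// both interfaces.
type schedulingQueue struct{}

func (q *schedulingQueue) AddNominatedPod(pod string) { fmt.Println("nominated", pod) }
func (q *schedulingQueue) Activate(pod string)        { fmt.Println("activated", pod) }

func main() {
	q := &schedulingQueue{}
	var n PodNominator = q // same value, two views
	var a PodActivator = q
	n.AddNominatedPod("p")
	a.Activate("p")
}
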