mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-26 12:46:06 +00:00
Refactor scheduler preempt interface
- replace error with NodeToStatusMap in Preempt() signature - eliminate podPreemptor interface and expose its functions statelessly - move logic in scheduler.go#preempt to generic_scheduler.go#Preempt()
This commit is contained in:
@@ -18,7 +18,6 @@ package scheduler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
@@ -28,8 +27,6 @@ import (
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/strategicpatch"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/informers"
|
||||
coreinformers "k8s.io/client-go/informers/core/v1"
|
||||
@@ -46,6 +43,7 @@ import (
|
||||
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
|
||||
"k8s.io/kubernetes/pkg/scheduler/metrics"
|
||||
"k8s.io/kubernetes/pkg/scheduler/profile"
|
||||
"k8s.io/kubernetes/pkg/scheduler/util"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -55,15 +53,6 @@ const (
|
||||
pluginMetricsSamplePercent = 10
|
||||
)
|
||||
|
||||
// PodPreemptor has methods needed to delete a pod and to update 'NominatedPod'
|
||||
// field of the preemptor pod.
|
||||
// TODO (ahmad-diaa): Remove type and replace it with scheduler methods
|
||||
type podPreemptor interface {
|
||||
getUpdatedPod(pod *v1.Pod) (*v1.Pod, error)
|
||||
deletePod(pod *v1.Pod) error
|
||||
removeNominatedNodeName(pod *v1.Pod) error
|
||||
}
|
||||
|
||||
// Scheduler watches for new unscheduled pods. It attempts to find
|
||||
// nodes that they fit on and writes bindings back to the api server.
|
||||
type Scheduler struct {
|
||||
@@ -72,9 +61,6 @@ type Scheduler struct {
|
||||
SchedulerCache internalcache.Cache
|
||||
|
||||
Algorithm core.ScheduleAlgorithm
|
||||
// PodPreemptor is used to evict pods and update 'NominatedNode' field of
|
||||
// the preemptor pod.
|
||||
podPreemptor podPreemptor
|
||||
|
||||
// NextPod should be a function that blocks until the next pod
|
||||
// is available. We don't use a channel for this, because scheduling
|
||||
@@ -292,7 +278,6 @@ func New(client clientset.Interface,
|
||||
sched.DisablePreemption = options.disablePreemption
|
||||
sched.StopEverything = stopEverything
|
||||
sched.client = client
|
||||
sched.podPreemptor = &podPreemptorImpl{client}
|
||||
sched.scheduledPodsHasSynced = podInformer.Informer().HasSynced
|
||||
|
||||
addAllEventHandlers(sched, informerFactory, podInformer)
|
||||
@@ -382,52 +367,7 @@ func updatePod(client clientset.Interface, pod *v1.Pod, condition *v1.PodConditi
|
||||
if nominatedNode != "" {
|
||||
podCopy.Status.NominatedNodeName = nominatedNode
|
||||
}
|
||||
return patchPod(client, pod, podCopy)
|
||||
}
|
||||
|
||||
// preempt tries to create room for a pod that has failed to schedule, by preempting lower priority pods if possible.
|
||||
// If it succeeds, it adds the name of the node where preemption has happened to the pod spec.
|
||||
// It returns the node name and an error if any.
|
||||
func (sched *Scheduler) preempt(ctx context.Context, prof *profile.Profile, state *framework.CycleState, preemptor *v1.Pod, scheduleErr error) (string, error) {
|
||||
preemptor, err := sched.podPreemptor.getUpdatedPod(preemptor)
|
||||
if err != nil {
|
||||
klog.Errorf("Error getting the updated preemptor pod object: %v", err)
|
||||
return "", err
|
||||
}
|
||||
|
||||
nodeName, victims, nominatedPodsToClear, err := sched.Algorithm.Preempt(ctx, prof, state, preemptor, scheduleErr)
|
||||
if err != nil {
|
||||
klog.Errorf("Error preempting victims to make room for %v/%v: %v", preemptor.Namespace, preemptor.Name, err)
|
||||
return "", err
|
||||
}
|
||||
if len(nodeName) != 0 {
|
||||
for _, victim := range victims {
|
||||
if err := sched.podPreemptor.deletePod(victim); err != nil {
|
||||
klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
|
||||
return "", err
|
||||
}
|
||||
// If the victim is a WaitingPod, send a reject message to the PermitPlugin
|
||||
if waitingPod := prof.GetWaitingPod(victim.UID); waitingPod != nil {
|
||||
waitingPod.Reject("preempted")
|
||||
}
|
||||
prof.Recorder.Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName)
|
||||
|
||||
}
|
||||
metrics.PreemptionVictims.Observe(float64(len(victims)))
|
||||
}
|
||||
// Clearing nominated pods should happen outside of "if node != nil". Node could
|
||||
// be nil when a pod with nominated node name is eligible to preempt again,
|
||||
// but preemption logic does not find any node for it. In that case Preempt()
|
||||
// function of generic_scheduler.go returns the pod itself for removal of
|
||||
// the 'NominatedNodeName' field.
|
||||
for _, p := range nominatedPodsToClear {
|
||||
rErr := sched.podPreemptor.removeNominatedNodeName(p)
|
||||
if rErr != nil {
|
||||
klog.Errorf("Cannot remove 'NominatedNodeName' field of pod: %v", rErr)
|
||||
// We do not return as this error is not critical.
|
||||
}
|
||||
}
|
||||
return nodeName, err
|
||||
return util.PatchPod(client, pod, podCopy)
|
||||
}
|
||||
|
||||
// assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
|
||||
@@ -546,7 +486,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
|
||||
} else {
|
||||
preemptionStartTime := time.Now()
|
||||
// TODO(Huang-Wei): implement the preemption logic as a PostFilter plugin.
|
||||
nominatedNode, _ = sched.preempt(schedulingCycleCtx, prof, state, pod, fitError)
|
||||
nominatedNode, _ = sched.Algorithm.Preempt(schedulingCycleCtx, prof, state, pod, fitError.FilteredNodesStatuses)
|
||||
metrics.PreemptionAttempts.Inc()
|
||||
metrics.SchedulingAlgorithmPreemptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime))
|
||||
metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
|
||||
@@ -715,45 +655,6 @@ func (sched *Scheduler) skipPodSchedule(prof *profile.Profile, pod *v1.Pod) bool
|
||||
return false
|
||||
}
|
||||
|
||||
type podPreemptorImpl struct {
|
||||
Client clientset.Interface
|
||||
}
|
||||
|
||||
func (p *podPreemptorImpl) getUpdatedPod(pod *v1.Pod) (*v1.Pod, error) {
|
||||
return p.Client.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
|
||||
}
|
||||
|
||||
func (p *podPreemptorImpl) deletePod(pod *v1.Pod) error {
|
||||
return p.Client.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
|
||||
}
|
||||
|
||||
func (p *podPreemptorImpl) removeNominatedNodeName(pod *v1.Pod) error {
|
||||
if len(pod.Status.NominatedNodeName) == 0 {
|
||||
return nil
|
||||
}
|
||||
podCopy := pod.DeepCopy()
|
||||
podCopy.Status.NominatedNodeName = ""
|
||||
return patchPod(p.Client, pod, podCopy)
|
||||
}
|
||||
|
||||
func patchPod(client clientset.Interface, old *v1.Pod, new *v1.Pod) error {
|
||||
oldData, err := json.Marshal(old)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
newData, err := json.Marshal(new)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create merge patch for pod %q/%q: %v", old.Namespace, old.Name, err)
|
||||
}
|
||||
_, err = client.CoreV1().Pods(old.Namespace).Patch(context.TODO(), old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
|
||||
return err
|
||||
}
|
||||
|
||||
func defaultAlgorithmSourceProviderName() *string {
|
||||
provider := schedulerapi.SchedulerDefaultProviderName
|
||||
return &provider
|
||||
|
Reference in New Issue
Block a user