Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-07-21 10:51:29 +00:00
- Add Extenders() and the PluginsRunner interface to PreemptHandle
- Make some private functions stateless:
  - make addNominatedPods() not dependent on genericScheduler
  - make podPassesFiltersOnNode() not dependent on genericScheduler
  - make selectVictimsOnNode() not dependent on genericScheduler
  - make selectNodesForPreemption() not dependent on genericScheduler
This commit is contained in:
parent 3a95b1130a
commit 59eff29d22
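The same mechanical change repeats through the diff below: helpers that were methods on genericScheduler become package-level functions, with the scheduler's podNominator and the profile's plugin runner passed in explicitly. A minimal self-contained sketch of the before/after shape — toy names and a string-based nominator, not the real scheduler types:

package main

import "fmt"

// podNominator mirrors the single method of framework.PodNominator that this
// sketch needs; the real interface returns []*v1.Pod rather than []string.
type podNominator interface {
	NominatedPodsForNode(nodeName string) []string
}

type fakeNominator struct{}

func (fakeNominator) NominatedPodsForNode(nodeName string) []string {
	return []string{"nominated-pod-on-" + nodeName}
}

// Before the commit this logic hung off genericScheduler and read
// g.podNominator; after it, the dependency arrives as a parameter, so any
// stub nominator can drive the function from a test.
func nominatedOn(nominator podNominator, nodeName string) []string {
	if nominator == nil {
		return nil // mirrors the nil check kept in addNominatedPods
	}
	return nominator.NominatedPodsForNode(nodeName)
}

func main() {
	fmt.Println(nominatedOn(fakeNominator{}, "node-a"))
}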
pkg/scheduler/core/generic_scheduler.go

@@ -280,7 +280,7 @@ func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, s
 			return "", nil, nil, err
 		}
 	}
-	nodeNameToVictims, err := g.selectNodesForPreemption(ctx, prof, state, pod, potentialNodes, pdbs)
+	nodeNameToVictims, err := selectNodesForPreemption(ctx, prof, g.podNominator, state, pod, potentialNodes, pdbs)
 	if err != nil {
 		return "", nil, nil, err
 	}
@@ -442,7 +442,7 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p
 		// We check the nodes starting from where we left off in the previous scheduling cycle,
 		// this is to make sure all nodes have the same chance of being examined across pods.
 		nodeInfo := allNodes[(g.nextStartNodeIndex+i)%len(allNodes)]
-		fits, status, err := g.podPassesFiltersOnNode(ctx, prof, state, pod, nodeInfo)
+		fits, status, err := podPassesFiltersOnNode(ctx, prof, g.podNominator, state, pod, nodeInfo)
 		if err != nil {
 			errCh.SendErrorWithCancel(err, cancel)
 			return
@@ -520,12 +520,12 @@ func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, filtered []*v
 // addNominatedPods adds pods with equal or greater priority which are nominated
 // to run on the node. It returns 1) whether any pod was added, 2) augmented cycleState,
 // 3) augmented nodeInfo.
-func (g *genericScheduler) addNominatedPods(ctx context.Context, prof *profile.Profile, pod *v1.Pod, state *framework.CycleState, nodeInfo *framework.NodeInfo) (bool, *framework.CycleState, *framework.NodeInfo, error) {
-	if g.podNominator == nil || nodeInfo == nil || nodeInfo.Node() == nil {
+func addNominatedPods(ctx context.Context, pr framework.PluginsRunner, nominator framework.PodNominator, pod *v1.Pod, state *framework.CycleState, nodeInfo *framework.NodeInfo) (bool, *framework.CycleState, *framework.NodeInfo, error) {
+	if nominator == nil || nodeInfo == nil || nodeInfo.Node() == nil {
 		// This may happen only in tests.
 		return false, state, nodeInfo, nil
 	}
-	nominatedPods := g.podNominator.NominatedPodsForNode(nodeInfo.Node().Name)
+	nominatedPods := nominator.NominatedPodsForNode(nodeInfo.Node().Name)
 	if len(nominatedPods) == 0 {
 		return false, state, nodeInfo, nil
 	}
@@ -535,7 +535,7 @@ func (g *genericScheduler) addNominatedPods(ctx context.Context, prof *profile.P
 	for _, p := range nominatedPods {
 		if podutil.GetPodPriority(p) >= podutil.GetPodPriority(pod) && p.UID != pod.UID {
 			nodeInfoOut.AddPod(p)
-			status := prof.RunPreFilterExtensionAddPod(ctx, stateOut, pod, p, nodeInfoOut)
+			status := pr.RunPreFilterExtensionAddPod(ctx, stateOut, pod, p, nodeInfoOut)
 			if !status.IsSuccess() {
 				return false, state, nodeInfo, status.AsError()
 			}
@@ -555,9 +555,10 @@ func (g *genericScheduler) addNominatedPods(ctx context.Context, prof *profile.P
 // and add the nominated pods. Removal of the victims is done by
 // SelectVictimsOnNode(). Preempt removes victims from PreFilter state and
 // NodeInfo before calling this function.
-func (g *genericScheduler) podPassesFiltersOnNode(
+func podPassesFiltersOnNode(
 	ctx context.Context,
-	prof *profile.Profile,
+	pr framework.PluginsRunner,
+	nominator framework.PodNominator,
 	state *framework.CycleState,
 	pod *v1.Pod,
 	info *framework.NodeInfo,
@@ -588,7 +589,7 @@ func (g *genericScheduler) podPassesFiltersOnNode(
 		nodeInfoToUse := info
 		if i == 0 {
 			var err error
-			podsAdded, stateToUse, nodeInfoToUse, err = g.addNominatedPods(ctx, prof, pod, state, info)
+			podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, pr, nominator, pod, state, info)
 			if err != nil {
 				return false, nil, err
 			}
@@ -596,7 +597,7 @@ func (g *genericScheduler) podPassesFiltersOnNode(
 			break
 		}

-		statusMap := prof.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
+		statusMap := pr.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
 		status = statusMap.Merge()
 		if !status.IsSuccess() && !status.IsUnschedulable() {
 			return false, status, status.AsError()
@@ -847,9 +848,10 @@ func pickOneNodeForPreemption(nodesToVictims map[string]*extenderv1.Victims) str

 // selectNodesForPreemption finds all the nodes with possible victims for
 // preemption in parallel.
-func (g *genericScheduler) selectNodesForPreemption(
+func selectNodesForPreemption(
 	ctx context.Context,
-	prof *profile.Profile,
+	pr framework.PluginsRunner,
+	nominator framework.PodNominator,
 	state *framework.CycleState,
 	pod *v1.Pod,
 	potentialNodes []*framework.NodeInfo,
@@ -861,7 +863,7 @@ func (g *genericScheduler) selectNodesForPreemption(
 	checkNode := func(i int) {
 		nodeInfoCopy := potentialNodes[i].Clone()
 		stateCopy := state.Clone()
-		pods, numPDBViolations, fits := g.selectVictimsOnNode(ctx, prof, stateCopy, pod, nodeInfoCopy, pdbs)
+		pods, numPDBViolations, fits := selectVictimsOnNode(ctx, pr, nominator, stateCopy, pod, nodeInfoCopy, pdbs)
 		if fits {
 			resultLock.Lock()
 			victims := extenderv1.Victims{
@@ -937,9 +939,10 @@ func filterPodsWithPDBViolation(pods []*v1.Pod, pdbs []*policy.PodDisruptionBudg
 // NOTE: This function assumes that it is never called if "pod" cannot be scheduled
 // due to pod affinity, node affinity, or node anti-affinity reasons. None of
 // these predicates can be satisfied by removing more pods from the node.
-func (g *genericScheduler) selectVictimsOnNode(
+func selectVictimsOnNode(
 	ctx context.Context,
-	prof *profile.Profile,
+	pr framework.PluginsRunner,
+	nominator framework.PodNominator,
 	state *framework.CycleState,
 	pod *v1.Pod,
 	nodeInfo *framework.NodeInfo,
@@ -951,7 +954,7 @@ func (g *genericScheduler) selectVictimsOnNode(
 		if err := nodeInfo.RemovePod(rp); err != nil {
 			return err
 		}
-		status := prof.RunPreFilterExtensionRemovePod(ctx, state, pod, rp, nodeInfo)
+		status := pr.RunPreFilterExtensionRemovePod(ctx, state, pod, rp, nodeInfo)
 		if !status.IsSuccess() {
 			return status.AsError()
 		}
@@ -959,7 +962,7 @@ func (g *genericScheduler) selectVictimsOnNode(
 	}
 	addPod := func(ap *v1.Pod) error {
 		nodeInfo.AddPod(ap)
-		status := prof.RunPreFilterExtensionAddPod(ctx, state, pod, ap, nodeInfo)
+		status := pr.RunPreFilterExtensionAddPod(ctx, state, pod, ap, nodeInfo)
 		if !status.IsSuccess() {
 			return status.AsError()
 		}
@@ -982,7 +985,7 @@ func (g *genericScheduler) selectVictimsOnNode(
 	// inter-pod affinity to one or more victims, but we have decided not to
 	// support this case for performance reasons. Having affinity to lower
 	// priority pods is not a recommended configuration anyway.
-	if fits, _, err := g.podPassesFiltersOnNode(ctx, prof, state, pod, nodeInfo); !fits {
+	if fits, _, err := podPassesFiltersOnNode(ctx, pr, nominator, state, pod, nodeInfo); !fits {
 		if err != nil {
 			klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
 		}
@@ -1000,7 +1003,7 @@ func (g *genericScheduler) selectVictimsOnNode(
 		if err := addPod(p); err != nil {
 			return false, err
 		}
-		fits, _, _ := g.podPassesFiltersOnNode(ctx, prof, state, pod, nodeInfo)
+		fits, _, _ := podPassesFiltersOnNode(ctx, pr, nominator, state, pod, nodeInfo)
 		if !fits {
 			if err := removePod(p); err != nil {
 				return false, err
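podPassesFiltersOnNode (in the hunks above) runs the Filter plugins up to twice: first with higher-priority nominated pods added to the node's state, then, only if pods were actually added and the first pass succeeded, without them. A condensed, runnable sketch of that control flow with simplified stand-in signatures — addNominated and runFilters are illustrative closures, not the real API:

package main

import "fmt"

// passesFilters condenses the two-pass loop in podPassesFiltersOnNode.
// addNominated reports whether any higher-priority nominated pods were added
// to the node's state; runFilters stands in for RunFilterPlugins.
func passesFilters(addNominated func() bool, runFilters func(withNominated bool) bool) bool {
	podsAdded := false
	ok := false
	for i := 0; i < 2; i++ {
		withNominated := false
		if i == 0 {
			// First pass: evaluate filters as if the nominated pods were running.
			podsAdded = addNominated()
			withNominated = podsAdded
		} else if !podsAdded || !ok {
			// A second pass is pointless if nothing was added or the first
			// pass already failed; the real loop breaks here too.
			break
		}
		ok = runFilters(withNominated)
	}
	return ok
}

func main() {
	fits := passesFilters(
		func() bool { return true },
		func(withNominated bool) bool {
			fmt.Println("filter pass, nominated pods included:", withNominated)
			return true
		},
	)
	fmt.Println("fits:", fits)
}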
pkg/scheduler/core/generic_scheduler_test.go

@@ -1632,7 +1632,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
 			if err != nil {
 				t.Fatal(err)
 			}
-			nodeToPods, err := g.selectNodesForPreemption(context.Background(), prof, state, test.pod, nodeInfos, test.pdbs)
+			nodeToPods, err := selectNodesForPreemption(context.Background(), prof, g.podNominator, state, test.pod, nodeInfos, test.pdbs)
 			if err != nil {
 				t.Error(err)
 			}
@@ -1912,7 +1912,7 @@ func TestPickOneNodeForPreemption(t *testing.T) {
 			if !preFilterStatus.IsSuccess() {
 				t.Errorf("Unexpected preFilterStatus: %v", preFilterStatus)
 			}
-			candidateNodes, _ := g.selectNodesForPreemption(context.Background(), prof, state, test.pod, nodeInfos, nil)
+			candidateNodes, _ := selectNodesForPreemption(context.Background(), prof, g.podNominator, state, test.pod, nodeInfos, nil)
 			node := pickOneNodeForPreemption(candidateNodes)
 			found := false
 			for _, nodeName := range test.expected {
pkg/scheduler/framework/v1alpha1/framework.go

@@ -121,6 +121,7 @@ type frameworkOptions struct {
 	snapshotSharedLister SharedLister
 	metricsRecorder      *metricsRecorder
 	podNominator         PodNominator
+	extenders            []Extender
 	runAllFilters        bool
 }

@@ -170,10 +171,31 @@ func WithPodNominator(nominator PodNominator) Option {
 	}
 }

+// WithExtenders sets extenders for the scheduling framework.
+func WithExtenders(extenders []Extender) Option {
+	return func(o *frameworkOptions) {
+		o.extenders = extenders
+	}
+}
+
 var defaultFrameworkOptions = frameworkOptions{
 	metricsRecorder: newMetricsRecorder(1000, time.Second),
 }

+// TODO(#91029): move this to framework runtime package.
+var _ PreemptHandle = &preemptHandle{}
+
+type preemptHandle struct {
+	extenders []Extender
+	PodNominator
+	PluginsRunner
+}
+
+// Extenders returns the registered extenders.
+func (ph *preemptHandle) Extenders() []Extender {
+	return ph.extenders
+}
+
 var _ Framework = &framework{}

 // NewFramework initializes plugins given the configuration and the registry.
@@ -191,9 +213,13 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
 		clientSet:       options.clientSet,
 		informerFactory: options.informerFactory,
 		metricsRecorder: options.metricsRecorder,
-		preemptHandle:   options.podNominator,
 		runAllFilters:   options.runAllFilters,
 	}
+	f.preemptHandle = &preemptHandle{
+		extenders:     options.extenders,
+		PodNominator:  options.podNominator,
+		PluginsRunner: f,
+	}
 	if plugins == nil {
 		return f, nil
 	}
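NewFramework (above) assembles a preemptHandle that satisfies the widened PreemptHandle interface through embedding: the struct promotes the methods of its embedded PodNominator and PluginsRunner, and only Extenders() needs an explicit implementation. A toy, runnable reproduction of that embedding pattern — all names here are stand-ins, not the framework's types:

package main

import "fmt"

// Toy stand-ins for PodNominator, PluginsRunner, and Extender.
type nominator interface {
	NominatedPodsForNode(node string) []string
}

type runner interface {
	RunFilterPlugins(node string) bool
}

type extender interface {
	Name() string
}

// handle mimics preemptHandle: embedding the two interfaces promotes their
// methods onto the struct, so it satisfies all three facets at once.
type handle struct {
	extenders []extender
	nominator
	runner
}

func (h *handle) Extenders() []extender { return h.extenders }

type noopNominator struct{}

func (noopNominator) NominatedPodsForNode(string) []string { return nil }

type noopRunner struct{}

func (noopRunner) RunFilterPlugins(string) bool { return true }

func main() {
	h := &handle{nominator: noopNominator{}, runner: noopRunner{}}
	fmt.Println(h.RunFilterPlugins("node-a"), h.NominatedPodsForNode("node-a"), len(h.Extenders()))
}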
pkg/scheduler/framework/v1alpha1/interface.go

@@ -494,7 +494,12 @@ type FrameworkHandle interface {

 // PreemptHandle incorporates all needed logic to run preemption logic.
 type PreemptHandle interface {
+	// PodNominator abstracts operations to maintain nominated Pods.
 	PodNominator
+	// PluginsRunner abstracts operations to run some plugins.
+	PluginsRunner
+	// Extenders returns registered scheduler extenders.
+	Extenders() []Extender
 }

 // PodNominator abstracts operations to maintain nominated Pods.
@@ -509,3 +514,15 @@ type PodNominator interface {
 	// NominatedPodsForNode returns nominatedPods on the given node.
 	NominatedPodsForNode(nodeName string) []*v1.Pod
 }
+
+// PluginsRunner abstracts operations to run some plugins.
+// This is used by preemption PostFilter plugins when evaluating the feasibility of
+// scheduling the pod on nodes when certain running pods get evicted.
+type PluginsRunner interface {
+	// RunFilterPlugins runs the set of configured filter plugins for pod on the given node.
+	RunFilterPlugins(context.Context, *CycleState, *v1.Pod, *NodeInfo) PluginToStatus
+	// RunPreFilterExtensionAddPod calls the AddPod interface for the set of configured PreFilter plugins.
+	RunPreFilterExtensionAddPod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *NodeInfo) *Status
+	// RunPreFilterExtensionRemovePod calls the RemovePod interface for the set of configured PreFilter plugins.
+	RunPreFilterExtensionRemovePod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podToRemove *v1.Pod, nodeInfo *NodeInfo) *Status
+}
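Because the preemption helpers now reach the framework only through this narrow PluginsRunner surface, they can be driven by a hand-rolled fake in tests or by an out-of-tree PostFilter plugin. A minimal sketch with simplified signatures — the real methods take *CycleState, *v1.Pod, and *NodeInfo, and feasibleNodes is an illustrative stand-in for a helper like selectNodesForPreemption:

package main

import (
	"context"
	"fmt"
)

// pluginsRunner mirrors the shape of the framework interface with a
// simplified signature so the example stays self-contained.
type pluginsRunner interface {
	RunFilterPlugins(ctx context.Context, node string) bool
}

// fakeRunner is the kind of stub a unit test can now inject directly.
type fakeRunner struct {
	fitsOn map[string]bool
}

func (f fakeRunner) RunFilterPlugins(_ context.Context, node string) bool {
	return f.fitsOn[node]
}

// feasibleNodes needs nothing but the runner it is handed, which is the
// point of making the preemption helpers stateless.
func feasibleNodes(ctx context.Context, pr pluginsRunner, nodes []string) []string {
	var out []string
	for _, n := range nodes {
		if pr.RunFilterPlugins(ctx, n) {
			out = append(out, n)
		}
	}
	return out
}

func main() {
	pr := fakeRunner{fitsOn: map[string]bool{"node-a": true, "node-b": false}}
	fmt.Println(feasibleNodes(context.Background(), pr, []string{"node-a", "node-b"}))
}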
pkg/scheduler/scheduler_test.go

@@ -145,7 +145,8 @@ func (es mockScheduler) Schedule(ctx context.Context, profile *profile.Profile,
 func (es mockScheduler) Extenders() []framework.Extender {
 	return nil
 }
-func (es mockScheduler) Preempt(ctx context.Context, i *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (string, []*v1.Pod, []*v1.Pod, error) {
+
+func (es mockScheduler) Preempt(ctx context.Context, profile *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (string, []*v1.Pod, []*v1.Pod, error) {
 	return "", nil, nil, nil
 }