Mirror of https://github.com/k3s-io/kubernetes.git
Remove scheduler plugins package's dependency from core package

commit 070773c399 (parent 5ed4b76a03)
@@ -20,7 +20,6 @@ go_library(
         "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
         "//staging/src/k8s.io/client-go/rest:go_default_library",
-        "//staging/src/k8s.io/component-helpers/scheduling/corev1:go_default_library",
         "//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library",
         "//vendor/k8s.io/klog/v2:go_default_library",
         "//vendor/k8s.io/utils/trace:go_default_library",
@@ -29,7 +29,6 @@ import (
     "k8s.io/klog/v2"
 
     v1 "k8s.io/api/core/v1"
-    corev1helpers "k8s.io/component-helpers/scheduling/corev1"
     extenderv1 "k8s.io/kube-scheduler/extender/v1"
     "k8s.io/kubernetes/pkg/scheduler/framework"
     "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
@@ -297,12 +296,12 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, fwk fra
         // We check the nodes starting from where we left off in the previous scheduling cycle,
         // this is to make sure all nodes have the same chance of being examined across pods.
         nodeInfo := allNodes[(g.nextStartNodeIndex+i)%len(allNodes)]
-        fits, status, err := PodPassesFiltersOnNode(ctx, fwk.PreemptHandle(), state, pod, nodeInfo)
-        if err != nil {
-            errCh.SendErrorWithCancel(err, cancel)
+        status := fwk.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)
+        if status.Code() == framework.Error {
+            errCh.SendErrorWithCancel(status.AsError(), cancel)
             return
         }
-        if fits {
+        if status.IsSuccess() {
             length := atomic.AddInt32(&feasibleNodesLen, 1)
             if length > numNodesToFind {
                 cancel()
@@ -311,11 +310,9 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, fwk fra
                 feasibleNodes[length-1] = nodeInfo.Node()
             }
         } else {
-            if !status.IsSuccess() {
-                statusesLock.Lock()
-                statuses[nodeInfo.Node().Name] = status
-                statusesLock.Unlock()
-            }
+            statusesLock.Lock()
+            statuses[nodeInfo.Node().Name] = status
+            statusesLock.Unlock()
         }
     }
 
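The (fits bool, status *Status, err error) triple that PodPassesFiltersOnNode returned collapses here into a single *Status: an Error code carries what used to be the error, an Unschedulable result is recorded per node, and only Success marks the node feasible. A minimal, self-contained sketch of that triage follows; the Code/Status types below are simplified stand-ins for the framework's real ones, not the commit's code.

package main

import (
    "errors"
    "fmt"
)

// Code is a simplified stand-in for the framework's status codes.
type Code int

const (
    Success Code = iota
    Unschedulable
    Error
)

// Status is a cut-down model of framework.Status: one code plus a message.
type Status struct {
    code Code
    msg  string
}

func (s *Status) Code() Code {
    if s == nil {
        return Success // a nil status is treated as Success, mirroring the framework
    }
    return s.code
}

func (s *Status) IsSuccess() bool { return s.Code() == Success }

func (s *Status) AsError() error {
    if s.IsSuccess() {
        return nil
    }
    return errors.New(s.msg)
}

// triage mirrors the new control flow in findNodesThatPassFilters: internal
// errors abort the cycle, unschedulable statuses are recorded per node, and
// only Success marks the node feasible.
func triage(node string, st *Status, statuses map[string]*Status) (feasible bool, abort error) {
    if st.Code() == Error {
        return false, st.AsError()
    }
    if st.IsSuccess() {
        return true, nil
    }
    statuses[node] = st
    return false, nil
}

func main() {
    statuses := map[string]*Status{}
    for node, st := range map[string]*Status{
        "node-a": nil, // feasible
        "node-b": {code: Unschedulable, msg: "not enough cpu"},
    } {
        ok, err := triage(node, st, statuses)
        fmt.Println(node, ok, err)
    }
    fmt.Println("recorded:", len(statuses))
}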
@@ -372,96 +369,6 @@ func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, feasibleNodes
     return feasibleNodes, nil
 }
 
-// addNominatedPods adds pods with equal or greater priority which are nominated
-// to run on the node. It returns 1) whether any pod was added, 2) augmented cycleState,
-// 3) augmented nodeInfo.
-func addNominatedPods(ctx context.Context, ph framework.PreemptHandle, pod *v1.Pod, state *framework.CycleState, nodeInfo *framework.NodeInfo) (bool, *framework.CycleState, *framework.NodeInfo, error) {
-    if ph == nil || nodeInfo == nil || nodeInfo.Node() == nil {
-        // This may happen only in tests.
-        return false, state, nodeInfo, nil
-    }
-    nominatedPods := ph.NominatedPodsForNode(nodeInfo.Node().Name)
-    if len(nominatedPods) == 0 {
-        return false, state, nodeInfo, nil
-    }
-    nodeInfoOut := nodeInfo.Clone()
-    stateOut := state.Clone()
-    podsAdded := false
-    for _, p := range nominatedPods {
-        if corev1helpers.PodPriority(p) >= corev1helpers.PodPriority(pod) && p.UID != pod.UID {
-            nodeInfoOut.AddPod(p)
-            status := ph.RunPreFilterExtensionAddPod(ctx, stateOut, pod, p, nodeInfoOut)
-            if !status.IsSuccess() {
-                return false, state, nodeInfo, status.AsError()
-            }
-            podsAdded = true
-        }
-    }
-    return podsAdded, stateOut, nodeInfoOut, nil
-}
-
-// PodPassesFiltersOnNode checks whether a node given by NodeInfo satisfies the
-// filter plugins.
-// This function is called from two different places: Schedule and Preempt.
-// When it is called from Schedule, we want to test whether the pod is
-// schedulable on the node with all the existing pods on the node plus higher
-// and equal priority pods nominated to run on the node.
-// When it is called from Preempt, we should remove the victims of preemption
-// and add the nominated pods. Removal of the victims is done by
-// SelectVictimsOnNode(). Preempt removes victims from PreFilter state and
-// NodeInfo before calling this function.
-// TODO: move this out so that plugins don't need to depend on <core> pkg.
-func PodPassesFiltersOnNode(
-    ctx context.Context,
-    ph framework.PreemptHandle,
-    state *framework.CycleState,
-    pod *v1.Pod,
-    info *framework.NodeInfo,
-) (bool, *framework.Status, error) {
-    var status *framework.Status
-
-    podsAdded := false
-    // We run filters twice in some cases. If the node has greater or equal priority
-    // nominated pods, we run them when those pods are added to PreFilter state and nodeInfo.
-    // If all filters succeed in this pass, we run them again when these
-    // nominated pods are not added. This second pass is necessary because some
-    // filters such as inter-pod affinity may not pass without the nominated pods.
-    // If there are no nominated pods for the node or if the first run of the
-    // filters fail, we don't run the second pass.
-    // We consider only equal or higher priority pods in the first pass, because
-    // those are the current "pod" must yield to them and not take a space opened
-    // for running them. It is ok if the current "pod" take resources freed for
-    // lower priority pods.
-    // Requiring that the new pod is schedulable in both circumstances ensures that
-    // we are making a conservative decision: filters like resources and inter-pod
-    // anti-affinity are more likely to fail when the nominated pods are treated
-    // as running, while filters like pod affinity are more likely to fail when
-    // the nominated pods are treated as not running. We can't just assume the
-    // nominated pods are running because they are not running right now and in fact,
-    // they may end up getting scheduled to a different node.
-    for i := 0; i < 2; i++ {
-        stateToUse := state
-        nodeInfoToUse := info
-        if i == 0 {
-            var err error
-            podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, ph, pod, state, info)
-            if err != nil {
-                return false, nil, err
-            }
-        } else if !podsAdded || !status.IsSuccess() {
-            break
-        }
-
-        statusMap := ph.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
-        status = statusMap.Merge()
-        if !status.IsSuccess() && !status.IsUnschedulable() {
-            return false, status, status.AsError()
-        }
-    }
-
-    return status.IsSuccess(), status, nil
-}
-
 // prioritizeNodes prioritizes the nodes by running the score plugins,
 // which return a score for each node from the call to RunScorePlugins().
 // The scores from each plugin are added together to make the score for that node, then
@@ -23,6 +23,7 @@ import (
     "time"
 
     "github.com/google/go-cmp/cmp"
+
     v1 "k8s.io/api/core/v1"
     apierrors "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -539,6 +539,9 @@ type Handle interface {
 
     SharedInformerFactory() informers.SharedInformerFactory
 
+    // RunFilterPluginsWithNominatedPods runs the set of configured filter plugins for nominated pod on the given node.
+    RunFilterPluginsWithNominatedPods(ctx context.Context, state *CycleState, pod *v1.Pod, info *NodeInfo) *Status
+
     // TODO: unroll the wrapped interfaces to Handle.
     PreemptHandle() PreemptHandle
 }
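With the method promoted onto framework.Handle, any plugin that holds a handle can ask the framework to evaluate filters with nominated pods taken into account instead of importing pkg/scheduler/core. A hedged sketch of such a caller follows; the helper name feasibleWithNominated and its packaging are illustrative, not part of this commit, but the Handle method and Status accessors are the ones the diff introduces.

package example

import (
    "context"

    v1 "k8s.io/api/core/v1"
    "k8s.io/kubernetes/pkg/scheduler/framework"
)

// feasibleWithNominated reports whether pod still fits on the node once
// equal-or-higher-priority nominated pods are taken into account. It only
// needs a framework.Handle, so callers no longer depend on the core package.
func feasibleWithNominated(ctx context.Context, fh framework.Handle, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) (bool, error) {
    status := fh.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)
    if status.Code() == framework.Error {
        // An internal error, as opposed to an ordinary "unschedulable" result.
        return false, status.AsError()
    }
    return status.IsSuccess(), nil
}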
@@ -12,7 +12,6 @@ go_library(
         "//pkg/features:go_default_library",
         "//pkg/scheduler/apis/config:go_default_library",
         "//pkg/scheduler/apis/config/validation:go_default_library",
-        "//pkg/scheduler/core:go_default_library",
         "//pkg/scheduler/framework:go_default_library",
         "//pkg/scheduler/internal/parallelize:go_default_library",
         "//pkg/scheduler/metrics:go_default_library",
@@ -42,7 +42,6 @@ import (
     kubefeatures "k8s.io/kubernetes/pkg/features"
     "k8s.io/kubernetes/pkg/scheduler/apis/config"
     "k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
-    "k8s.io/kubernetes/pkg/scheduler/core"
     "k8s.io/kubernetes/pkg/scheduler/framework"
     "k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
     "k8s.io/kubernetes/pkg/scheduler/metrics"
@@ -191,7 +190,7 @@ func (pl *DefaultPreemption) FindCandidates(ctx context.Context, state *framewor
         return nil, err
     }
     if len(allNodes) == 0 {
-        return nil, core.ErrNoNodesAvailable
+        return nil, fmt.Errorf("no nodes available")
     }
 
     potentialNodes := nodesWherePreemptionMightHelp(allNodes, m)
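One behavioral nuance of this hunk: the package-level sentinel core.ErrNoNodesAvailable is replaced by a freshly constructed error, so any caller that matched the sentinel by identity would stop matching even though the text is unchanged. A small self-contained sketch of the difference; the sentinel name below is a stand-in for the old exported variable.

package main

import (
    "errors"
    "fmt"
)

// errNoNodesAvailable stands in for a package-level sentinel such as the old
// core.ErrNoNodesAvailable.
var errNoNodesAvailable = errors.New("no nodes available")

func main() {
    sentinel := errNoNodesAvailable           // returned by identity
    adHoc := fmt.Errorf("no nodes available") // freshly constructed each time

    fmt.Println(errors.Is(sentinel, errNoNodesAvailable)) // true
    fmt.Println(errors.Is(adHoc, errNoNodesAvailable))    // false: same text, different value
}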
@@ -219,7 +218,7 @@ func (pl *DefaultPreemption) FindCandidates(ctx context.Context, state *framewor
         klog.Infof("from a pool of %d nodes (offset: %d, sample %d nodes: %v), ~%d candidates will be chosen", len(potentialNodes), offset, len(sample), sample, numCandidates)
     }
 
-    return dryRunPreemption(ctx, pl.fh.PreemptHandle(), state, pod, potentialNodes, pdbs, offset, numCandidates), nil
+    return dryRunPreemption(ctx, pl.fh, state, pod, potentialNodes, pdbs, offset, numCandidates), nil
 }
 
 // PodEligibleToPreemptOthers determines whether this pod should be considered
@@ -308,7 +307,7 @@ func (cl *candidateList) get() []Candidate {
 // and returns preemption candidates. The number of candidates depends on the
 // constraints defined in the plugin's args. In the returned list of
 // candidates, ones that do not violate PDB are preferred over ones that do.
-func dryRunPreemption(ctx context.Context, fh framework.PreemptHandle,
+func dryRunPreemption(ctx context.Context, fh framework.Handle,
     state *framework.CycleState, pod *v1.Pod, potentialNodes []*framework.NodeInfo,
     pdbs []*policy.PodDisruptionBudget, offset int32, numCandidates int32) []Candidate {
     nonViolatingCandidates := newCandidateList(numCandidates)
@@ -577,7 +576,7 @@ func pickOneNodeForPreemption(nodesToVictims map[string]*extenderv1.Victims) str
 // these predicates can be satisfied by removing more pods from the node.
 func selectVictimsOnNode(
     ctx context.Context,
-    ph framework.PreemptHandle,
+    fh framework.Handle,
     state *framework.CycleState,
     pod *v1.Pod,
     nodeInfo *framework.NodeInfo,
@@ -585,6 +584,7 @@ func selectVictimsOnNode(
 ) ([]*v1.Pod, int, bool) {
     var potentialVictims []*v1.Pod
 
+    ph := fh.PreemptHandle()
     removePod := func(rp *v1.Pod) error {
         if err := nodeInfo.RemovePod(rp); err != nil {
             return err
@@ -626,9 +626,9 @@ func selectVictimsOnNode(
     // inter-pod affinity to one or more victims, but we have decided not to
     // support this case for performance reasons. Having affinity to lower
     // priority pods is not a recommended configuration anyway.
-    if fits, _, err := core.PodPassesFiltersOnNode(ctx, ph, state, pod, nodeInfo); !fits {
-        if err != nil {
-            klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
+    if status := fh.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo); !status.IsSuccess() {
+        if status.Code() == framework.Error {
+            klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, status.AsError())
         }
 
         return nil, 0, false
@@ -644,7 +644,8 @@ func selectVictimsOnNode(
         if err := addPod(p); err != nil {
             return false, err
         }
-        fits, _, _ := core.PodPassesFiltersOnNode(ctx, ph, state, pod, nodeInfo)
+        status := fh.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)
+        fits := status.IsSuccess()
         if !fits {
             if err := removePod(p); err != nil {
                 return false, err
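The loop this hunk touches is the "reprieve" step of victim selection: all lower-priority pods are first removed from the node, then re-added one by one from highest priority down, and a pod stays a victim only if re-adding it makes the preemptor unschedulable again. A simplified, self-contained sketch of that greedy pattern follows, with toy types standing in for *v1.Pod, NodeInfo, and the filter run; it is not the plugin's real code.

package main

import (
    "fmt"
    "sort"
)

// pod is a toy stand-in for *v1.Pod; cpu models a single resource dimension.
type pod struct {
    name     string
    priority int32
    cpu      int64
}

// selectVictims mirrors the shape of selectVictimsOnNode: remove every
// potential victim, then reprieve (re-add) pods from highest priority down,
// keeping as victims only those whose re-addition breaks feasibility.
func selectVictims(capacity, used int64, preemptor pod, candidates []pod) []pod {
    // Remove all candidates up front.
    for _, p := range candidates {
        used -= p.cpu
    }
    fits := func() bool { return used+preemptor.cpu <= capacity }
    if !fits() {
        return nil // even removing everything does not help
    }
    // Reprieve from highest priority down.
    sort.Slice(candidates, func(i, j int) bool { return candidates[i].priority > candidates[j].priority })
    var victims []pod
    for _, p := range candidates {
        used += p.cpu // tentatively re-add, like addPod + RunFilterPluginsWithNominatedPods
        if !fits() {
            used -= p.cpu // cannot keep it; it stays a victim
            victims = append(victims, p)
        }
    }
    return victims
}

func main() {
    victims := selectVictims(10, 9, pod{"preemptor", 100, 4}, []pod{
        {"low-a", 1, 3}, {"low-b", 2, 3}, {"low-c", 3, 3},
    })
    for _, v := range victims {
        fmt.Println("victim:", v.name) // only the lowest-priority pod is preempted
    }
}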
@@ -978,7 +978,7 @@ func TestDryRunPreemption(t *testing.T) {
                 t.Errorf("cycle %d: Unexpected PreFilter Status: %v", cycle, status)
             }
             offset, numCandidates := pl.getOffsetAndNumCandidates(int32(len(nodeInfos)))
-            got := dryRunPreemption(context.Background(), fwk.PreemptHandle(), state, pod, nodeInfos, tt.pdbs, offset, numCandidates)
+            got := dryRunPreemption(context.Background(), fwk, state, pod, nodeInfos, tt.pdbs, offset, numCandidates)
             if err != nil {
                 t.Fatal(err)
             }
@@ -1201,7 +1201,7 @@ func TestSelectBestCandidate(t *testing.T) {
 
             pl := &DefaultPreemption{args: *getDefaultDefaultPreemptionArgs()}
             offset, numCandidates := pl.getOffsetAndNumCandidates(int32(len(nodeInfos)))
-            candidates := dryRunPreemption(context.Background(), fwk.PreemptHandle(), state, tt.pod, nodeInfos, nil, offset, numCandidates)
+            candidates := dryRunPreemption(context.Background(), fwk, state, tt.pod, nodeInfos, nil, offset, numCandidates)
             s := SelectCandidate(candidates)
             found := false
             for _, nodeName := range tt.expected {
@@ -25,6 +25,7 @@ go_library(
         "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
         "//staging/src/k8s.io/client-go/tools/events:go_default_library",
         "//staging/src/k8s.io/component-base/metrics:go_default_library",
+        "//staging/src/k8s.io/component-helpers/scheduling/corev1:go_default_library",
         "//staging/src/k8s.io/kube-scheduler/config/v1beta1:go_default_library",
         "//vendor/k8s.io/klog/v2:go_default_library",
         "//vendor/sigs.k8s.io/yaml:go_default_library",
@@ -41,6 +42,7 @@ go_test(
     deps = [
         "//pkg/scheduler/apis/config:go_default_library",
         "//pkg/scheduler/framework:go_default_library",
+        "//pkg/scheduler/internal/queue:go_default_library",
         "//pkg/scheduler/metrics:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@@ -30,6 +30,7 @@ import (
     "k8s.io/client-go/informers"
     clientset "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/tools/events"
+    "k8s.io/component-helpers/scheduling/corev1"
     "k8s.io/klog/v2"
     "k8s.io/kube-scheduler/config/v1beta1"
     "k8s.io/kubernetes/pkg/scheduler/apis/config"
@@ -594,6 +595,90 @@ func (f *frameworkImpl) runPostFilterPlugin(ctx context.Context, pl framework.Po
     return r, s
 }
 
+// RunFilterPluginsWithNominatedPods runs the set of configured filter plugins
+// for nominated pod on the given node.
+// This function is called from two different places: Schedule and Preempt.
+// When it is called from Schedule, we want to test whether the pod is
+// schedulable on the node with all the existing pods on the node plus higher
+// and equal priority pods nominated to run on the node.
+// When it is called from Preempt, we should remove the victims of preemption
+// and add the nominated pods. Removal of the victims is done by
+// SelectVictimsOnNode(). Preempt removes victims from PreFilter state and
+// NodeInfo before calling this function.
+func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, state *framework.CycleState, pod *v1.Pod, info *framework.NodeInfo) *framework.Status {
+    var status *framework.Status
+
+    ph := f.PreemptHandle()
+    podsAdded := false
+    // We run filters twice in some cases. If the node has greater or equal priority
+    // nominated pods, we run them when those pods are added to PreFilter state and nodeInfo.
+    // If all filters succeed in this pass, we run them again when these
+    // nominated pods are not added. This second pass is necessary because some
+    // filters such as inter-pod affinity may not pass without the nominated pods.
+    // If there are no nominated pods for the node or if the first run of the
+    // filters fail, we don't run the second pass.
+    // We consider only equal or higher priority pods in the first pass, because
+    // those are the current "pod" must yield to them and not take a space opened
+    // for running them. It is ok if the current "pod" take resources freed for
+    // lower priority pods.
+    // Requiring that the new pod is schedulable in both circumstances ensures that
+    // we are making a conservative decision: filters like resources and inter-pod
+    // anti-affinity are more likely to fail when the nominated pods are treated
+    // as running, while filters like pod affinity are more likely to fail when
+    // the nominated pods are treated as not running. We can't just assume the
+    // nominated pods are running because they are not running right now and in fact,
+    // they may end up getting scheduled to a different node.
+    for i := 0; i < 2; i++ {
+        stateToUse := state
+        nodeInfoToUse := info
+        if i == 0 {
+            var err error
+            podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, ph, pod, state, info)
+            if err != nil {
+                return framework.NewStatus(framework.Error, err.Error())
+            }
+        } else if !podsAdded || !status.IsSuccess() {
+            break
+        }
+
+        statusMap := ph.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
+        status = statusMap.Merge()
+        if !status.IsSuccess() && !status.IsUnschedulable() {
+            return status
+        }
+    }
+
+    return status
+}
+
+// addNominatedPods adds pods with equal or greater priority which are nominated
+// to run on the node. It returns 1) whether any pod was added, 2) augmented cycleState,
+// 3) augmented nodeInfo.
+func addNominatedPods(ctx context.Context, ph framework.PreemptHandle, pod *v1.Pod, state *framework.CycleState, nodeInfo *framework.NodeInfo) (bool, *framework.CycleState, *framework.NodeInfo, error) {
+    if ph == nil || nodeInfo.Node() == nil {
+        // This may happen only in tests.
+        return false, state, nodeInfo, nil
+    }
+    nominatedPods := ph.NominatedPodsForNode(nodeInfo.Node().Name)
+    if len(nominatedPods) == 0 {
+        return false, state, nodeInfo, nil
+    }
+    nodeInfoOut := nodeInfo.Clone()
+    stateOut := state.Clone()
+    podsAdded := false
+    for _, p := range nominatedPods {
+        if corev1.PodPriority(p) >= corev1.PodPriority(pod) && p.UID != pod.UID {
+            nodeInfoOut.AddPod(p)
+            status := ph.RunPreFilterExtensionAddPod(ctx, stateOut, pod, p, nodeInfoOut)
+            if !status.IsSuccess() {
+                return false, state, nodeInfo, status.AsError()
+            }
+            podsAdded = true
+        }
+    }
+    return podsAdded, stateOut, nodeInfoOut, nil
+}
+
 // RunPreScorePlugins runs the set of configured pre-score plugins. If any
 // of these plugins returns any status other than "Success", the given pod is rejected.
 func (f *frameworkImpl) RunPreScorePlugins(
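The long comment carried over into RunFilterPluginsWithNominatedPods is the heart of the move: feasibility is evaluated once with equal-or-higher-priority nominated pods folded in and, if that passes, once more without them, and the node counts as feasible only if every executed pass succeeds. A minimal, self-contained model of that two-pass decision follows; the snapshot type and the fits callback are simplified stand-ins for CycleState/NodeInfo and RunFilterPlugins, not the framework's implementation.

package main

import "fmt"

// snapshot is a toy stand-in for the (CycleState, NodeInfo) pair that the real
// RunFilterPluginsWithNominatedPods clones and mutates.
type snapshot struct {
    freeCPU int64
}

// withNominated plays the role of addNominatedPods: it folds the nominated
// pods' usage into a copy of the state and reports whether anything was added.
func withNominated(s snapshot, nominatedCPU int64) (snapshot, bool) {
    if nominatedCPU == 0 {
        return s, false
    }
    return snapshot{freeCPU: s.freeCPU - nominatedCPU}, true
}

// feasible mirrors the two-pass loop: pass 0 runs the filters with
// equal-or-higher-priority nominated pods added, pass 1 without them. Pass 1
// exists for filters (such as pod affinity) that can only fail once the
// nominated pods are absent; this toy "fits" callback models plain resources.
func feasible(s snapshot, podCPU, nominatedCPU int64, fits func(snapshot, int64) bool) bool {
    podsAdded := false
    passed := false
    for i := 0; i < 2; i++ {
        stateToUse := s
        if i == 0 {
            stateToUse, podsAdded = withNominated(s, nominatedCPU)
        } else if !podsAdded || !passed {
            break // nothing was added, or the first pass already failed
        }
        passed = fits(stateToUse, podCPU)
        if !passed {
            return false
        }
    }
    return passed
}

func main() {
    fits := func(s snapshot, podCPU int64) bool { return podCPU <= s.freeCPU }

    // 4 CPU free, the pod wants 3, a nominated pod wants 2: the first pass
    // fails, so the node is conservatively rejected.
    fmt.Println(feasible(snapshot{freeCPU: 4}, 3, 2, fits)) // false
    // Nothing nominated: a single pass decides.
    fmt.Println(feasible(snapshot{freeCPU: 4}, 3, 0, fits)) // true
}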
@@ -28,12 +28,14 @@ import (
     "github.com/google/go-cmp/cmp"
     "github.com/prometheus/client_golang/prometheus"
     dto "github.com/prometheus/client_model/go"
 
     v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/kubernetes/pkg/scheduler/apis/config"
     "k8s.io/kubernetes/pkg/scheduler/framework"
+    internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
     "k8s.io/kubernetes/pkg/scheduler/metrics"
 )
@@ -51,6 +53,7 @@ const (
     bindPlugin = "bind-plugin"
 
     testProfileName = "test-profile"
+    nodeName        = "testNode"
 )
 
 // TestScoreWithNormalizePlugin implements ScoreWithNormalizePlugin interface.
@@ -341,6 +344,20 @@ var state = &framework.CycleState{}
 
 // Pod is only used for logging errors.
 var pod = &v1.Pod{}
+var node = &v1.Node{
+    ObjectMeta: metav1.ObjectMeta{
+        Name: nodeName,
+    },
+}
+var lowPriority, highPriority = int32(0), int32(1000)
+var lowPriorityPod = &v1.Pod{
+    ObjectMeta: metav1.ObjectMeta{UID: "low"},
+    Spec:       v1.PodSpec{Priority: &lowPriority},
+}
+var highPriorityPod = &v1.Pod{
+    ObjectMeta: metav1.ObjectMeta{UID: "high"},
+    Spec:       v1.PodSpec{Priority: &highPriority},
+}
 var nodes = []*v1.Node{
     {ObjectMeta: metav1.ObjectMeta{Name: "node1"}},
     {ObjectMeta: metav1.ObjectMeta{Name: "node2"}},
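The new lowPriorityPod/highPriorityPod fixtures feed the priority comparison inside addNominatedPods, which is also why the runtime package now imports k8s.io/component-helpers/scheduling/corev1: only nominated pods with priority greater than or equal to the incoming pod's are folded into the simulated state. A hedged sketch of that comparison, assuming only the component-helpers package; the surrounding main function is illustrative.

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    "k8s.io/component-helpers/scheduling/corev1"
)

func main() {
    low, high := int32(0), int32(1000)
    incoming := &v1.Pod{Spec: v1.PodSpec{Priority: &low}}
    nominated := &v1.Pod{Spec: v1.PodSpec{Priority: &high}}

    // addNominatedPods only folds in nominated pods that the incoming pod must
    // yield to, i.e. those with equal or greater priority (and a different UID).
    if corev1.PodPriority(nominated) >= corev1.PodPriority(incoming) {
        fmt.Println("nominated pod is added to the simulated NodeInfo/CycleState")
    }
}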
@@ -1144,6 +1161,154 @@ func TestPostFilterPlugins(t *testing.T) {
     }
 }
 
+func TestFilterPluginsWithNominatedPods(t *testing.T) {
+    tests := []struct {
+        name            string
+        preFilterPlugin *TestPlugin
+        filterPlugin    *TestPlugin
+        pod             *v1.Pod
+        nominatedPod    *v1.Pod
+        node            *v1.Node
+        nodeInfo        *framework.NodeInfo
+        wantStatus      *framework.Status
+    }{
+        {
+            name:            "node has no nominated pod",
+            preFilterPlugin: nil,
+            filterPlugin:    nil,
+            pod:             lowPriorityPod,
+            nominatedPod:    nil,
+            node:            node,
+            nodeInfo:        framework.NewNodeInfo(pod),
+            wantStatus:      nil,
+        },
+        {
+            name: "node has a high-priority nominated pod and all filters succeed",
+            preFilterPlugin: &TestPlugin{
+                name: "TestPlugin1",
+                inj: injectedResult{
+                    PreFilterAddPodStatus: int(framework.Success),
+                },
+            },
+            filterPlugin: &TestPlugin{
+                name: "TestPlugin2",
+                inj: injectedResult{
+                    FilterStatus: int(framework.Success),
+                },
+            },
+            pod:          lowPriorityPod,
+            nominatedPod: highPriorityPod,
+            node:         node,
+            nodeInfo:     framework.NewNodeInfo(pod),
+            wantStatus:   nil,
+        },
+        {
+            name: "node has a high-priority nominated pod and pre filters fail",
+            preFilterPlugin: &TestPlugin{
+                name: "TestPlugin1",
+                inj: injectedResult{
+                    PreFilterAddPodStatus: int(framework.Error),
+                },
+            },
+            filterPlugin: nil,
+            pod:          lowPriorityPod,
+            nominatedPod: highPriorityPod,
+            node:         node,
+            nodeInfo:     framework.NewNodeInfo(pod),
+            wantStatus:   framework.NewStatus(framework.Error, `running AddPod on PreFilter plugin "TestPlugin1": injected status`),
+        },
+        {
+            name: "node has a high-priority nominated pod and filters fail",
+            preFilterPlugin: &TestPlugin{
+                name: "TestPlugin1",
+                inj: injectedResult{
+                    PreFilterAddPodStatus: int(framework.Success),
+                },
+            },
+            filterPlugin: &TestPlugin{
+                name: "TestPlugin2",
+                inj: injectedResult{
+                    FilterStatus: int(framework.Error),
+                },
+            },
+            pod:          lowPriorityPod,
+            nominatedPod: highPriorityPod,
+            node:         node,
+            nodeInfo:     framework.NewNodeInfo(pod),
+            wantStatus:   framework.NewStatus(framework.Error, `running "TestPlugin2" filter plugin for pod "": injected filter status`),
+        },
+        {
+            name: "node has a low-priority nominated pod and pre filters return unschedulable",
+            preFilterPlugin: &TestPlugin{
+                name: "TestPlugin1",
+                inj: injectedResult{
+                    PreFilterAddPodStatus: int(framework.Unschedulable),
+                },
+            },
+            filterPlugin: &TestPlugin{
+                name: "TestPlugin2",
+                inj: injectedResult{
+                    FilterStatus: int(framework.Success),
+                },
+            },
+            pod:          highPriorityPod,
+            nominatedPod: lowPriorityPod,
+            node:         node,
+            nodeInfo:     framework.NewNodeInfo(pod),
+            wantStatus:   nil,
+        },
+    }
+
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            registry := Registry{}
+            cfgPls := &config.Plugins{
+                PreFilter: &config.PluginSet{},
+                Filter:    &config.PluginSet{},
+            }
+
+            if tt.preFilterPlugin != nil {
+                if err := registry.Register(tt.preFilterPlugin.name,
+                    func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
+                        return tt.preFilterPlugin, nil
+                    }); err != nil {
+                    t.Fatalf("fail to register preFilter plugin (%s)", tt.preFilterPlugin.name)
+                }
+                cfgPls.PreFilter.Enabled = append(
+                    cfgPls.PreFilter.Enabled,
+                    config.Plugin{Name: tt.preFilterPlugin.name},
+                )
+            }
+            if tt.filterPlugin != nil {
+                if err := registry.Register(tt.filterPlugin.name,
+                    func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
+                        return tt.filterPlugin, nil
+                    }); err != nil {
+                    t.Fatalf("fail to register filter plugin (%s)", tt.filterPlugin.name)
+                }
+                cfgPls.Filter.Enabled = append(
+                    cfgPls.Filter.Enabled,
+                    config.Plugin{Name: tt.filterPlugin.name},
+                )
+            }
+
+            podNominator := internalqueue.NewPodNominator()
+            if tt.nominatedPod != nil {
+                podNominator.AddNominatedPod(tt.nominatedPod, nodeName)
+            }
+            f, err := newFrameworkWithQueueSortAndBind(registry, cfgPls, emptyArgs, WithPodNominator(podNominator))
+            if err != nil {
+                t.Fatalf("fail to create framework: %s", err)
+            }
+            tt.nodeInfo.SetNode(tt.node)
+            gotStatus := f.RunFilterPluginsWithNominatedPods(context.TODO(), nil, tt.pod, tt.nodeInfo)
+            if !reflect.DeepEqual(gotStatus, tt.wantStatus) {
+                t.Errorf("Unexpected status. got: %v, want: %v", gotStatus, tt.wantStatus)
+            }
+        })
+    }
+}
+
 func TestPreBindPlugins(t *testing.T) {
     tests := []struct {
         name string
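A detail worth noting when reading the expectations above: several cases set wantStatus to nil because, in the scheduler framework, a nil *Status is interpreted as Success (its Code accessor is nil-safe). A tiny illustration, assuming the framework package is importable; the helper name is illustrative.

package example

import "k8s.io/kubernetes/pkg/scheduler/framework"

func nilStatusIsSuccess() bool {
    var s *framework.Status // e.g. what a passing filter run yields
    return s.IsSuccess()    // true: Code() on a nil Status reports Success
}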
@@ -189,7 +189,9 @@ rules:
   - k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports
   - k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources
   - k8s.io/kubernetes/pkg/scheduler/framework/runtime
+  - k8s.io/kubernetes/pkg/scheduler/internal/heap
   - k8s.io/kubernetes/pkg/scheduler/internal/parallelize
+  - k8s.io/kubernetes/pkg/scheduler/internal/queue
   - k8s.io/kubernetes/pkg/scheduler/listers
   - k8s.io/kubernetes/pkg/scheduler/metrics
   - k8s.io/kubernetes/pkg/scheduler/nodeinfo