Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-23 11:50:44 +00:00)
Remove uses of NodeInfoMap outside of snapshot and cache
Signed-off-by: Aldo Culquicondor <acondor@google.com>
Parent: 5d916e20dd
Commit: bcab9b4c9e
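The hunks below replace the raw NodeInfoMap (map[string]*schedulernodeinfo.NodeInfo) that used to be threaded through the extender and preemption code paths with the listers.NodeInfoLister exposed by the scheduler snapshot. As a rough sketch of what the change means for a caller (the lister's Get method and the snapshot's NodeInfos() accessor are taken from the hunks themselves; the helper function is purely hypothetical):

    package example

    import (
        "k8s.io/kubernetes/pkg/scheduler/listers"
        schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
    )

    // lookupNode is a hypothetical helper, not part of the commit. Instead of
    // indexing NodeInfoMap (which silently yields nil for an unknown node),
    // callers now ask the NodeInfoLister and handle the lookup error explicitly.
    func lookupNode(nodeInfos listers.NodeInfoLister, name string) (*schedulernodeinfo.NodeInfo, error) {
        nodeInfo, err := nodeInfos.Get(name)
        if err != nil {
            return nil, err // a missing node is now an explicit error
        }
        return nodeInfo, nil
    }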
@@ -82,6 +82,7 @@ go_test(
         "//pkg/scheduler/internal/cache:go_default_library",
         "//pkg/scheduler/internal/cache/fake:go_default_library",
         "//pkg/scheduler/internal/queue:go_default_library",
+        "//pkg/scheduler/listers:go_default_library",
         "//pkg/scheduler/nodeinfo:go_default_library",
         "//pkg/scheduler/nodeinfo/snapshot:go_default_library",
         "//pkg/scheduler/testing:go_default_library",
@@ -10,7 +10,7 @@ go_library(
     visibility = ["//visibility:public"],
     deps = [
         "//pkg/scheduler/apis/extender/v1:go_default_library",
-        "//pkg/scheduler/nodeinfo:go_default_library",
+        "//pkg/scheduler/listers:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
     ],
 )
@@ -19,7 +19,7 @@ package algorithm
 import (
     "k8s.io/api/core/v1"
     extenderv1 "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1"
-    schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
+    "k8s.io/kubernetes/pkg/scheduler/listers"
 )

 // SchedulerExtender is an interface for external processes to influence scheduling
@@ -32,9 +32,7 @@ type SchedulerExtender interface {
     // Filter based on extender-implemented predicate functions. The filtered list is
     // expected to be a subset of the supplied list. failedNodesMap optionally contains
     // the list of failed nodes and failure reasons.
-    Filter(pod *v1.Pod,
-        nodes []*v1.Node, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
-    ) (filteredNodes []*v1.Node, failedNodesMap extenderv1.FailedNodesMap, err error)
+    Filter(pod *v1.Pod, nodes []*v1.Node) (filteredNodes []*v1.Node, failedNodesMap extenderv1.FailedNodesMap, err error)

     // Prioritize based on extender-implemented priority functions. The returned scores & weight
     // are used to compute the weighted score for an extender. The weighted scores are added to
@@ -62,8 +60,7 @@ type SchedulerExtender interface {
     ProcessPreemption(
         pod *v1.Pod,
         nodeToVictims map[*v1.Node]*extenderv1.Victims,
-        nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
-    ) (map[*v1.Node]*extenderv1.Victims, error)
+        nodeInfos listers.NodeInfoLister) (map[*v1.Node]*extenderv1.Victims, error)

     // SupportsPreemption returns if the scheduler extender support preemption or not.
     SupportsPreemption() bool
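For extender implementations, the visible change is the SchedulerExtender method set: Filter loses the nodeNameToInfo argument entirely, and ProcessPreemption takes a listers.NodeInfoLister instead of the map. A minimal stub of just the two updated methods, assuming the signatures above (the interface's other methods such as Prioritize, Bind and IsInterested are unchanged and omitted, so this type alone does not satisfy the full interface):

    package example

    import (
        v1 "k8s.io/api/core/v1"
        extenderv1 "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1"
        "k8s.io/kubernetes/pkg/scheduler/listers"
    )

    // noopExtender is a hypothetical stub, not part of the commit.
    type noopExtender struct{}

    // Filter no longer sees the scheduler's node-info map, only the candidate nodes.
    func (e *noopExtender) Filter(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, extenderv1.FailedNodesMap, error) {
        return nodes, extenderv1.FailedNodesMap{}, nil
    }

    // ProcessPreemption receives a NodeInfoLister, so node state is looked up on
    // demand (and a failed lookup surfaces as an error) rather than copied in as a map.
    func (e *noopExtender) ProcessPreemption(
        pod *v1.Pod,
        nodeToVictims map[*v1.Node]*extenderv1.Victims,
        nodeInfos listers.NodeInfoLister,
    ) (map[*v1.Node]*extenderv1.Victims, error) {
        return nodeToVictims, nil
    }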
@@ -18,6 +18,7 @@ go_library(
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//pkg/scheduler/internal/cache:go_default_library",
         "//pkg/scheduler/internal/queue:go_default_library",
+        "//pkg/scheduler/listers:go_default_library",
         "//pkg/scheduler/metrics:go_default_library",
         "//pkg/scheduler/nodeinfo:go_default_library",
         "//pkg/scheduler/nodeinfo/snapshot:go_default_library",
@@ -64,6 +65,7 @@ go_test(
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//pkg/scheduler/internal/cache:go_default_library",
         "//pkg/scheduler/internal/queue:go_default_library",
+        "//pkg/scheduler/listers:go_default_library",
         "//pkg/scheduler/listers/fake:go_default_library",
         "//pkg/scheduler/nodeinfo:go_default_library",
         "//pkg/scheduler/nodeinfo/snapshot:go_default_library",
@@ -31,6 +31,7 @@ import (
     "k8s.io/kubernetes/pkg/scheduler/algorithm"
     schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
     extenderv1 "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1"
+    "k8s.io/kubernetes/pkg/scheduler/listers"
     schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
 )

@@ -166,7 +167,7 @@ func (h *HTTPExtender) SupportsPreemption() bool {
 func (h *HTTPExtender) ProcessPreemption(
     pod *v1.Pod,
     nodeToVictims map[*v1.Node]*extenderv1.Victims,
-    nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
+    nodeInfos listers.NodeInfoLister,
 ) (map[*v1.Node]*extenderv1.Victims, error) {
     var (
         result extenderv1.ExtenderPreemptionResult
@@ -198,7 +199,7 @@ func (h *HTTPExtender) ProcessPreemption(

     // Extender will always return NodeNameToMetaVictims.
     // So let's convert it to NodeToVictims by using NodeNameToInfo.
-    newNodeToVictims, err := h.convertToNodeToVictims(result.NodeNameToMetaVictims, nodeNameToInfo)
+    newNodeToVictims, err := h.convertToNodeToVictims(result.NodeNameToMetaVictims, nodeInfos)
     if err != nil {
         return nil, err
     }
@@ -210,46 +211,43 @@ func (h *HTTPExtender) ProcessPreemption(
 // such as UIDs and names, to object pointers.
 func (h *HTTPExtender) convertToNodeToVictims(
     nodeNameToMetaVictims map[string]*extenderv1.MetaVictims,
-    nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
+    nodeInfos listers.NodeInfoLister,
 ) (map[*v1.Node]*extenderv1.Victims, error) {
     nodeToVictims := map[*v1.Node]*extenderv1.Victims{}
     for nodeName, metaVictims := range nodeNameToMetaVictims {
+        nodeInfo, err := nodeInfos.Get(nodeName)
+        if err != nil {
+            return nil, err
+        }
         victims := &extenderv1.Victims{
             Pods: []*v1.Pod{},
         }
         for _, metaPod := range metaVictims.Pods {
-            pod, err := h.convertPodUIDToPod(metaPod, nodeName, nodeNameToInfo)
+            pod, err := h.convertPodUIDToPod(metaPod, nodeInfo)
             if err != nil {
                 return nil, err
             }
             victims.Pods = append(victims.Pods, pod)
         }
-        nodeToVictims[nodeNameToInfo[nodeName].Node()] = victims
+        nodeToVictims[nodeInfo.Node()] = victims
     }
     return nodeToVictims, nil
 }

-// convertPodUIDToPod returns v1.Pod object for given MetaPod and node name.
+// convertPodUIDToPod returns v1.Pod object for given MetaPod and node info.
 // The v1.Pod object is restored by nodeInfo.Pods().
-// It should return error if there's cache inconsistency between default scheduler and extender
-// so that this pod or node is missing from nodeNameToInfo.
+// It returns an error if there's cache inconsistency between default scheduler
+// and extender, i.e. when the pod is not found in nodeInfo.Pods.
 func (h *HTTPExtender) convertPodUIDToPod(
     metaPod *extenderv1.MetaPod,
-    nodeName string,
-    nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo) (*v1.Pod, error) {
-    var nodeInfo *schedulernodeinfo.NodeInfo
-    if nodeInfo, ok := nodeNameToInfo[nodeName]; ok {
-        for _, pod := range nodeInfo.Pods() {
-            if string(pod.UID) == metaPod.UID {
-                return pod, nil
-            }
-        }
-        return nil, fmt.Errorf("extender: %v claims to preempt pod (UID: %v) on node: %v, but the pod is not found on that node",
-            h.extenderURL, metaPod, nodeInfo.Node().Name)
+    nodeInfo *schedulernodeinfo.NodeInfo) (*v1.Pod, error) {
+    for _, pod := range nodeInfo.Pods() {
+        if string(pod.UID) == metaPod.UID {
+            return pod, nil
+        }
     }
-
-    return nil, fmt.Errorf("extender: %v claims to preempt on node: %v but the node is not found in nodeNameToInfo map",
-        h.extenderURL, nodeInfo.Node().Name)
+    return nil, fmt.Errorf("extender: %v claims to preempt pod (UID: %v) on node: %v, but the pod is not found on that node",
+        h.extenderURL, metaPod, nodeInfo.Node().Name)
 }

 // convertToNodeNameToMetaVictims converts from struct type to meta types.
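A detail worth noting in the hunk above: the old convertPodUIDToPod had two failure modes (node missing from nodeNameToInfo, pod missing from the node), and its node-missing branch formatted the error from an outer nodeInfo variable that was shadowed inside the if statement. With the lister, the node is resolved once in convertToNodeToVictims via nodeInfos.Get, so only the pod-not-found case remains. The composed flow, restated as a standalone sketch (hypothetical helper; the error text is illustrative):

    package example

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
        extenderv1 "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1"
        "k8s.io/kubernetes/pkg/scheduler/listers"
    )

    // victimsForNode resolves the node once through the lister, then maps victim
    // pod UIDs reported by the extender to the pods cached on that node.
    func victimsForNode(nodeInfos listers.NodeInfoLister, nodeName string, metaVictims *extenderv1.MetaVictims) (*extenderv1.Victims, error) {
        nodeInfo, err := nodeInfos.Get(nodeName) // node-not-found now surfaces here
        if err != nil {
            return nil, err
        }
        victims := &extenderv1.Victims{Pods: []*v1.Pod{}}
        for _, metaPod := range metaVictims.Pods {
            var found *v1.Pod
            for _, pod := range nodeInfo.Pods() {
                if string(pod.UID) == metaPod.UID {
                    found = pod
                    break
                }
            }
            if found == nil {
                return nil, fmt.Errorf("pod %v not found on node %v", metaPod.UID, nodeName)
            }
            victims.Pods = append(victims.Pods, found)
        }
        return victims, nil
    }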
@@ -288,7 +286,7 @@ func convertToNodeNameToVictims(
 // failedNodesMap optionally contains the list of failed nodes and failure reasons.
 func (h *HTTPExtender) Filter(
     pod *v1.Pod,
-    nodes []*v1.Node, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
+    nodes []*v1.Node,
 ) ([]*v1.Node, extenderv1.FailedNodesMap, error) {
     var (
         result extenderv1.ExtenderFilterResult
@@ -297,6 +295,10 @@ func (h *HTTPExtender) Filter(
         nodeResult []*v1.Node
         args       *extenderv1.ExtenderArgs
     )
+    fromNodeName := make(map[string]*v1.Node)
+    for _, n := range nodes {
+        fromNodeName[n.Name] = n
+    }

     if h.filterVerb == "" {
         return nodes, extenderv1.FailedNodesMap{}, nil
@@ -331,11 +333,11 @@ func (h *HTTPExtender) Filter(
     if h.nodeCacheCapable && result.NodeNames != nil {
         nodeResult = make([]*v1.Node, len(*result.NodeNames))
         for i, nodeName := range *result.NodeNames {
-            if node, ok := nodeNameToInfo[nodeName]; ok {
-                nodeResult[i] = node.Node()
+            if n, ok := fromNodeName[nodeName]; ok {
+                nodeResult[i] = n
             } else {
                 return nil, nil, fmt.Errorf(
-                    "extender %q claims a filtered node %q which is not found in nodeNameToInfo map",
+                    "extender %q claims a filtered node %q which is not found in the input node list",
                     h.extenderURL, nodeName)
             }
         }
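Since Filter no longer receives the node-info map, a nodeCacheCapable extender's response (a list of node names) has to be resolved against the nodes that were handed to Filter in the first place, which is what the fromNodeName index above is for. The same idea as a standalone function, assuming nothing beyond the standard types (hypothetical, for illustration only):

    package example

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
    )

    // nodesByName resolves node names returned by a cache-capable extender against
    // the candidate nodes that were passed in, instead of against scheduler cache state.
    func nodesByName(names []string, nodes []*v1.Node) ([]*v1.Node, error) {
        fromNodeName := make(map[string]*v1.Node, len(nodes))
        for _, n := range nodes {
            fromNodeName[n.Name] = n
        }
        out := make([]*v1.Node, 0, len(names))
        for _, name := range names {
            n, ok := fromNodeName[name]
            if !ok {
                return nil, fmt.Errorf("node %q is not in the input node list", name)
            }
            out = append(out, n)
        }
        return out, nil
    }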
@@ -40,6 +40,7 @@ import (
     framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
     internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
     internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
+    "k8s.io/kubernetes/pkg/scheduler/listers"
     schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
     st "k8s.io/kubernetes/pkg/scheduler/testing"
     "k8s.io/kubernetes/pkg/scheduler/util"
@@ -160,7 +161,7 @@ func (f *FakeExtender) SupportsPreemption() bool {
 func (f *FakeExtender) ProcessPreemption(
     pod *v1.Pod,
     nodeToVictims map[*v1.Node]*extenderv1.Victims,
-    nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
+    nodeInfos listers.NodeInfoLister,
 ) (map[*v1.Node]*extenderv1.Victims, error) {
     nodeToVictimsCopy := map[*v1.Node]*extenderv1.Victims{}
     // We don't want to change the original nodeToVictims
@@ -174,7 +175,7 @@ func (f *FakeExtender) ProcessPreemption(

     for node, victims := range nodeToVictimsCopy {
         // Try to do preemption on extender side.
-        extenderVictimPods, extendernPDBViolations, fits, err := f.selectVictimsOnNodeByExtender(pod, node, nodeNameToInfo)
+        extenderVictimPods, extendernPDBViolations, fits, err := f.selectVictimsOnNodeByExtender(pod, node)
         if err != nil {
             return nil, err
         }
@@ -196,11 +197,7 @@ func (f *FakeExtender) ProcessPreemption(
 // 1. More victim pods (if any) amended by preemption phase of extender.
 // 2. Number of violating victim (used to calculate PDB).
 // 3. Fits or not after preemption phase on extender's side.
-func (f *FakeExtender) selectVictimsOnNodeByExtender(
-    pod *v1.Pod,
-    node *v1.Node,
-    nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
-) ([]*v1.Pod, int, bool, error) {
+func (f *FakeExtender) selectVictimsOnNodeByExtender(pod *v1.Pod, node *v1.Node) ([]*v1.Pod, int, bool, error) {
     // If a extender support preemption but have no cached node info, let's run filter to make sure
     // default scheduler's decision still stand with given pod and node.
     if !f.nodeCacheCapable {
@@ -288,7 +285,7 @@ func (f *FakeExtender) runPredicate(pod *v1.Pod, node *v1.Node) (bool, error) {
     return fits, nil
 }

-func (f *FakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo) ([]*v1.Node, extenderv1.FailedNodesMap, error) {
+func (f *FakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, extenderv1.FailedNodesMap, error) {
     filtered := []*v1.Node{}
     failedNodesMap := extenderv1.FailedNodesMap{}
     for _, node := range nodes {
@@ -44,6 +44,7 @@ import (
     framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
     internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
     internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
+    "k8s.io/kubernetes/pkg/scheduler/listers"
     "k8s.io/kubernetes/pkg/scheduler/metrics"
     schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
     nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
@@ -305,7 +306,7 @@ func (g *genericScheduler) Preempt(ctx context.Context, state *framework.CycleSt
     if !ok || fitError == nil {
         return nil, nil, nil, nil
     }
-    if !podEligibleToPreemptOthers(pod, g.nodeInfoSnapshot.NodeInfoMap, g.enableNonPreempting) {
+    if !podEligibleToPreemptOthers(pod, g.nodeInfoSnapshot.NodeInfos(), g.enableNonPreempting) {
         klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
         return nil, nil, nil, nil
     }
@@ -365,7 +366,7 @@ func (g *genericScheduler) processPreemptionWithExtenders(
             newNodeToVictims, err := extender.ProcessPreemption(
                 pod,
                 nodeToVictims,
-                g.nodeInfoSnapshot.NodeInfoMap,
+                g.nodeInfoSnapshot.NodeInfos(),
             )
             if err != nil {
                 if extender.IsIgnorable() {
@@ -504,7 +505,7 @@ func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framewor
             if !extender.IsInterested(pod) {
                 continue
             }
-            filteredList, failedMap, err := extender.Filter(pod, filtered, g.nodeInfoSnapshot.NodeInfoMap)
+            filteredList, failedMap, err := extender.Filter(pod, filtered)
             if err != nil {
                 if extender.IsIgnorable() {
                     klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
@@ -1044,14 +1045,14 @@ func nodesWherePreemptionMightHelp(nodes []*schedulernodeinfo.NodeInfo, fitErr *
 // considered for preemption.
 // We look at the node that is nominated for this pod and as long as there are
 // terminating pods on the node, we don't consider this for preempting more pods.
-func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, enableNonPreempting bool) bool {
+func podEligibleToPreemptOthers(pod *v1.Pod, nodeInfos listers.NodeInfoLister, enableNonPreempting bool) bool {
     if enableNonPreempting && pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever {
         klog.V(5).Infof("Pod %v/%v is not eligible for preemption because it has a preemptionPolicy of %v", pod.Namespace, pod.Name, v1.PreemptNever)
         return false
     }
     nomNodeName := pod.Status.NominatedNodeName
     if len(nomNodeName) > 0 {
-        if nodeInfo, found := nodeNameToInfo[nomNodeName]; found {
+        if nodeInfo, _ := nodeInfos.Get(nomNodeName); nodeInfo != nil {
             podPriority := podutil.GetPodPriority(pod)
             for _, p := range nodeInfo.Pods() {
                 if p.DeletionTimestamp != nil && podutil.GetPodPriority(p) < podPriority {
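In the generic scheduler, the snapshot's NodeInfos() lister is now what gets handed to extenders and to podEligibleToPreemptOthers. Note that podEligibleToPreemptOthers discards the Get error and relies on a nil check, which keeps the old map semantics where a missing nominated node simply skips the terminating-pods check. A minimal sketch of that lookup, assuming the snapshot type used elsewhere in this diff (the helper itself is hypothetical):

    package example

    import (
        schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
        nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
    )

    // nominatedNodeInfo mirrors the lookup style kept by podEligibleToPreemptOthers:
    // the error is dropped and a nil result means "not in the snapshot".
    func nominatedNodeInfo(snapshot *nodeinfosnapshot.Snapshot, name string) *schedulernodeinfo.NodeInfo {
        nodeInfo, _ := snapshot.NodeInfos().Get(name)
        return nodeInfo
    }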
@@ -1604,7 +1604,10 @@ func TestSelectNodesForPreemption(t *testing.T) {
             if !preFilterStatus.IsSuccess() {
                 t.Errorf("Unexpected preFilterStatus: %v", preFilterStatus)
             }
-            nodeInfos := nodesToNodeInfos(nodes, snapshot)
+            nodeInfos, err := nodesToNodeInfos(nodes, snapshot)
+            if err != nil {
+                t.Fatal(err)
+            }
             nodeToPods, err := g.selectNodesForPreemption(context.Background(), state, test.pod, nodeInfos, nil)
             if err != nil {
                 t.Error(err)
@@ -1834,7 +1837,10 @@ func TestPickOneNodeForPreemption(t *testing.T) {
             }
             assignDefaultStartTime(test.pods)

-            nodeInfos := nodesToNodeInfos(nodes, snapshot)
+            nodeInfos, err := nodesToNodeInfos(nodes, snapshot)
+            if err != nil {
+                t.Fatal(err)
+            }
             state := framework.NewCycleState()
             candidateNodes, _ := g.selectNodesForPreemption(context.Background(), state, test.pod, nodeInfos, nil)
             node := pickOneNodeForPreemption(candidateNodes)
@@ -2468,10 +2474,14 @@ func TestFairEvaluationForNodes(t *testing.T) {
         }
     }

-func nodesToNodeInfos(nodes []*v1.Node, snapshot *nodeinfosnapshot.Snapshot) []*schedulernodeinfo.NodeInfo {
+func nodesToNodeInfos(nodes []*v1.Node, snapshot *nodeinfosnapshot.Snapshot) ([]*schedulernodeinfo.NodeInfo, error) {
     var nodeInfos []*schedulernodeinfo.NodeInfo
     for _, n := range nodes {
-        nodeInfos = append(nodeInfos, snapshot.NodeInfoMap[n.Name])
+        nodeInfo, err := snapshot.NodeInfos().Get(n.Name)
+        if err != nil {
+            return nil, err
+        }
+        nodeInfos = append(nodeInfos, nodeInfo)
     }
-    return nodeInfos
+    return nodeInfos, nil
 }
@@ -48,6 +48,7 @@ import (
     framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
     internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
     internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
+    "k8s.io/kubernetes/pkg/scheduler/listers"
     schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
     nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
 )
@@ -469,7 +470,7 @@ func (f *fakeExtender) IsIgnorable() bool {
 func (f *fakeExtender) ProcessPreemption(
     pod *v1.Pod,
     nodeToVictims map[*v1.Node]*extenderv1.Victims,
-    nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
+    nodeInfos listers.NodeInfoLister,
 ) (map[*v1.Node]*extenderv1.Victims, error) {
     return nil, nil
 }
@@ -478,11 +479,7 @@ func (f *fakeExtender) SupportsPreemption() bool {
     return false
 }

-func (f *fakeExtender) Filter(
-    pod *v1.Pod,
-    nodes []*v1.Node,
-    nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
-) (filteredNodes []*v1.Node, failedNodesMap extenderv1.FailedNodesMap, err error) {
+func (f *fakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node) (filteredNodes []*v1.Node, failedNodesMap extenderv1.FailedNodesMap, err error) {
     return nil, nil, nil
 }

@@ -37,6 +37,7 @@ go_test(
     deps = [
         "//pkg/scheduler/algorithm/predicates:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
+        "//pkg/scheduler/nodeinfo:go_default_library",
         "//pkg/scheduler/nodeinfo/snapshot:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@@ -25,6 +25,7 @@ import (
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
     framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
+    "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
     nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
 )

@@ -786,7 +787,8 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
             if !preFilterStatus.IsSuccess() {
                 t.Errorf("prefilter failed with status: %v", preFilterStatus)
             }
-            gotStatus := p.Filter(context.Background(), state, test.pod, snapshot.NodeInfoMap[test.node.Name])
+            nodeInfo := mustGetNodeInfo(t, snapshot, test.node.Name)
+            gotStatus := p.Filter(context.Background(), state, test.pod, nodeInfo)
             if !reflect.DeepEqual(gotStatus, test.wantStatus) {
                 t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
             }
@@ -1623,7 +1625,8 @@ func TestRequiredAffinityMultipleNodes(t *testing.T) {
             if !preFilterStatus.IsSuccess() {
                 t.Errorf("prefilter failed with status: %v", preFilterStatus)
             }
-            gotStatus := p.Filter(context.Background(), state, test.pod, snapshot.NodeInfoMap[node.Name])
+            nodeInfo := mustGetNodeInfo(t, snapshot, node.Name)
+            gotStatus := p.Filter(context.Background(), state, test.pod, nodeInfo)
             if !reflect.DeepEqual(gotStatus, test.wantStatuses[indexNode]) {
                 t.Errorf("index: %d status does not match: %v, want: %v", indexTest, gotStatus, test.wantStatuses[indexNode])
             }
@@ -1886,7 +1889,8 @@ func TestPreFilterStateAddRemovePod(t *testing.T) {
             originalState := state.Clone()

             // Add test.addedPod to state1 and verify it is equal to allPodsState.
-            if err := ipa.AddPod(context.Background(), cycleState, test.pendingPod, test.addedPod, snapshot.NodeInfoMap[test.addedPod.Spec.NodeName]); err != nil {
+            nodeInfo := mustGetNodeInfo(t, snapshot, test.addedPod.Spec.NodeName)
+            if err := ipa.AddPod(context.Background(), cycleState, test.pendingPod, test.addedPod, nodeInfo); err != nil {
                 t.Errorf("error adding pod to meta: %v", err)
             }

@@ -1895,7 +1899,7 @@ func TestPreFilterStateAddRemovePod(t *testing.T) {
             }

             // Remove the added pod pod and make sure it is equal to the original state.
-            if err := ipa.RemovePod(context.Background(), cycleState, test.pendingPod, test.addedPod, snapshot.NodeInfoMap[test.addedPod.Spec.NodeName]); err != nil {
+            if err := ipa.RemovePod(context.Background(), cycleState, test.pendingPod, test.addedPod, nodeInfo); err != nil {
                 t.Errorf("error removing pod from meta: %v", err)
             }
             if !reflect.DeepEqual(originalState, state) {
@@ -2146,3 +2150,12 @@ func TestGetTPMapMatchingIncomingAffinityAntiAffinity(t *testing.T) {
         })
     }
 }
+
+func mustGetNodeInfo(t *testing.T, snapshot *nodeinfosnapshot.Snapshot, name string) *nodeinfo.NodeInfo {
+    t.Helper()
+    nodeInfo, err := snapshot.NodeInfos().Get(name)
+    if err != nil {
+        t.Fatal(err)
+    }
+    return nodeInfo
+}
@@ -28,6 +28,7 @@ go_test(
         "//pkg/scheduler/framework/plugins/migration:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//pkg/scheduler/listers/fake:go_default_library",
+        "//pkg/scheduler/nodeinfo:go_default_library",
         "//pkg/scheduler/nodeinfo/snapshot:go_default_library",
         "//staging/src/k8s.io/api/apps/v1:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
@@ -29,6 +29,7 @@ import (
     "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
     framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
     fakelisters "k8s.io/kubernetes/pkg/scheduler/listers/fake"
+    "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
     nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
 )

@@ -176,7 +177,8 @@ func TestServiceAffinity(t *testing.T) {
             if s := p.PreFilter(context.Background(), state, test.pod); !s.IsSuccess() {
                 t.Errorf("PreFilter failed: %v", s.Message())
             }
-            status := p.Filter(context.Background(), state, test.pod, snapshot.NodeInfoMap[test.node.Name])
+            nodeInfo := mustGetNodeInfo(t, snapshot, test.node.Name)
+            status := p.Filter(context.Background(), state, test.pod, nodeInfo)
             if status.Code() != test.res {
                 t.Errorf("Status mismatch. got: %v, want: %v", status.Code(), test.res)
             }
@@ -547,7 +549,8 @@ func TestPreFilterStateAddRemovePod(t *testing.T) {
             plStateOriginal, _ := plState.Clone().(*preFilterState)

             // Add test.addedPod to state1 and verify it is equal to allPodsState.
-            if err := ipa.AddPod(context.Background(), state, test.pendingPod, test.addedPod, snapshot.NodeInfoMap[test.addedPod.Spec.NodeName]); err != nil {
+            nodeInfo := mustGetNodeInfo(t, snapshot, test.addedPod.Spec.NodeName)
+            if err := ipa.AddPod(context.Background(), state, test.pendingPod, test.addedPod, nodeInfo); err != nil {
                 t.Errorf("error adding pod to preFilterState: %v", err)
             }

@@ -556,7 +559,7 @@ func TestPreFilterStateAddRemovePod(t *testing.T) {
             }

             // Remove the added pod pod and make sure it is equal to the original state.
-            if err := ipa.RemovePod(context.Background(), state, test.pendingPod, test.addedPod, snapshot.NodeInfoMap[test.addedPod.Spec.NodeName]); err != nil {
+            if err := ipa.RemovePod(context.Background(), state, test.pendingPod, test.addedPod, nodeInfo); err != nil {
                 t.Errorf("error removing pod from preFilterState: %v", err)
             }
             if !reflect.DeepEqual(sortState(plStateOriginal), sortState(plState)) {
@@ -602,3 +605,12 @@ func sortNodeScoreList(out framework.NodeScoreList) {
         return out[i].Score < out[j].Score
     })
 }
+
+func mustGetNodeInfo(t *testing.T, snapshot *nodeinfosnapshot.Snapshot, name string) *nodeinfo.NodeInfo {
+    t.Helper()
+    nodeInfo, err := snapshot.NodeInfos().Get(name)
+    if err != nil {
+        t.Fatal(err)
+    }
+    return nodeInfo
+}