Mirror of https://github.com/k3s-io/kubernetes.git
Merge pull request #87847 from notpad/feature/slow_path
Cleanup "slow-path" logic in scheduler Filters
commit 574acbe310
@@ -1339,7 +1339,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
name: "a pod that fits on both machines when lower priority pods are preempted",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
st.RegisterPluginAsExtensions(noderesources.FitName, 1, noderesources.NewFit, "Filter", "PreFilter"),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"machine1", "machine2"},

@@ -1354,7 +1354,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
name: "a pod that would fit on the machines, but other pods running are higher priority",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
st.RegisterPluginAsExtensions(noderesources.FitName, 1, noderesources.NewFit, "Filter", "PreFilter"),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"machine1", "machine2"},

@@ -1369,7 +1369,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
name: "medium priority pod is preempted, but lower priority one stays as it is small",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
st.RegisterPluginAsExtensions(noderesources.FitName, 1, noderesources.NewFit, "Filter", "PreFilter"),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"machine1", "machine2"},

@@ -1385,7 +1385,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
name: "mixed priority pods are preempted",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
st.RegisterPluginAsExtensions(noderesources.FitName, 1, noderesources.NewFit, "Filter", "PreFilter"),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"machine1", "machine2"},

@@ -1403,7 +1403,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
name: "mixed priority pods are preempted, pick later StartTime one when priorities are equal",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
st.RegisterPluginAsExtensions(noderesources.FitName, 1, noderesources.NewFit, "Filter", "PreFilter"),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"machine1", "machine2"},

@@ -1421,8 +1421,8 @@ func TestSelectNodesForPreemption(t *testing.T) {
name: "pod with anti-affinity is preempted",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
st.RegisterFilterPlugin(interpodaffinity.Name, interpodaffinity.New),
st.RegisterPluginAsExtensions(noderesources.FitName, 1, noderesources.NewFit, "Filter", "PreFilter"),
st.RegisterPluginAsExtensions(interpodaffinity.Name, 1, interpodaffinity.New, "Filter", "PreFilter"),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"machine1", "machine2"},

@@ -1541,7 +1541,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
name: "get Unschedulable in the preemption phase when the filter plugins filtering the nodes",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
st.RegisterPluginAsExtensions(noderesources.FitName, 1, noderesources.NewFit, "Filter", "PreFilter"),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"machine1", "machine2"},

@@ -1557,7 +1557,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
name: "preemption with violation of same pdb",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
st.RegisterPluginAsExtensions(noderesources.FitName, 1, noderesources.NewFit, "Filter", "PreFilter"),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"machine1"},

@@ -1673,7 +1673,7 @@ func TestPickOneNodeForPreemption(t *testing.T) {
name: "No node needs preemption",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
st.RegisterPluginAsExtensions(noderesources.FitName, 1, noderesources.NewFit, "Filter", "PreFilter"),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"machine1"},

@@ -1686,7 +1686,7 @@ func TestPickOneNodeForPreemption(t *testing.T) {
name: "a pod that fits on both machines when lower priority pods are preempted",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
st.RegisterPluginAsExtensions(noderesources.FitName, 1, noderesources.NewFit, "Filter", "PreFilter"),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"machine1", "machine2"},

@@ -1701,7 +1701,7 @@ func TestPickOneNodeForPreemption(t *testing.T) {
name: "a pod that fits on a machine with no preemption",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
st.RegisterPluginAsExtensions(noderesources.FitName, 1, noderesources.NewFit, "Filter", "PreFilter"),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"machine1", "machine2", "machine3"},

@@ -1716,7 +1716,7 @@ func TestPickOneNodeForPreemption(t *testing.T) {
name: "machine with min highest priority pod is picked",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
st.RegisterPluginAsExtensions(noderesources.FitName, 1, noderesources.NewFit, "Filter", "PreFilter"),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"machine1", "machine2", "machine3"},

@@ -1737,7 +1737,7 @@ func TestPickOneNodeForPreemption(t *testing.T) {
name: "when highest priorities are the same, minimum sum of priorities is picked",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
st.RegisterPluginAsExtensions(noderesources.FitName, 1, noderesources.NewFit, "Filter", "PreFilter"),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"machine1", "machine2", "machine3"},

@@ -1758,7 +1758,7 @@ func TestPickOneNodeForPreemption(t *testing.T) {
name: "when highest priority and sum are the same, minimum number of pods is picked",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
st.RegisterPluginAsExtensions(noderesources.FitName, 1, noderesources.NewFit, "Filter", "PreFilter"),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"machine1", "machine2", "machine3"},

@@ -1784,7 +1784,7 @@ func TestPickOneNodeForPreemption(t *testing.T) {
name: "sum of adjusted priorities is considered",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
st.RegisterPluginAsExtensions(noderesources.FitName, 1, noderesources.NewFit, "Filter", "PreFilter"),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"machine1", "machine2", "machine3"},

@@ -1807,7 +1807,7 @@ func TestPickOneNodeForPreemption(t *testing.T) {
name: "non-overlapping lowest high priority, sum priorities, and number of pods",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
st.RegisterPluginAsExtensions(noderesources.FitName, 1, noderesources.NewFit, "Filter", "PreFilter"),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"machine1", "machine2", "machine3", "machine4"},

@@ -1835,7 +1835,7 @@ func TestPickOneNodeForPreemption(t *testing.T) {
name: "same priority, same number of victims, different start time for each machine's pod",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
st.RegisterPluginAsExtensions(noderesources.FitName, 1, noderesources.NewFit, "Filter", "PreFilter"),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"machine1", "machine2", "machine3"},

@@ -1856,7 +1856,7 @@ func TestPickOneNodeForPreemption(t *testing.T) {
name: "same priority, same number of victims, different start time for all pods",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
st.RegisterPluginAsExtensions(noderesources.FitName, 1, noderesources.NewFit, "Filter", "PreFilter"),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"machine1", "machine2", "machine3"},

@@ -1877,7 +1877,7 @@ func TestPickOneNodeForPreemption(t *testing.T) {
name: "different priority, same number of victims, different start time for all pods",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
st.RegisterPluginAsExtensions(noderesources.FitName, 1, noderesources.NewFit, "Filter", "PreFilter"),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"machine1", "machine2", "machine3"},

@@ -1918,6 +1918,11 @@ func TestPickOneNodeForPreemption(t *testing.T) {
t.Fatal(err)
}
state := framework.NewCycleState()
// Some tests rely on PreFilter plugin to compute its CycleState.
preFilterStatus := fwk.RunPreFilterPlugins(context.Background(), state, test.pod)
if !preFilterStatus.IsSuccess() {
t.Errorf("Unexpected preFilterStatus: %v", preFilterStatus)
}
candidateNodes, _ := g.selectNodesForPreemption(context.Background(), state, test.pod, nodeInfos, nil)
node := pickOneNodeForPreemption(candidateNodes)
found := false

@@ -2091,7 +2096,7 @@ func TestPreempt(t *testing.T) {
},
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
st.RegisterPluginAsExtensions(noderesources.FitName, 1, noderesources.NewFit, "Filter", "PreFilter"),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
expectedNode: "machine1",

@@ -2112,7 +2117,7 @@ func TestPreempt(t *testing.T) {
},
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
st.RegisterPluginAsExtensions(noderesources.FitName, 1, noderesources.NewFit, "Filter", "PreFilter"),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
expectedNode: "machine3",

@@ -2227,7 +2232,7 @@ func TestPreempt(t *testing.T) {
},
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
st.RegisterPluginAsExtensions(noderesources.FitName, 1, noderesources.NewFit, "Filter", "PreFilter"),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
expectedNode: "machine1",

@@ -2253,7 +2258,7 @@ func TestPreempt(t *testing.T) {
},
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
st.RegisterPluginAsExtensions(noderesources.FitName, 1, noderesources.NewFit, "Filter", "PreFilter"),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
expectedNode: "",

@@ -2283,7 +2288,7 @@ func TestPreempt(t *testing.T) {
},
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
st.RegisterPluginAsExtensions(noderesources.FitName, 1, noderesources.NewFit, "Filter", "PreFilter"),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
expectedNode: "machine1",

@@ -2313,7 +2318,7 @@ func TestPreempt(t *testing.T) {
},
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
st.RegisterPluginAsExtensions(noderesources.FitName, 1, noderesources.NewFit, "Filter", "PreFilter"),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
expectedNode: "machine3",

@@ -2334,7 +2339,7 @@ func TestPreempt(t *testing.T) {
},
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
st.RegisterPluginAsExtensions(noderesources.FitName, 1, noderesources.NewFit, "Filter", "PreFilter"),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
expectedNode: "",

@@ -2355,7 +2360,7 @@ func TestPreempt(t *testing.T) {
},
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
st.RegisterPluginAsExtensions(noderesources.FitName, 1, noderesources.NewFit, "Filter", "PreFilter"),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
expectedNode: "machine1",
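The test hunks above all make the same substitution: the Fit plugin is no longer registered as a bare Filter, it is registered as a PreFilter and Filter extension, and TestPickOneNodeForPreemption now runs PreFilter before exercising preemption so the Filter path finds its precomputed state. A condensed sketch of that wiring, assuming this revision's package layout (the helper name and the package clause are illustrative, not part of the diff):

```go
package core

import (
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
	st "k8s.io/kubernetes/pkg/scheduler/testing"
)

// registerFitWithPreFilter mirrors the registerPlugins slices in the updated
// test cases: Fit is wired up as a PreFilter+Filter extension instead of a
// plain Filter, so Filter can rely on the state PreFilter wrote to CycleState.
func registerFitWithPreFilter() []st.RegisterPluginFunc {
	return []st.RegisterPluginFunc{
		st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
		st.RegisterPluginAsExtensions(noderesources.FitName, 1, noderesources.NewFit, "Filter", "PreFilter"),
		st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
	}
}
```

With this registration, skipping fwk.RunPreFilterPlugins before calling Filter is now a test failure rather than a silent fallback to the old slow path.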
@@ -388,10 +388,8 @@ func (pl *InterPodAffinity) RemovePod(ctx context.Context, cycleState *framework
func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error) {
c, err := cycleState.Read(preFilterStateKey)
if err != nil {
// The preFilterState wasn't pre-computed in prefilter. We ignore the error for now since
// Filter is able to handle that by computing it again.
klog.V(5).Infof("Error reading %q from cycleState: %v", preFilterStateKey, err)
return nil, nil
// preFilterState doesn't exist, likely PreFilter wasn't invoked.
return nil, fmt.Errorf("error reading %q from cycleState: %v", preFilterStateKey, err)
}

s, ok := c.(*preFilterState)

@@ -405,20 +403,7 @@ func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error
// terms indicated by the existing pods.
func (pl *InterPodAffinity) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, state *preFilterState, nodeInfo *nodeinfo.NodeInfo) (bool, error) {
node := nodeInfo.Node()
var topologyMap topologyToMatchedTermCount
if state != nil {
topologyMap = state.topologyToMatchedExistingAntiAffinityTerms
} else {
// Filter out pods whose nodeName is equal to nodeInfo.node.Name, but are not
// present in nodeInfo. Pods on other nodes pass the filter.
filteredPods, err := pl.sharedLister.Pods().FilteredList(nodeInfo.Filter, labels.Everything())
if err != nil {
return false, fmt.Errorf("Failed to get all pods: %v", err)
}
if topologyMap, err = pl.getMatchingAntiAffinityTopologyPairsOfPods(pod, filteredPods); err != nil {
return false, fmt.Errorf("Failed to get all terms that match pod: %v", err)
}
}
topologyMap := state.topologyToMatchedExistingAntiAffinityTerms

// Iterate over topology pairs to get any of the pods being affected by
// the scheduled pod anti-affinity terms

@@ -462,37 +447,6 @@ func nodeMatchesAnyTopologyTerm(pod *v1.Pod, topologyPairs topologyToMatchedTerm
return false
}

// podMatchesPodAffinityTerms checks if the "targetPod" matches the given "terms"
// of the "pod" on the given "nodeInfo".Node(). It returns three values: 1) whether
// targetPod matches all the terms and their topologies, 2) whether targetPod
// matches all the terms label selector and namespaces 3) any error.
func (pl *InterPodAffinity) podMatchesPodAffinityTerms(pod, targetPod *v1.Pod, nodeInfo *nodeinfo.NodeInfo, terms []v1.PodAffinityTerm) (bool, bool, error) {
if len(terms) == 0 {
return false, false, fmt.Errorf("terms array is empty")
}
props, err := getAffinityTerms(pod, terms)
if err != nil {
return false, false, err
}
if !podMatchesAllAffinityTerms(targetPod, props) {
return false, false, nil
}
// Namespace and selector of the terms have matched. Now we check topology of the terms.
targetPodNodeInfo, err := pl.sharedLister.NodeInfos().Get(targetPod.Spec.NodeName)
if err != nil {
return false, false, err
}
for _, term := range terms {
if len(term.TopologyKey) == 0 {
return false, false, fmt.Errorf("empty topologyKey is not allowed except for PreferredDuringScheduling pod anti-affinity")
}
if !schedutil.NodesHaveSameTopologyKey(nodeInfo.Node(), targetPodNodeInfo.Node(), term.TopologyKey) {
return false, true, nil
}
}
return true, true, nil
}

// getMatchingAntiAffinityTopologyPairs calculates the following for "existingPod" on given node:
// (1) Whether it has PodAntiAffinity
// (2) Whether ANY AffinityTerm matches the incoming pod

@@ -519,24 +473,6 @@ func getMatchingAntiAffinityTopologyPairsOfPod(newPod *v1.Pod, existingPod *v1.P
return topologyMap, nil
}

func (pl *InterPodAffinity) getMatchingAntiAffinityTopologyPairsOfPods(pod *v1.Pod, existingPods []*v1.Pod) (topologyToMatchedTermCount, error) {
topologyMaps := make(topologyToMatchedTermCount)

for _, existingPod := range existingPods {
existingPodNodeInfo, err := pl.sharedLister.NodeInfos().Get(existingPod.Spec.NodeName)
if err != nil {
klog.Errorf("Pod %s has NodeName %q but node is not found", existingPod.Name, existingPod.Spec.NodeName)
continue
}
existingPodTopologyMaps, err := getMatchingAntiAffinityTopologyPairsOfPod(pod, existingPod, existingPodNodeInfo.Node())
if err != nil {
return nil, err
}
topologyMaps.append(existingPodTopologyMaps)
}
return topologyMaps, nil
}

// satisfiesPodsAffinityAntiAffinity checks if scheduling the pod onto this node would break any term of this pod.
// This function returns two boolean flags. The first boolean flag indicates whether the pod matches affinity rules
// or not. The second boolean flag indicates if the pod matches anti-affinity rules.

@@ -547,79 +483,31 @@ func (pl *InterPodAffinity) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod,
if node == nil {
return false, false, fmt.Errorf("node not found")
}
if state != nil {
// Check all affinity terms.
topologyToMatchedAffinityTerms := state.topologyToMatchedAffinityTerms
if affinityTerms := schedutil.GetPodAffinityTerms(affinity.PodAffinity); len(affinityTerms) > 0 {
matchExists := nodeMatchesAllTopologyTerms(pod, topologyToMatchedAffinityTerms, nodeInfo, affinityTerms)
if !matchExists {
// This pod may the first pod in a series that have affinity to themselves. In order
// to not leave such pods in pending state forever, we check that if no other pod
// in the cluster matches the namespace and selector of this pod and the pod matches
// its own terms, then we allow the pod to pass the affinity check.
if len(topologyToMatchedAffinityTerms) != 0 || !targetPodMatchesAffinityOfPod(pod, pod) {
return false, false, nil
}
}
}

// Check all anti-affinity terms.
topologyToMatchedAntiAffinityTerms := state.topologyToMatchedAntiAffinityTerms
if antiAffinityTerms := schedutil.GetPodAntiAffinityTerms(affinity.PodAntiAffinity); len(antiAffinityTerms) > 0 {
matchExists := nodeMatchesAnyTopologyTerm(pod, topologyToMatchedAntiAffinityTerms, nodeInfo, antiAffinityTerms)
if matchExists {
return true, false, nil
}
}
} else { // We don't have precomputed preFilterState. We have to follow a slow path to check affinity terms.
filteredPods, err := pl.sharedLister.Pods().FilteredList(nodeInfo.Filter, labels.Everything())
if err != nil {
return false, false, err
}

affinityTerms := schedutil.GetPodAffinityTerms(affinity.PodAffinity)
antiAffinityTerms := schedutil.GetPodAntiAffinityTerms(affinity.PodAntiAffinity)
matchFound, termsSelectorMatchFound := false, false
for _, targetPod := range filteredPods {
// Check all affinity terms.
if !matchFound && len(affinityTerms) > 0 {
affTermsMatch, termsSelectorMatch, err := pl.podMatchesPodAffinityTerms(pod, targetPod, nodeInfo, affinityTerms)
if err != nil {
return false, false, err
}
if termsSelectorMatch {
termsSelectorMatchFound = true
}
if affTermsMatch {
matchFound = true
}
}

// Check all anti-affinity terms.
if len(antiAffinityTerms) > 0 {
antiAffTermsMatch, _, err := pl.podMatchesPodAffinityTerms(pod, targetPod, nodeInfo, antiAffinityTerms)
if err != nil || antiAffTermsMatch {
return true, false, err
}
}
}

if !matchFound && len(affinityTerms) > 0 {
// We have not been able to find any matches for the pod's affinity terms.
// This pod may be the first pod in a series that have affinity to themselves. In order
// Check all affinity terms.
topologyToMatchedAffinityTerms := state.topologyToMatchedAffinityTerms
if affinityTerms := schedutil.GetPodAffinityTerms(affinity.PodAffinity); len(affinityTerms) > 0 {
matchExists := nodeMatchesAllTopologyTerms(pod, topologyToMatchedAffinityTerms, nodeInfo, affinityTerms)
if !matchExists {
// This pod may the first pod in a series that have affinity to themselves. In order
// to not leave such pods in pending state forever, we check that if no other pod
// in the cluster matches the namespace and selector of this pod and the pod matches
// its own terms, then we allow the pod to pass the affinity check.
if termsSelectorMatchFound {
return false, false, nil
}
// Check if pod matches its own affinity terms (namespace and label selector).
if !targetPodMatchesAffinityOfPod(pod, pod) {
if len(topologyToMatchedAffinityTerms) != 0 || !targetPodMatchesAffinityOfPod(pod, pod) {
return false, false, nil
}
}
}

// Check all anti-affinity terms.
topologyToMatchedAntiAffinityTerms := state.topologyToMatchedAntiAffinityTerms
if antiAffinityTerms := schedutil.GetPodAntiAffinityTerms(affinity.PodAntiAffinity); len(antiAffinityTerms) > 0 {
matchExists := nodeMatchesAnyTopologyTerm(pod, topologyToMatchedAntiAffinityTerms, nodeInfo, antiAffinityTerms)
if matchExists {
return true, false, nil
}
}

return true, true, nil
}
@@ -1635,120 +1635,16 @@ func TestRequiredAffinityMultipleNodes(t *testing.T) {
}

func TestPreFilterDisabled(t *testing.T) {
labelRgChina := map[string]string{
"region": "China",
}
labelRgChinaAzAz1 := map[string]string{
"region": "China",
"az": "az1",
}
labelRgIndia := map[string]string{
"region": "India",
}
labelRgUS := map[string]string{
"region": "US",
}

tests := []struct {
pod *v1.Pod
pods []*v1.Pod
nodes []*v1.Node
wantStatuses []*framework.Status
name string
}{
{
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "123"}},
Spec: v1.PodSpec{
Affinity: &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "foo",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"bar"},
},
},
},
TopologyKey: "region",
},
},
},
},
},
},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "nodeA"}, ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}}},
{
Spec: v1.PodSpec{
NodeName: "nodeC",
Affinity: &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "foo",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"123"},
},
},
},
TopologyKey: "region",
},
},
},
},
},
},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "nodeA", Labels: labelRgChina}},
{ObjectMeta: metav1.ObjectMeta{Name: "nodeB", Labels: labelRgChinaAzAz1}},
{ObjectMeta: metav1.ObjectMeta{Name: "nodeC", Labels: labelRgIndia}},
{ObjectMeta: metav1.ObjectMeta{Name: "nodeD", Labels: labelRgUS}},
},
wantStatuses: []*framework.Status{
framework.NewStatus(
framework.Unschedulable,
ErrReasonAffinityNotMatch,
ErrReasonAntiAffinityRulesNotMatch,
),
framework.NewStatus(
framework.Unschedulable,
ErrReasonAffinityNotMatch,
ErrReasonAntiAffinityRulesNotMatch,
),
framework.NewStatus(
framework.Unschedulable,
ErrReasonAffinityNotMatch,
ErrReasonExistingAntiAffinityRulesNotMatch,
),
nil,
},
name: "NodeA and nodeB have same topologyKey and label value. NodeA has an existing pod that matches the inter pod affinity rule. NodeC has an existing pod that match the inter pod affinity rule. The pod can not be scheduled onto nodeA, nodeB and nodeC but can be schedulerd onto nodeD",
},
}

for indexTest, test := range tests {
t.Run(test.name, func(t *testing.T) {
snapshot := cache.NewSnapshot(test.pods, test.nodes)
for indexNode, node := range test.nodes {
p := &InterPodAffinity{
sharedLister: snapshot,
}
state := framework.NewCycleState()
nodeInfo := mustGetNodeInfo(t, snapshot, node.Name)
gotStatus := p.Filter(context.Background(), state, test.pod, nodeInfo)
if !reflect.DeepEqual(gotStatus, test.wantStatuses[indexNode]) {
t.Errorf("index: %d status does not match: %v, want: %v", indexTest, gotStatus, test.wantStatuses[indexNode])
}
}
})
pod := &v1.Pod{}
nodeInfo := nodeinfo.NewNodeInfo()
node := v1.Node{}
nodeInfo.SetNode(&node)
p := &InterPodAffinity{}
cycleState := framework.NewCycleState()
gotStatus := p.Filter(context.Background(), cycleState, pod, nodeInfo)
wantStatus := framework.NewStatus(framework.Error, `error reading "PreFilterInterPodAffinity" from cycleState: not found`)
if !reflect.DeepEqual(gotStatus, wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, wantStatus)
}
}
@@ -86,9 +86,8 @@ func (pl *NodePorts) PreFilterExtensions() framework.PreFilterExtensions {
func getPreFilterState(cycleState *framework.CycleState) (preFilterState, error) {
c, err := cycleState.Read(preFilterStateKey)
if err != nil {
// preFilterState state doesn't exist. We ignore the error for now since
// Filter is able to handle that by computing it again.
return nil, nil
// preFilterState doesn't exist, likely PreFilter wasn't invoked.
return nil, fmt.Errorf("error reading %q from cycleState: %v", preFilterStateKey, err)
}

s, ok := c.(preFilterState)

@@ -105,12 +104,7 @@ func (pl *NodePorts) Filter(ctx context.Context, cycleState *framework.CycleStat
return framework.NewStatus(framework.Error, err.Error())
}

var fits bool
if wantPorts != nil {
fits = fitsPorts(wantPorts, nodeInfo)
} else {
fits = Fits(pod, nodeInfo)
}
fits := fitsPorts(wantPorts, nodeInfo)
if !fits {
return framework.NewStatus(framework.Unschedulable, ErrReason)
}
@@ -164,41 +164,16 @@ func TestNodePorts(t *testing.T) {
}

func TestPreFilterDisabled(t *testing.T) {
tests := []struct {
pod *v1.Pod
nodeInfo *schedulernodeinfo.NodeInfo
name string
wantStatus *framework.Status
}{
{
pod: &v1.Pod{},
nodeInfo: schedulernodeinfo.NewNodeInfo(),
name: "nothing running",
},
{
pod: newPod("m1", "UDP/127.0.0.1/8080"),
nodeInfo: schedulernodeinfo.NewNodeInfo(
newPod("m1", "UDP/127.0.0.1/9090")),
name: "other port",
},
{
pod: newPod("m1", "UDP/127.0.0.1/8080"),
nodeInfo: schedulernodeinfo.NewNodeInfo(
newPod("m1", "UDP/127.0.0.1/8080")),
name: "same udp port",
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReason),
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
p, _ := New(nil, nil)
cycleState := framework.NewCycleState()
gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), cycleState, test.pod, test.nodeInfo)
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
}
})
pod := &v1.Pod{}
nodeInfo := schedulernodeinfo.NewNodeInfo()
node := v1.Node{}
nodeInfo.SetNode(&node)
p, _ := New(nil, nil)
cycleState := framework.NewCycleState()
gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), cycleState, pod, nodeInfo)
wantStatus := framework.NewStatus(framework.Error, `error reading "PreFilterNodePorts" from cycleState: not found`)
if !reflect.DeepEqual(gotStatus, wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, wantStatus)
}
}
@@ -24,7 +24,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"

@@ -130,10 +129,8 @@ func (f *Fit) PreFilterExtensions() framework.PreFilterExtensions {
func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error) {
c, err := cycleState.Read(preFilterStateKey)
if err != nil {
// The preFilterState doesn't exist. We ignore the error for now since
// Filter is able to handle that by computing it again.
klog.V(5).Infof("Error reading %q from cycleState: %v", preFilterStateKey, err)
return nil, nil
// preFilterState doesn't exist, likely PreFilter wasn't invoked.
return nil, fmt.Errorf("error reading %q from cycleState: %v", preFilterStateKey, err)
}

s, ok := c.(*preFilterState)

@@ -152,12 +149,7 @@ func (f *Fit) Filter(ctx context.Context, cycleState *framework.CycleState, pod
return framework.NewStatus(framework.Error, err.Error())
}

var insufficientResources []InsufficientResource
if s != nil {
insufficientResources = fitsRequest(s, nodeInfo, f.ignoredResources)
} else {
insufficientResources = Fits(pod, nodeInfo, f.ignoredResources)
}
insufficientResources := fitsRequest(s, nodeInfo, f.ignoredResources)

if len(insufficientResources) != 0 {
// We will keep all failure reasons.
@@ -399,60 +399,16 @@ func TestEnoughRequests(t *testing.T) {
}

func TestPreFilterDisabled(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodOverhead, true)()

tests := []struct {
pod *v1.Pod
nodeInfo *schedulernodeinfo.NodeInfo
name string
ignoredResources []byte
wantInsufficientResources []InsufficientResource
wantStatus *framework.Status
}{
{
pod: &v1.Pod{},
nodeInfo: schedulernodeinfo.NewNodeInfo(
newResourcePod(schedulernodeinfo.Resource{MilliCPU: 10, Memory: 20})),
name: "no resources requested always fits",
wantInsufficientResources: []InsufficientResource{},
},
{
pod: newResourcePod(schedulernodeinfo.Resource{MilliCPU: 1, Memory: 1}),
nodeInfo: schedulernodeinfo.NewNodeInfo(
newResourcePod(schedulernodeinfo.Resource{MilliCPU: 10, Memory: 20})),
name: "too many resources fails",
wantStatus: framework.NewStatus(framework.Unschedulable, getErrReason(v1.ResourceCPU), getErrReason(v1.ResourceMemory)),
wantInsufficientResources: []InsufficientResource{{v1.ResourceCPU, getErrReason(v1.ResourceCPU), 1, 10, 10}, {v1.ResourceMemory, getErrReason(v1.ResourceMemory), 1, 20, 20}},
},
{
pod: newResourcePod(
schedulernodeinfo.Resource{MilliCPU: 1, Memory: 1, ScalarResources: map[v1.ResourceName]int64{extendedResourceB: 1}}),
nodeInfo: schedulernodeinfo.NewNodeInfo(newResourcePod(schedulernodeinfo.Resource{MilliCPU: 0, Memory: 0})),
ignoredResources: []byte(`{"IgnoredResources" : ["example.com/bbb"]}`),
name: "skip checking ignored extended resource",
wantInsufficientResources: []InsufficientResource{},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
node := v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 32, 5, 20, 5).Capacity, Allocatable: makeAllocatableResources(10, 20, 32, 5, 20, 5)}}
test.nodeInfo.SetNode(&node)

args := &runtime.Unknown{Raw: test.ignoredResources}
p, _ := NewFit(args, nil)
cycleState := framework.NewCycleState()

gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), cycleState, test.pod, test.nodeInfo)
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
}

gotInsufficientResources := Fits(test.pod, test.nodeInfo, p.(*Fit).ignoredResources)
if !reflect.DeepEqual(gotInsufficientResources, test.wantInsufficientResources) {
t.Errorf("insufficient resources do not match: %v, want: %v", gotInsufficientResources, test.wantInsufficientResources)
}
})
pod := &v1.Pod{}
nodeInfo := schedulernodeinfo.NewNodeInfo()
node := v1.Node{}
nodeInfo.SetNode(&node)
p, _ := NewFit(nil, nil)
cycleState := framework.NewCycleState()
gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), cycleState, pod, nodeInfo)
wantStatus := framework.NewStatus(framework.Error, `error reading "PreFilterNodeResourcesFit" from cycleState: not found`)
if !reflect.DeepEqual(gotStatus, wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, wantStatus)
}
}
@@ -35,6 +35,7 @@ go_test(
deps = [
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/testing:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@@ -194,10 +194,8 @@ func (pl *PodTopologySpread) RemovePod(ctx context.Context, cycleState *framewor
func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error) {
c, err := cycleState.Read(preFilterStateKey)
if err != nil {
// The preFilterState wasn't pre-computed in prefilter. We ignore the error for now since
// we are able to handle that by computing it again (e.g. in Filter()).
klog.V(5).Infof("Error reading %q from cycleState: %v", preFilterStateKey, err)
return nil, nil
// preFilterState doesn't exist, likely PreFilter wasn't invoked.
return nil, fmt.Errorf("error reading %q from cycleState: %v", preFilterStateKey, err)
}

s, ok := c.(*preFilterState)

@@ -292,11 +290,6 @@ func (pl *PodTopologySpread) Filter(ctx context.Context, cycleState *framework.C
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
// nil preFilterState is illegal.
if s == nil {
// TODO(autoscaler): get it implemented.
return framework.NewStatus(framework.Error, "preFilterState not pre-computed for PodTopologySpread Plugin")
}

// However, "empty" preFilterState is legit which tolerates every toSchedule Pod.
if len(s.tpPairToMatchNum) == 0 || len(s.constraints) == 0 {
@@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
st "k8s.io/kubernetes/pkg/scheduler/testing"
)

@@ -1464,3 +1465,17 @@ func TestMultipleConstraints(t *testing.T) {
})
}
}

func TestPreFilterDisabled(t *testing.T) {
pod := &v1.Pod{}
nodeInfo := schedulernodeinfo.NewNodeInfo()
node := v1.Node{}
nodeInfo.SetNode(&node)
p := &PodTopologySpread{}
cycleState := framework.NewCycleState()
gotStatus := p.Filter(context.Background(), cycleState, pod, nodeInfo)
wantStatus := framework.NewStatus(framework.Error, `error reading "PreFilterPodTopologySpread" from cycleState: not found`)
if !reflect.DeepEqual(gotStatus, wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, wantStatus)
}
}
@@ -13,7 +13,6 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
@@ -24,7 +24,6 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/klog"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
"k8s.io/kubernetes/pkg/scheduler/nodeinfo"

@@ -154,7 +153,7 @@ func (pl *ServiceAffinity) AddPod(ctx context.Context, cycleState *framework.Cyc

// If addedPod is in the same namespace as the pod, update the list
// of matching pods if applicable.
if s == nil || podToAdd.Namespace != podToSchedule.Namespace {
if podToAdd.Namespace != podToSchedule.Namespace {
return nil
}

@@ -173,8 +172,7 @@ func (pl *ServiceAffinity) RemovePod(ctx context.Context, cycleState *framework.
return framework.NewStatus(framework.Error, err.Error())
}

if s == nil ||
len(s.matchingPodList) == 0 ||
if len(s.matchingPodList) == 0 ||
podToRemove.Namespace != s.matchingPodList[0].Namespace {
return nil
}

@@ -192,10 +190,8 @@ func (pl *ServiceAffinity) RemovePod(ctx context.Context, cycleState *framework.
func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error) {
c, err := cycleState.Read(preFilterStateKey)
if err != nil {
// The metadata wasn't pre-computed in prefilter. We ignore the error for now since
// Filter is able to handle that by computing it again.
klog.V(5).Infof(fmt.Sprintf("reading %q from cycleState: %v", preFilterStateKey, err))
return nil, nil
// preFilterState doesn't exist, likely PreFilter wasn't invoked.
return nil, fmt.Errorf("error reading %q from cycleState: %v", preFilterStateKey, err)
}

if c == nil {

@@ -247,14 +243,6 @@ func (pl *ServiceAffinity) Filter(ctx context.Context, cycleState *framework.Cyc
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
if s == nil {
// Make the filter resilient in case preFilterState is missing.
s, err = pl.createPreFilterState(pod)
if err != nil {
return framework.NewStatus(framework.Error, fmt.Sprintf("could not create preFilterState: %v", err))
}
}

pods, services := s.matchingPodList, s.matchingPodServices
filteredPods := nodeInfo.FilterOutPods(pods)
@@ -599,3 +599,21 @@ func mustGetNodeInfo(t *testing.T, snapshot *cache.Snapshot, name string) *nodei
}
return nodeInfo
}

func TestPreFilterDisabled(t *testing.T) {
pod := &v1.Pod{}
nodeInfo := nodeinfo.NewNodeInfo()
node := v1.Node{}
nodeInfo.SetNode(&node)
p := &ServiceAffinity{
args: Args{
AffinityLabels: []string{"region"},
},
}
cycleState := framework.NewCycleState()
gotStatus := p.Filter(context.Background(), cycleState, pod, nodeInfo)
wantStatus := framework.NewStatus(framework.Error, `error reading "PreFilterServiceAffinity" from cycleState: not found`)
if !reflect.DeepEqual(gotStatus, wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, wantStatus)
}
}
@@ -569,7 +569,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
fns := []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
st.RegisterFilterPlugin("PodFitsResources", noderesources.NewFit),
st.RegisterPluginAsExtensions(noderesources.FitName, 1, noderesources.NewFit, "Filter", "PreFilter"),
}
scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, nil, nil, fns...)
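On the plugin side, every getPreFilterState helper in the hunks above drops the silent `return nil, nil` fallback and fails when the state is missing, and the matching slow paths in Filter (NodePorts falling back to Fits, Fit falling back to Fits, ServiceAffinity calling createPreFilterState, InterPodAffinity rescanning pods) are deleted. A minimal sketch of the shared shape, with a placeholder plugin name and state type; the real keys and state types are the per-plugin ones shown in the diff:

```go
package example

import (
	"fmt"

	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// Placeholder key; each plugin uses its own ("PreFilterNodeResourcesFit",
// "PreFilterInterPodAffinity", "PreFilterNodePorts", and so on).
const preFilterStateKey = "PreFilterExample"

// exampleState stands in for the plugin's precomputed PreFilter data.
type exampleState struct{}

// Clone implements framework.StateData.
func (s *exampleState) Clone() framework.StateData { return s }

// getPreFilterState now fails fast when PreFilter never ran, instead of
// returning nil and letting Filter recompute the state on a slow path.
func getPreFilterState(cycleState *framework.CycleState) (*exampleState, error) {
	c, err := cycleState.Read(preFilterStateKey)
	if err != nil {
		// preFilterState doesn't exist, likely PreFilter wasn't invoked.
		return nil, fmt.Errorf("error reading %q from cycleState: %v", preFilterStateKey, err)
	}
	s, ok := c.(*exampleState)
	if !ok {
		return nil, fmt.Errorf("%+v convert to exampleState error", c)
	}
	return s, nil
}
```

The TestPreFilterDisabled tests added in the diff pin exactly this behavior: calling Filter with a fresh CycleState must now return a framework.Error status rather than succeed by recomputing the state.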