From 5a358f478b396bcaa721f891f62422e210db3288 Mon Sep 17 00:00:00 2001 From: kerthcet Date: Fri, 1 Jul 2022 11:46:26 +0800 Subject: [PATCH] Fix: filter out unsatisfied nodes when calling AddPod in PodTopologySpread Signed-off-by: kerthcet --- .../plugins/podtopologyspread/filtering.go | 61 +++++++++------- .../podtopologyspread/filtering_test.go | 70 +++++++++++++++++-- 2 files changed, 100 insertions(+), 31 deletions(-) diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/filtering.go b/pkg/scheduler/framework/plugins/podtopologyspread/filtering.go index 888750d03a0..8e2f03225fd 100644 --- a/pkg/scheduler/framework/plugins/podtopologyspread/filtering.go +++ b/pkg/scheduler/framework/plugins/podtopologyspread/filtering.go @@ -147,28 +147,6 @@ func (p *criticalPaths) update(tpVal string, num int) { } } -func (s *preFilterState) updateWithPod(updatedPod, preemptorPod *v1.Pod, node *v1.Node, delta int) { - if s == nil || updatedPod.Namespace != preemptorPod.Namespace || node == nil { - return - } - if !nodeLabelsMatchSpreadConstraints(node.Labels, s.Constraints) { - return - } - - podLabelSet := labels.Set(updatedPod.Labels) - for _, constraint := range s.Constraints { - if !constraint.Selector.Matches(podLabelSet) { - continue - } - - k, v := constraint.TopologyKey, node.Labels[constraint.TopologyKey] - pair := topologyPair{key: k, value: v} - s.TpPairToMatchNum[pair] += delta - - s.TpKeyToCriticalPaths[k].update(v, s.TpPairToMatchNum[pair]) - } -} - // PreFilter invoked at the prefilter extension point. func (pl *PodTopologySpread) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) { s, err := pl.calPreFilterState(ctx, pod) @@ -191,7 +169,7 @@ func (pl *PodTopologySpread) AddPod(ctx context.Context, cycleState *framework.C return framework.AsStatus(err) } - s.updateWithPod(podInfoToAdd.Pod, podToSchedule, nodeInfo.Node(), 1) + pl.updateWithPod(s, podInfoToAdd.Pod, podToSchedule, nodeInfo.Node(), 1) return nil } @@ -202,10 +180,45 @@ func (pl *PodTopologySpread) RemovePod(ctx context.Context, cycleState *framewor return framework.AsStatus(err) } - s.updateWithPod(podInfoToRemove.Pod, podToSchedule, nodeInfo.Node(), -1) + pl.updateWithPod(s, podInfoToRemove.Pod, podToSchedule, nodeInfo.Node(), -1) return nil } +func (pl *PodTopologySpread) updateWithPod(s *preFilterState, updatedPod, preemptorPod *v1.Pod, node *v1.Node, delta int) { + if s == nil || updatedPod.Namespace != preemptorPod.Namespace || node == nil { + return + } + if !nodeLabelsMatchSpreadConstraints(node.Labels, s.Constraints) { + return + } + + requiredSchedulingTerm := nodeaffinity.GetRequiredNodeAffinity(preemptorPod) + if !pl.enableNodeInclusionPolicyInPodTopologySpread { + // spreading is applied to nodes that pass those filters. + // Ignore parsing errors for backwards compatibility. + if match, _ := requiredSchedulingTerm.Match(node); !match { + return + } + } + + podLabelSet := labels.Set(updatedPod.Labels) + for _, constraint := range s.Constraints { + if !constraint.Selector.Matches(podLabelSet) { + continue + } + if pl.enableNodeInclusionPolicyInPodTopologySpread && + !constraint.matchNodeInclusionPolicies(updatedPod, node, requiredSchedulingTerm) { + continue + } + + k, v := constraint.TopologyKey, node.Labels[constraint.TopologyKey] + pair := topologyPair{key: k, value: v} + s.TpPairToMatchNum[pair] += delta + + s.TpKeyToCriticalPaths[k].update(v, s.TpPairToMatchNum[pair]) + } +} + // getPreFilterState fetches a pre-computed preFilterState. func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error) { c, err := cycleState.Read(preFilterStateKey) diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/filtering_test.go b/pkg/scheduler/framework/plugins/podtopologyspread/filtering_test.go index 770b36089de..0184df35090 100644 --- a/pkg/scheduler/framework/plugins/podtopologyspread/filtering_test.go +++ b/pkg/scheduler/framework/plugins/podtopologyspread/filtering_test.go @@ -19,6 +19,7 @@ package podtopologyspread import ( "context" "fmt" + "math" "reflect" "testing" @@ -1220,13 +1221,14 @@ func TestPreFilterStateAddPod(t *testing.T) { zoneConstraint := nodeConstraint zoneConstraint.TopologyKey = "zone" tests := []struct { - name string - preemptor *v1.Pod - addedPod *v1.Pod - existingPods []*v1.Pod - nodeIdx int // denotes which node 'addedPod' belongs to - nodes []*v1.Node - want *preFilterState + name string + preemptor *v1.Pod + addedPod *v1.Pod + existingPods []*v1.Pod + nodeIdx int // denotes which node 'addedPod' belongs to + nodes []*v1.Node + want *preFilterState + enableNodeInclustionPolicy bool }{ { name: "node a and b both impact current min match", @@ -1501,6 +1503,58 @@ func TestPreFilterStateAddPod(t *testing.T) { }, }, }, + { + name: "add a pod that doesn't match node affinity when NodeInclustionPolicy disabled", + preemptor: st.MakePod().Name("p").Label("foo", "").NodeAffinityNotIn("foo", []string{"bar"}). + SpreadConstraint(1, "zone", v1.DoNotSchedule, fooSelector, nil, nil, nil). + Obj(), + nodeIdx: 0, + addedPod: st.MakePod().Name("p-a1").Node("node-b").Label("foo", "").Label("zone", "zone2").Obj(), + existingPods: []*v1.Pod{ + st.MakePod().Name("p-b1").Node("node-a").Label("foo", "").Label("zone", "zone1").Obj(), + st.MakePod().Name("p-b2").Node("node-b").Label("foo", "").Label("zone", "zone2").Obj(), + }, + nodes: []*v1.Node{ + st.MakeNode().Name("node-a").Label("zone", "zone1").Label("node", "node-a").Label("foo", "bar").Obj(), + st.MakeNode().Name("node-b").Label("zone", "zone2").Label("node", "node-b").Label("foo", "").Obj(), + }, + want: &preFilterState{ + Constraints: []topologySpreadConstraint{zoneConstraint}, + TpKeyToCriticalPaths: map[string]*criticalPaths{ + "zone": {{"zone2", 1}, {MatchNum: math.MaxInt32}}, + }, + TpPairToMatchNum: map[topologyPair]int{ + {key: "zone", value: "zone2"}: 1, + }, + }, + enableNodeInclustionPolicy: false, + }, + { + name: "add a pod that doesn't match node affinity when NodeInclustionPolicy enabled", + preemptor: st.MakePod().Name("p").Label("foo", "").NodeAffinityNotIn("foo", []string{"bar"}). + SpreadConstraint(1, "zone", v1.DoNotSchedule, fooSelector, nil, nil, nil). + Obj(), + nodeIdx: 0, + addedPod: st.MakePod().Name("p-a1").Node("node-b").Label("foo", "").Label("zone", "zone2").Obj(), + existingPods: []*v1.Pod{ + st.MakePod().Name("p-b1").Node("node-a").Label("foo", "").Label("zone", "zone1").Obj(), + st.MakePod().Name("p-b2").Node("node-b").Label("foo", "").Label("zone", "zone2").Obj(), + }, + nodes: []*v1.Node{ + st.MakeNode().Name("node-a").Label("zone", "zone1").Label("node", "node-a").Label("foo", "bar").Obj(), + st.MakeNode().Name("node-b").Label("zone", "zone2").Label("node", "node-b").Label("foo", "").Obj(), + }, + want: &preFilterState{ + Constraints: []topologySpreadConstraint{zoneConstraint}, + TpKeyToCriticalPaths: map[string]*criticalPaths{ + "zone": {{"zone2", 1}, {MatchNum: math.MaxInt32}}, + }, + TpPairToMatchNum: map[topologyPair]int{ + {key: "zone", value: "zone2"}: 1, + }, + }, + enableNodeInclustionPolicy: true, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -1508,6 +1562,8 @@ func TestPreFilterStateAddPod(t *testing.T) { snapshot := cache.NewSnapshot(tt.existingPods, tt.nodes) pl := plugintesting.SetupPlugin(t, topologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot) p := pl.(*PodTopologySpread) + p.enableNodeInclusionPolicyInPodTopologySpread = tt.enableNodeInclustionPolicy + cs := framework.NewCycleState() if _, s := p.PreFilter(ctx, cs, tt.preemptor); !s.IsSuccess() { t.Fatal(s.AsError())