Merge pull request #110768 from kerthcet/fix/filter-out-unsatisfied-nodes-in-AddPod

Filter out unsatisfied nodes when calling  AddPod in PodTopologySpread
This commit is contained in:
Kubernetes Prow Robot 2022-07-05 10:15:19 -07:00 committed by GitHub
commit a17215fe84
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 100 additions and 31 deletions

View File

@ -147,28 +147,6 @@ func (p *criticalPaths) update(tpVal string, num int) {
}
}
func (s *preFilterState) updateWithPod(updatedPod, preemptorPod *v1.Pod, node *v1.Node, delta int) {
if s == nil || updatedPod.Namespace != preemptorPod.Namespace || node == nil {
return
}
if !nodeLabelsMatchSpreadConstraints(node.Labels, s.Constraints) {
return
}
podLabelSet := labels.Set(updatedPod.Labels)
for _, constraint := range s.Constraints {
if !constraint.Selector.Matches(podLabelSet) {
continue
}
k, v := constraint.TopologyKey, node.Labels[constraint.TopologyKey]
pair := topologyPair{key: k, value: v}
s.TpPairToMatchNum[pair] += delta
s.TpKeyToCriticalPaths[k].update(v, s.TpPairToMatchNum[pair])
}
}
// PreFilter invoked at the prefilter extension point.
func (pl *PodTopologySpread) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
s, err := pl.calPreFilterState(ctx, pod)
@ -191,7 +169,7 @@ func (pl *PodTopologySpread) AddPod(ctx context.Context, cycleState *framework.C
return framework.AsStatus(err)
}
s.updateWithPod(podInfoToAdd.Pod, podToSchedule, nodeInfo.Node(), 1)
pl.updateWithPod(s, podInfoToAdd.Pod, podToSchedule, nodeInfo.Node(), 1)
return nil
}
@ -202,10 +180,45 @@ func (pl *PodTopologySpread) RemovePod(ctx context.Context, cycleState *framewor
return framework.AsStatus(err)
}
s.updateWithPod(podInfoToRemove.Pod, podToSchedule, nodeInfo.Node(), -1)
pl.updateWithPod(s, podInfoToRemove.Pod, podToSchedule, nodeInfo.Node(), -1)
return nil
}
func (pl *PodTopologySpread) updateWithPod(s *preFilterState, updatedPod, preemptorPod *v1.Pod, node *v1.Node, delta int) {
if s == nil || updatedPod.Namespace != preemptorPod.Namespace || node == nil {
return
}
if !nodeLabelsMatchSpreadConstraints(node.Labels, s.Constraints) {
return
}
requiredSchedulingTerm := nodeaffinity.GetRequiredNodeAffinity(preemptorPod)
if !pl.enableNodeInclusionPolicyInPodTopologySpread {
// spreading is applied to nodes that pass those filters.
// Ignore parsing errors for backwards compatibility.
if match, _ := requiredSchedulingTerm.Match(node); !match {
return
}
}
podLabelSet := labels.Set(updatedPod.Labels)
for _, constraint := range s.Constraints {
if !constraint.Selector.Matches(podLabelSet) {
continue
}
if pl.enableNodeInclusionPolicyInPodTopologySpread &&
!constraint.matchNodeInclusionPolicies(updatedPod, node, requiredSchedulingTerm) {
continue
}
k, v := constraint.TopologyKey, node.Labels[constraint.TopologyKey]
pair := topologyPair{key: k, value: v}
s.TpPairToMatchNum[pair] += delta
s.TpKeyToCriticalPaths[k].update(v, s.TpPairToMatchNum[pair])
}
}
// getPreFilterState fetches a pre-computed preFilterState.
func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error) {
c, err := cycleState.Read(preFilterStateKey)

View File

@ -19,6 +19,7 @@ package podtopologyspread
import (
"context"
"fmt"
"math"
"reflect"
"testing"
@ -1220,13 +1221,14 @@ func TestPreFilterStateAddPod(t *testing.T) {
zoneConstraint := nodeConstraint
zoneConstraint.TopologyKey = "zone"
tests := []struct {
name string
preemptor *v1.Pod
addedPod *v1.Pod
existingPods []*v1.Pod
nodeIdx int // denotes which node 'addedPod' belongs to
nodes []*v1.Node
want *preFilterState
name string
preemptor *v1.Pod
addedPod *v1.Pod
existingPods []*v1.Pod
nodeIdx int // denotes which node 'addedPod' belongs to
nodes []*v1.Node
want *preFilterState
enableNodeInclustionPolicy bool
}{
{
name: "node a and b both impact current min match",
@ -1501,6 +1503,58 @@ func TestPreFilterStateAddPod(t *testing.T) {
},
},
},
{
name: "add a pod that doesn't match node affinity when NodeInclustionPolicy disabled",
preemptor: st.MakePod().Name("p").Label("foo", "").NodeAffinityNotIn("foo", []string{"bar"}).
SpreadConstraint(1, "zone", v1.DoNotSchedule, fooSelector, nil, nil, nil).
Obj(),
nodeIdx: 0,
addedPod: st.MakePod().Name("p-a1").Node("node-b").Label("foo", "").Label("zone", "zone2").Obj(),
existingPods: []*v1.Pod{
st.MakePod().Name("p-b1").Node("node-a").Label("foo", "").Label("zone", "zone1").Obj(),
st.MakePod().Name("p-b2").Node("node-b").Label("foo", "").Label("zone", "zone2").Obj(),
},
nodes: []*v1.Node{
st.MakeNode().Name("node-a").Label("zone", "zone1").Label("node", "node-a").Label("foo", "bar").Obj(),
st.MakeNode().Name("node-b").Label("zone", "zone2").Label("node", "node-b").Label("foo", "").Obj(),
},
want: &preFilterState{
Constraints: []topologySpreadConstraint{zoneConstraint},
TpKeyToCriticalPaths: map[string]*criticalPaths{
"zone": {{"zone2", 1}, {MatchNum: math.MaxInt32}},
},
TpPairToMatchNum: map[topologyPair]int{
{key: "zone", value: "zone2"}: 1,
},
},
enableNodeInclustionPolicy: false,
},
{
name: "add a pod that doesn't match node affinity when NodeInclustionPolicy enabled",
preemptor: st.MakePod().Name("p").Label("foo", "").NodeAffinityNotIn("foo", []string{"bar"}).
SpreadConstraint(1, "zone", v1.DoNotSchedule, fooSelector, nil, nil, nil).
Obj(),
nodeIdx: 0,
addedPod: st.MakePod().Name("p-a1").Node("node-b").Label("foo", "").Label("zone", "zone2").Obj(),
existingPods: []*v1.Pod{
st.MakePod().Name("p-b1").Node("node-a").Label("foo", "").Label("zone", "zone1").Obj(),
st.MakePod().Name("p-b2").Node("node-b").Label("foo", "").Label("zone", "zone2").Obj(),
},
nodes: []*v1.Node{
st.MakeNode().Name("node-a").Label("zone", "zone1").Label("node", "node-a").Label("foo", "bar").Obj(),
st.MakeNode().Name("node-b").Label("zone", "zone2").Label("node", "node-b").Label("foo", "").Obj(),
},
want: &preFilterState{
Constraints: []topologySpreadConstraint{zoneConstraint},
TpKeyToCriticalPaths: map[string]*criticalPaths{
"zone": {{"zone2", 1}, {MatchNum: math.MaxInt32}},
},
TpPairToMatchNum: map[topologyPair]int{
{key: "zone", value: "zone2"}: 1,
},
},
enableNodeInclustionPolicy: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@ -1508,6 +1562,8 @@ func TestPreFilterStateAddPod(t *testing.T) {
snapshot := cache.NewSnapshot(tt.existingPods, tt.nodes)
pl := plugintesting.SetupPlugin(t, topologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot)
p := pl.(*PodTopologySpread)
p.enableNodeInclusionPolicyInPodTopologySpread = tt.enableNodeInclustionPolicy
cs := framework.NewCycleState()
if _, s := p.PreFilter(ctx, cs, tt.preemptor); !s.IsSuccess() {
t.Fatal(s.AsError())