mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 15:05:27 +00:00
Merge pull request #85157 from alculquicondor/refactor/selector
Store topology spread constraints in metadata with labels.Selector
This commit is contained in:
commit
209c025144
@ -116,6 +116,7 @@ func (paths *criticalPaths) update(tpVal string, num int32) {
|
||||
// (1) critical paths where the least pods are matched on each spread constraint.
|
||||
// (2) number of pods matched on each spread constraint.
|
||||
type evenPodsSpreadMetadata struct {
|
||||
constraints []topologySpreadConstraint
|
||||
// We record 2 critical paths instead of all critical paths here.
|
||||
// criticalPaths[0].matchNum always holds the minimum matching number.
|
||||
// criticalPaths[1].matchNum is always greater or equal to criticalPaths[0].matchNum, but
|
||||
@ -125,6 +126,15 @@ type evenPodsSpreadMetadata struct {
|
||||
tpPairToMatchNum map[topologyPair]int32
|
||||
}
|
||||
|
||||
// topologySpreadConstraint is an internal version for a hard (DoNotSchedule
|
||||
// unsatisfiable constraint action) v1.TopologySpreadConstraint and where the
|
||||
// selector is parsed.
|
||||
type topologySpreadConstraint struct {
|
||||
maxSkew int32
|
||||
topologyKey string
|
||||
selector labels.Selector
|
||||
}
|
||||
|
||||
type serviceAffinityMetadata struct {
|
||||
matchingPodList []*v1.Pod
|
||||
matchingPodServices []*v1.Service
|
||||
@ -420,17 +430,20 @@ func getPodAffinityMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo,
|
||||
func getEvenPodsSpreadMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo) (*evenPodsSpreadMetadata, error) {
|
||||
// We have feature gating in APIServer to strip the spec
|
||||
// so don't need to re-check feature gate, just check length of constraints.
|
||||
constraints := getHardTopologySpreadConstraints(pod)
|
||||
constraints, err := filterHardTopologySpreadConstraints(pod.Spec.TopologySpreadConstraints)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(constraints) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
errCh := schedutil.NewErrorChannel()
|
||||
var lock sync.Mutex
|
||||
|
||||
// TODO(Huang-Wei): It might be possible to use "make(map[topologyPair]*int32)".
|
||||
// In that case, need to consider how to init each tpPairToCount[pair] in an atomic fashion.
|
||||
m := evenPodsSpreadMetadata{
|
||||
constraints: constraints,
|
||||
tpKeyToCriticalPaths: make(map[string]*criticalPaths, len(constraints)),
|
||||
tpPairToMatchNum: make(map[topologyPair]int32),
|
||||
}
|
||||
@ -440,8 +453,6 @@ func getEvenPodsSpreadMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeIn
|
||||
lock.Unlock()
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
processNode := func(i int) {
|
||||
nodeInfo := allNodes[i]
|
||||
node := nodeInfo.Node()
|
||||
@ -466,28 +477,19 @@ func getEvenPodsSpreadMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeIn
|
||||
if existingPod.Namespace != pod.Namespace {
|
||||
continue
|
||||
}
|
||||
ok, err := PodMatchesSpreadConstraint(existingPod.Labels, constraint)
|
||||
if err != nil {
|
||||
errCh.SendErrorWithCancel(err, cancel)
|
||||
return
|
||||
}
|
||||
if ok {
|
||||
if constraint.selector.Matches(labels.Set(existingPod.Labels)) {
|
||||
matchTotal++
|
||||
}
|
||||
}
|
||||
pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]}
|
||||
pair := topologyPair{key: constraint.topologyKey, value: node.Labels[constraint.topologyKey]}
|
||||
addTopologyPairMatchNum(pair, matchTotal)
|
||||
}
|
||||
}
|
||||
workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode)
|
||||
|
||||
if err := errCh.ReceiveError(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
workqueue.ParallelizeUntil(context.Background(), 16, len(allNodes), processNode)
|
||||
|
||||
// calculate min match for each topology pair
|
||||
for i := 0; i < len(constraints); i++ {
|
||||
key := constraints[i].TopologyKey
|
||||
key := constraints[i].topologyKey
|
||||
m.tpKeyToCriticalPaths[key] = newCriticalPaths()
|
||||
}
|
||||
for pair, num := range m.tpPairToMatchNum {
|
||||
@ -497,36 +499,28 @@ func getEvenPodsSpreadMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeIn
|
||||
return &m, nil
|
||||
}
|
||||
|
||||
func getHardTopologySpreadConstraints(pod *v1.Pod) (constraints []v1.TopologySpreadConstraint) {
|
||||
if pod != nil {
|
||||
for _, constraint := range pod.Spec.TopologySpreadConstraints {
|
||||
if constraint.WhenUnsatisfiable == v1.DoNotSchedule {
|
||||
constraints = append(constraints, constraint)
|
||||
func filterHardTopologySpreadConstraints(constraints []v1.TopologySpreadConstraint) ([]topologySpreadConstraint, error) {
|
||||
var result []topologySpreadConstraint
|
||||
for _, c := range constraints {
|
||||
if c.WhenUnsatisfiable == v1.DoNotSchedule {
|
||||
selector, err := metav1.LabelSelectorAsSelector(c.LabelSelector)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, topologySpreadConstraint{
|
||||
maxSkew: c.MaxSkew,
|
||||
topologyKey: c.TopologyKey,
|
||||
selector: selector,
|
||||
})
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// PodMatchesSpreadConstraint verifies if <constraint.LabelSelector> matches <podLabelSet>.
|
||||
// Some corner cases:
|
||||
// 1. podLabelSet = nil => returns (false, nil)
|
||||
// 2. constraint.LabelSelector = nil => returns (false, nil)
|
||||
func PodMatchesSpreadConstraint(podLabelSet labels.Set, constraint v1.TopologySpreadConstraint) (bool, error) {
|
||||
selector, err := metav1.LabelSelectorAsSelector(constraint.LabelSelector)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if !selector.Matches(podLabelSet) {
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// NodeLabelsMatchSpreadConstraints checks if ALL topology keys in spread constraints are present in node labels.
|
||||
func NodeLabelsMatchSpreadConstraints(nodeLabels map[string]string, constraints []v1.TopologySpreadConstraint) bool {
|
||||
for _, constraint := range constraints {
|
||||
if _, ok := nodeLabels[constraint.TopologyKey]; !ok {
|
||||
func NodeLabelsMatchSpreadConstraints(nodeLabels map[string]string, constraints []topologySpreadConstraint) bool {
|
||||
for _, c := range constraints {
|
||||
if _, ok := nodeLabels[c.topologyKey]; !ok {
|
||||
return false
|
||||
}
|
||||
}
|
||||
@ -581,57 +575,55 @@ func (m *topologyPairsMaps) clone() *topologyPairsMaps {
|
||||
return copy
|
||||
}
|
||||
|
||||
func (c *evenPodsSpreadMetadata) addPod(addedPod, preemptorPod *v1.Pod, node *v1.Node) error {
|
||||
return c.updatePod(addedPod, preemptorPod, node, 1)
|
||||
func (m *evenPodsSpreadMetadata) addPod(addedPod, preemptorPod *v1.Pod, node *v1.Node) {
|
||||
m.updatePod(addedPod, preemptorPod, node, 1)
|
||||
}
|
||||
|
||||
func (c *evenPodsSpreadMetadata) removePod(deletedPod, preemptorPod *v1.Pod, node *v1.Node) error {
|
||||
return c.updatePod(deletedPod, preemptorPod, node, -1)
|
||||
func (m *evenPodsSpreadMetadata) removePod(deletedPod, preemptorPod *v1.Pod, node *v1.Node) {
|
||||
m.updatePod(deletedPod, preemptorPod, node, -1)
|
||||
}
|
||||
|
||||
func (c *evenPodsSpreadMetadata) updatePod(updatedPod, preemptorPod *v1.Pod, node *v1.Node, delta int32) error {
|
||||
if updatedPod.Namespace != preemptorPod.Namespace || node == nil {
|
||||
return nil
|
||||
func (m *evenPodsSpreadMetadata) updatePod(updatedPod, preemptorPod *v1.Pod, node *v1.Node, delta int32) {
|
||||
if m == nil || updatedPod.Namespace != preemptorPod.Namespace || node == nil {
|
||||
return
|
||||
}
|
||||
constraints := getHardTopologySpreadConstraints(preemptorPod)
|
||||
if !NodeLabelsMatchSpreadConstraints(node.Labels, constraints) {
|
||||
return nil
|
||||
if !NodeLabelsMatchSpreadConstraints(node.Labels, m.constraints) {
|
||||
return
|
||||
}
|
||||
|
||||
podLabelSet := labels.Set(updatedPod.Labels)
|
||||
for _, constraint := range constraints {
|
||||
if match, err := PodMatchesSpreadConstraint(podLabelSet, constraint); err != nil {
|
||||
return err
|
||||
} else if !match {
|
||||
for _, constraint := range m.constraints {
|
||||
if !constraint.selector.Matches(podLabelSet) {
|
||||
continue
|
||||
}
|
||||
|
||||
k, v := constraint.TopologyKey, node.Labels[constraint.TopologyKey]
|
||||
k, v := constraint.topologyKey, node.Labels[constraint.topologyKey]
|
||||
pair := topologyPair{key: k, value: v}
|
||||
c.tpPairToMatchNum[pair] = c.tpPairToMatchNum[pair] + delta
|
||||
m.tpPairToMatchNum[pair] = m.tpPairToMatchNum[pair] + delta
|
||||
|
||||
c.tpKeyToCriticalPaths[k].update(v, c.tpPairToMatchNum[pair])
|
||||
m.tpKeyToCriticalPaths[k].update(v, m.tpPairToMatchNum[pair])
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *evenPodsSpreadMetadata) clone() *evenPodsSpreadMetadata {
|
||||
func (m *evenPodsSpreadMetadata) clone() *evenPodsSpreadMetadata {
|
||||
// c could be nil when EvenPodsSpread feature is disabled
|
||||
if c == nil {
|
||||
if m == nil {
|
||||
return nil
|
||||
}
|
||||
copy := evenPodsSpreadMetadata{
|
||||
tpKeyToCriticalPaths: make(map[string]*criticalPaths),
|
||||
tpPairToMatchNum: make(map[topologyPair]int32),
|
||||
cp := evenPodsSpreadMetadata{
|
||||
// constraints are shared because they don't change.
|
||||
constraints: m.constraints,
|
||||
tpKeyToCriticalPaths: make(map[string]*criticalPaths, len(m.tpKeyToCriticalPaths)),
|
||||
tpPairToMatchNum: make(map[topologyPair]int32, len(m.tpPairToMatchNum)),
|
||||
}
|
||||
for tpKey, paths := range c.tpKeyToCriticalPaths {
|
||||
copy.tpKeyToCriticalPaths[tpKey] = &criticalPaths{paths[0], paths[1]}
|
||||
for tpKey, paths := range m.tpKeyToCriticalPaths {
|
||||
cp.tpKeyToCriticalPaths[tpKey] = &criticalPaths{paths[0], paths[1]}
|
||||
}
|
||||
for tpPair, matchNum := range c.tpPairToMatchNum {
|
||||
for tpPair, matchNum := range m.tpPairToMatchNum {
|
||||
copyPair := topologyPair{key: tpPair.key, value: tpPair.value}
|
||||
copy.tpPairToMatchNum[copyPair] = matchNum
|
||||
cp.tpPairToMatchNum[copyPair] = matchNum
|
||||
}
|
||||
return ©
|
||||
return &cp
|
||||
}
|
||||
|
||||
// RemovePod changes predicateMetadata assuming that the given `deletedPod` is
|
||||
@ -642,10 +634,7 @@ func (meta *predicateMetadata) RemovePod(deletedPod *v1.Pod, node *v1.Node) erro
|
||||
return fmt.Errorf("deletedPod and meta.pod must not be the same")
|
||||
}
|
||||
meta.podAffinityMetadata.removePod(deletedPod)
|
||||
// Delete pod from the pod spread topology maps.
|
||||
if err := meta.evenPodsSpreadMetadata.removePod(deletedPod, meta.pod, node); err != nil {
|
||||
return err
|
||||
}
|
||||
meta.evenPodsSpreadMetadata.removePod(deletedPod, meta.pod, node)
|
||||
meta.serviceAffinityMetadata.removePod(deletedPod, node)
|
||||
|
||||
return nil
|
||||
@ -667,9 +656,7 @@ func (meta *predicateMetadata) AddPod(addedPod *v1.Pod, node *v1.Node) error {
|
||||
}
|
||||
// Update meta.evenPodsSpreadMetadata if meta.pod has hard spread constraints
|
||||
// and addedPod matches that
|
||||
if err := meta.evenPodsSpreadMetadata.addPod(addedPod, meta.pod, node); err != nil {
|
||||
return err
|
||||
}
|
||||
meta.evenPodsSpreadMetadata.addPod(addedPod, meta.pod, node)
|
||||
|
||||
meta.serviceAffinityMetadata.addPod(addedPod, meta.pod, node)
|
||||
|
||||
|
@ -367,7 +367,7 @@ func TestPredicateMetadata_AddRemovePod(t *testing.T) {
|
||||
// are given to the metadata producer.
|
||||
allPodsMeta, _ := getMeta(allPodLister)
|
||||
// existingPodsMeta1 is meta data produced for test.existingPods (without test.addedPod).
|
||||
existingPodsMeta1, nodeInfoMap := getMeta(fakelisters.PodLister(test.existingPods))
|
||||
existingPodsMeta1, nodeInfoMap := getMeta(test.existingPods)
|
||||
// Add test.addedPod to existingPodsMeta1 and make sure meta is equal to allPodsMeta
|
||||
nodeInfo := nodeInfoMap[test.addedPod.Spec.NodeName]
|
||||
if err := existingPodsMeta1.AddPod(test.addedPod, nodeInfo.Node()); err != nil {
|
||||
@ -803,96 +803,9 @@ func TestGetTPMapMatchingIncomingAffinityAntiAffinity(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestPodMatchesSpreadConstraint(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
podLabels map[string]string
|
||||
constraint v1.TopologySpreadConstraint
|
||||
want bool
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "normal match",
|
||||
podLabels: map[string]string{"foo": "", "bar": ""},
|
||||
constraint: v1.TopologySpreadConstraint{
|
||||
LabelSelector: &metav1.LabelSelector{
|
||||
MatchExpressions: []metav1.LabelSelectorRequirement{
|
||||
{
|
||||
Key: "foo",
|
||||
Operator: metav1.LabelSelectorOpExists,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "normal mismatch",
|
||||
podLabels: map[string]string{"foo": "", "baz": ""},
|
||||
constraint: v1.TopologySpreadConstraint{
|
||||
LabelSelector: &metav1.LabelSelector{
|
||||
MatchExpressions: []metav1.LabelSelectorRequirement{
|
||||
{
|
||||
Key: "foo",
|
||||
Operator: metav1.LabelSelectorOpExists,
|
||||
},
|
||||
{
|
||||
Key: "bar",
|
||||
Operator: metav1.LabelSelectorOpExists,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
want: false,
|
||||
},
|
||||
{
|
||||
name: "podLabels is nil",
|
||||
constraint: v1.TopologySpreadConstraint{
|
||||
LabelSelector: &metav1.LabelSelector{
|
||||
MatchExpressions: []metav1.LabelSelectorRequirement{
|
||||
{
|
||||
Key: "foo",
|
||||
Operator: metav1.LabelSelectorOpExists,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
want: false,
|
||||
},
|
||||
{
|
||||
name: "constraint.LabelSelector is nil",
|
||||
podLabels: map[string]string{
|
||||
"foo": "",
|
||||
"bar": "",
|
||||
},
|
||||
constraint: v1.TopologySpreadConstraint{
|
||||
MaxSkew: 1,
|
||||
},
|
||||
want: false,
|
||||
},
|
||||
{
|
||||
name: "both podLabels and constraint.LabelSelector are nil",
|
||||
constraint: v1.TopologySpreadConstraint{
|
||||
MaxSkew: 1,
|
||||
},
|
||||
want: false,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
podLabelSet := labels.Set(tt.podLabels)
|
||||
got, err := PodMatchesSpreadConstraint(podLabelSet, tt.constraint)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("PodMatchesSpreadConstraint() error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
if got != tt.want {
|
||||
t.Errorf("PodMatchesSpreadConstraint() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetTPMapMatchingSpreadConstraints(t *testing.T) {
|
||||
fooSelector := st.MakeLabelSelector().Exists("foo").Obj()
|
||||
barSelector := st.MakeLabelSelector().Exists("bar").Obj()
|
||||
tests := []struct {
|
||||
name string
|
||||
pod *v1.Pod
|
||||
@ -903,7 +816,7 @@ func TestGetTPMapMatchingSpreadConstraints(t *testing.T) {
|
||||
{
|
||||
name: "clean cluster with one spreadConstraint",
|
||||
pod: st.MakePod().Name("p").Label("foo", "").SpreadConstraint(
|
||||
1, "zone", hardSpread, st.MakeLabelSelector().Exists("foo").Obj(),
|
||||
5, "zone", hardSpread, st.MakeLabelSelector().Label("foo", "bar").Obj(),
|
||||
).Obj(),
|
||||
nodes: []*v1.Node{
|
||||
st.MakeNode().Name("node-a").Label("zone", "zone1").Label("node", "node-a").Obj(),
|
||||
@ -912,6 +825,13 @@ func TestGetTPMapMatchingSpreadConstraints(t *testing.T) {
|
||||
st.MakeNode().Name("node-y").Label("zone", "zone2").Label("node", "node-y").Obj(),
|
||||
},
|
||||
want: &evenPodsSpreadMetadata{
|
||||
constraints: []topologySpreadConstraint{
|
||||
{
|
||||
maxSkew: 5,
|
||||
topologyKey: "zone",
|
||||
selector: mustConvertLabelSelectorAsSelector(t, st.MakeLabelSelector().Label("foo", "bar").Obj()),
|
||||
},
|
||||
},
|
||||
tpKeyToCriticalPaths: map[string]*criticalPaths{
|
||||
"zone": {{"zone1", 0}, {"zone2", 0}},
|
||||
},
|
||||
@ -924,7 +844,7 @@ func TestGetTPMapMatchingSpreadConstraints(t *testing.T) {
|
||||
{
|
||||
name: "normal case with one spreadConstraint",
|
||||
pod: st.MakePod().Name("p").Label("foo", "").SpreadConstraint(
|
||||
1, "zone", hardSpread, st.MakeLabelSelector().Exists("foo").Obj(),
|
||||
1, "zone", hardSpread, fooSelector,
|
||||
).Obj(),
|
||||
nodes: []*v1.Node{
|
||||
st.MakeNode().Name("node-a").Label("zone", "zone1").Label("node", "node-a").Obj(),
|
||||
@ -940,6 +860,13 @@ func TestGetTPMapMatchingSpreadConstraints(t *testing.T) {
|
||||
st.MakePod().Name("p-y2").Node("node-y").Label("foo", "").Obj(),
|
||||
},
|
||||
want: &evenPodsSpreadMetadata{
|
||||
constraints: []topologySpreadConstraint{
|
||||
{
|
||||
maxSkew: 1,
|
||||
topologyKey: "zone",
|
||||
selector: mustConvertLabelSelectorAsSelector(t, fooSelector),
|
||||
},
|
||||
},
|
||||
tpKeyToCriticalPaths: map[string]*criticalPaths{
|
||||
"zone": {{"zone2", 2}, {"zone1", 3}},
|
||||
},
|
||||
@ -970,6 +897,13 @@ func TestGetTPMapMatchingSpreadConstraints(t *testing.T) {
|
||||
st.MakePod().Name("p-y2").Node("node-y").Label("foo", "").Obj(),
|
||||
},
|
||||
want: &evenPodsSpreadMetadata{
|
||||
constraints: []topologySpreadConstraint{
|
||||
{
|
||||
maxSkew: 1,
|
||||
topologyKey: "zone",
|
||||
selector: mustConvertLabelSelectorAsSelector(t, fooSelector),
|
||||
},
|
||||
},
|
||||
tpKeyToCriticalPaths: map[string]*criticalPaths{
|
||||
"zone": {{"zone3", 0}, {"zone2", 2}},
|
||||
},
|
||||
@ -983,7 +917,7 @@ func TestGetTPMapMatchingSpreadConstraints(t *testing.T) {
|
||||
{
|
||||
name: "namespace mismatch doesn't count",
|
||||
pod: st.MakePod().Name("p").Label("foo", "").SpreadConstraint(
|
||||
1, "zone", hardSpread, st.MakeLabelSelector().Exists("foo").Obj(),
|
||||
1, "zone", hardSpread, fooSelector,
|
||||
).Obj(),
|
||||
nodes: []*v1.Node{
|
||||
st.MakeNode().Name("node-a").Label("zone", "zone1").Label("node", "node-a").Obj(),
|
||||
@ -999,6 +933,13 @@ func TestGetTPMapMatchingSpreadConstraints(t *testing.T) {
|
||||
st.MakePod().Name("p-y2").Node("node-y").Label("foo", "").Obj(),
|
||||
},
|
||||
want: &evenPodsSpreadMetadata{
|
||||
constraints: []topologySpreadConstraint{
|
||||
{
|
||||
maxSkew: 1,
|
||||
topologyKey: "zone",
|
||||
selector: mustConvertLabelSelectorAsSelector(t, fooSelector),
|
||||
},
|
||||
},
|
||||
tpKeyToCriticalPaths: map[string]*criticalPaths{
|
||||
"zone": {{"zone2", 1}, {"zone1", 2}},
|
||||
},
|
||||
@ -1011,8 +952,8 @@ func TestGetTPMapMatchingSpreadConstraints(t *testing.T) {
|
||||
{
|
||||
name: "normal case with two spreadConstraints",
|
||||
pod: st.MakePod().Name("p").Label("foo", "").
|
||||
SpreadConstraint(1, "zone", hardSpread, st.MakeLabelSelector().Exists("foo").Obj()).
|
||||
SpreadConstraint(1, "node", hardSpread, st.MakeLabelSelector().Exists("foo").Obj()).
|
||||
SpreadConstraint(1, "zone", hardSpread, fooSelector).
|
||||
SpreadConstraint(1, "node", hardSpread, fooSelector).
|
||||
Obj(),
|
||||
nodes: []*v1.Node{
|
||||
st.MakeNode().Name("node-a").Label("zone", "zone1").Label("node", "node-a").Obj(),
|
||||
@ -1030,6 +971,18 @@ func TestGetTPMapMatchingSpreadConstraints(t *testing.T) {
|
||||
st.MakePod().Name("p-y4").Node("node-y").Label("foo", "").Obj(),
|
||||
},
|
||||
want: &evenPodsSpreadMetadata{
|
||||
constraints: []topologySpreadConstraint{
|
||||
{
|
||||
maxSkew: 1,
|
||||
topologyKey: "zone",
|
||||
selector: mustConvertLabelSelectorAsSelector(t, fooSelector),
|
||||
},
|
||||
{
|
||||
maxSkew: 1,
|
||||
topologyKey: "node",
|
||||
selector: mustConvertLabelSelectorAsSelector(t, fooSelector),
|
||||
},
|
||||
},
|
||||
tpKeyToCriticalPaths: map[string]*criticalPaths{
|
||||
"zone": {{"zone1", 3}, {"zone2", 4}},
|
||||
"node": {{"node-x", 0}, {"node-b", 1}},
|
||||
@ -1047,10 +1000,10 @@ func TestGetTPMapMatchingSpreadConstraints(t *testing.T) {
|
||||
{
|
||||
name: "soft spreadConstraints should be bypassed",
|
||||
pod: st.MakePod().Name("p").Label("foo", "").
|
||||
SpreadConstraint(1, "zone", softSpread, st.MakeLabelSelector().Exists("foo").Obj()).
|
||||
SpreadConstraint(1, "zone", hardSpread, st.MakeLabelSelector().Exists("foo").Obj()).
|
||||
SpreadConstraint(1, "zone", softSpread, st.MakeLabelSelector().Exists("foo").Obj()).
|
||||
SpreadConstraint(1, "node", hardSpread, st.MakeLabelSelector().Exists("foo").Obj()).
|
||||
SpreadConstraint(1, "zone", softSpread, fooSelector).
|
||||
SpreadConstraint(1, "zone", hardSpread, fooSelector).
|
||||
SpreadConstraint(1, "node", softSpread, fooSelector).
|
||||
SpreadConstraint(1, "node", hardSpread, fooSelector).
|
||||
Obj(),
|
||||
nodes: []*v1.Node{
|
||||
st.MakeNode().Name("node-a").Label("zone", "zone1").Label("node", "node-a").Obj(),
|
||||
@ -1067,6 +1020,18 @@ func TestGetTPMapMatchingSpreadConstraints(t *testing.T) {
|
||||
st.MakePod().Name("p-y4").Node("node-y").Label("foo", "").Obj(),
|
||||
},
|
||||
want: &evenPodsSpreadMetadata{
|
||||
constraints: []topologySpreadConstraint{
|
||||
{
|
||||
maxSkew: 1,
|
||||
topologyKey: "zone",
|
||||
selector: mustConvertLabelSelectorAsSelector(t, fooSelector),
|
||||
},
|
||||
{
|
||||
maxSkew: 1,
|
||||
topologyKey: "node",
|
||||
selector: mustConvertLabelSelectorAsSelector(t, fooSelector),
|
||||
},
|
||||
},
|
||||
tpKeyToCriticalPaths: map[string]*criticalPaths{
|
||||
"zone": {{"zone1", 3}, {"zone2", 4}},
|
||||
"node": {{"node-b", 1}, {"node-a", 2}},
|
||||
@ -1083,8 +1048,8 @@ func TestGetTPMapMatchingSpreadConstraints(t *testing.T) {
|
||||
{
|
||||
name: "different labelSelectors - simple version",
|
||||
pod: st.MakePod().Name("p").Label("foo", "").Label("bar", "").
|
||||
SpreadConstraint(1, "zone", hardSpread, st.MakeLabelSelector().Exists("foo").Obj()).
|
||||
SpreadConstraint(1, "node", hardSpread, st.MakeLabelSelector().Exists("bar").Obj()).
|
||||
SpreadConstraint(1, "zone", hardSpread, fooSelector).
|
||||
SpreadConstraint(1, "node", hardSpread, barSelector).
|
||||
Obj(),
|
||||
nodes: []*v1.Node{
|
||||
st.MakeNode().Name("node-a").Label("zone", "zone1").Label("node", "node-a").Obj(),
|
||||
@ -1096,6 +1061,18 @@ func TestGetTPMapMatchingSpreadConstraints(t *testing.T) {
|
||||
st.MakePod().Name("p-b").Node("node-b").Label("bar", "").Obj(),
|
||||
},
|
||||
want: &evenPodsSpreadMetadata{
|
||||
constraints: []topologySpreadConstraint{
|
||||
{
|
||||
maxSkew: 1,
|
||||
topologyKey: "zone",
|
||||
selector: mustConvertLabelSelectorAsSelector(t, fooSelector),
|
||||
},
|
||||
{
|
||||
maxSkew: 1,
|
||||
topologyKey: "node",
|
||||
selector: mustConvertLabelSelectorAsSelector(t, barSelector),
|
||||
},
|
||||
},
|
||||
tpKeyToCriticalPaths: map[string]*criticalPaths{
|
||||
"zone": {{"zone2", 0}, {"zone1", 1}},
|
||||
"node": {{"node-a", 0}, {"node-y", 0}},
|
||||
@ -1110,10 +1087,10 @@ func TestGetTPMapMatchingSpreadConstraints(t *testing.T) {
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "different labelSelectors - complex version",
|
||||
name: "different labelSelectors - complex pods",
|
||||
pod: st.MakePod().Name("p").Label("foo", "").Label("bar", "").
|
||||
SpreadConstraint(1, "zone", hardSpread, st.MakeLabelSelector().Exists("foo").Obj()).
|
||||
SpreadConstraint(1, "node", hardSpread, st.MakeLabelSelector().Exists("bar").Obj()).
|
||||
SpreadConstraint(1, "zone", hardSpread, fooSelector).
|
||||
SpreadConstraint(1, "node", hardSpread, barSelector).
|
||||
Obj(),
|
||||
nodes: []*v1.Node{
|
||||
st.MakeNode().Name("node-a").Label("zone", "zone1").Label("node", "node-a").Obj(),
|
||||
@ -1130,6 +1107,18 @@ func TestGetTPMapMatchingSpreadConstraints(t *testing.T) {
|
||||
st.MakePod().Name("p-y4").Node("node-y").Label("foo", "").Label("bar", "").Obj(),
|
||||
},
|
||||
want: &evenPodsSpreadMetadata{
|
||||
constraints: []topologySpreadConstraint{
|
||||
{
|
||||
maxSkew: 1,
|
||||
topologyKey: "zone",
|
||||
selector: mustConvertLabelSelectorAsSelector(t, fooSelector),
|
||||
},
|
||||
{
|
||||
maxSkew: 1,
|
||||
topologyKey: "node",
|
||||
selector: mustConvertLabelSelectorAsSelector(t, barSelector),
|
||||
},
|
||||
},
|
||||
tpKeyToCriticalPaths: map[string]*criticalPaths{
|
||||
"zone": {{"zone1", 3}, {"zone2", 4}},
|
||||
"node": {{"node-b", 0}, {"node-a", 1}},
|
||||
@ -1147,8 +1136,8 @@ func TestGetTPMapMatchingSpreadConstraints(t *testing.T) {
|
||||
name: "two spreadConstraints, and with podAffinity",
|
||||
pod: st.MakePod().Name("p").Label("foo", "").
|
||||
NodeAffinityNotIn("node", []string{"node-x"}). // exclude node-x
|
||||
SpreadConstraint(1, "zone", hardSpread, st.MakeLabelSelector().Exists("foo").Obj()).
|
||||
SpreadConstraint(1, "node", hardSpread, st.MakeLabelSelector().Exists("foo").Obj()).
|
||||
SpreadConstraint(1, "zone", hardSpread, fooSelector).
|
||||
SpreadConstraint(1, "node", hardSpread, fooSelector).
|
||||
Obj(),
|
||||
nodes: []*v1.Node{
|
||||
st.MakeNode().Name("node-a").Label("zone", "zone1").Label("node", "node-a").Obj(),
|
||||
@ -1166,6 +1155,18 @@ func TestGetTPMapMatchingSpreadConstraints(t *testing.T) {
|
||||
st.MakePod().Name("p-y4").Node("node-y").Label("foo", "").Obj(),
|
||||
},
|
||||
want: &evenPodsSpreadMetadata{
|
||||
constraints: []topologySpreadConstraint{
|
||||
{
|
||||
maxSkew: 1,
|
||||
topologyKey: "zone",
|
||||
selector: mustConvertLabelSelectorAsSelector(t, fooSelector),
|
||||
},
|
||||
{
|
||||
maxSkew: 1,
|
||||
topologyKey: "node",
|
||||
selector: mustConvertLabelSelectorAsSelector(t, fooSelector),
|
||||
},
|
||||
},
|
||||
tpKeyToCriticalPaths: map[string]*criticalPaths{
|
||||
"zone": {{"zone1", 3}, {"zone2", 4}},
|
||||
"node": {{"node-b", 1}, {"node-a", 2}},
|
||||
@ -1187,13 +1188,20 @@ func TestGetTPMapMatchingSpreadConstraints(t *testing.T) {
|
||||
got, _ := getEvenPodsSpreadMetadata(tt.pod, l)
|
||||
got.sortCriticalPaths()
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("getEvenPodsSpreadMetadata() = %v, want %v", *got, *tt.want)
|
||||
t.Errorf("getEvenPodsSpreadMetadata() = %#v, want %#v", *got, *tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestPodSpreadCache_addPod(t *testing.T) {
|
||||
nodeConstraint := topologySpreadConstraint{
|
||||
maxSkew: 1,
|
||||
topologyKey: "node",
|
||||
selector: mustConvertLabelSelectorAsSelector(t, st.MakeLabelSelector().Exists("foo").Obj()),
|
||||
}
|
||||
zoneConstraint := nodeConstraint
|
||||
zoneConstraint.topologyKey = "zone"
|
||||
tests := []struct {
|
||||
name string
|
||||
preemptor *v1.Pod
|
||||
@ -1216,6 +1224,7 @@ func TestPodSpreadCache_addPod(t *testing.T) {
|
||||
st.MakeNode().Name("node-b").Label("zone", "zone1").Label("node", "node-b").Obj(),
|
||||
},
|
||||
want: &evenPodsSpreadMetadata{
|
||||
constraints: []topologySpreadConstraint{nodeConstraint},
|
||||
tpKeyToCriticalPaths: map[string]*criticalPaths{
|
||||
"node": {{"node-b", 0}, {"node-a", 1}},
|
||||
},
|
||||
@ -1240,6 +1249,7 @@ func TestPodSpreadCache_addPod(t *testing.T) {
|
||||
st.MakeNode().Name("node-b").Label("zone", "zone1").Label("node", "node-b").Obj(),
|
||||
},
|
||||
want: &evenPodsSpreadMetadata{
|
||||
constraints: []topologySpreadConstraint{nodeConstraint},
|
||||
tpKeyToCriticalPaths: map[string]*criticalPaths{
|
||||
"node": {{"node-a", 1}, {"node-b", 1}},
|
||||
},
|
||||
@ -1264,6 +1274,7 @@ func TestPodSpreadCache_addPod(t *testing.T) {
|
||||
st.MakeNode().Name("node-b").Label("zone", "zone1").Label("node", "node-b").Obj(),
|
||||
},
|
||||
want: &evenPodsSpreadMetadata{
|
||||
constraints: []topologySpreadConstraint{nodeConstraint},
|
||||
tpKeyToCriticalPaths: map[string]*criticalPaths{
|
||||
"node": {{"node-a", 0}, {"node-b", 1}},
|
||||
},
|
||||
@ -1288,6 +1299,7 @@ func TestPodSpreadCache_addPod(t *testing.T) {
|
||||
st.MakeNode().Name("node-b").Label("zone", "zone1").Label("node", "node-b").Obj(),
|
||||
},
|
||||
want: &evenPodsSpreadMetadata{
|
||||
constraints: []topologySpreadConstraint{nodeConstraint},
|
||||
tpKeyToCriticalPaths: map[string]*criticalPaths{
|
||||
"node": {{"node-a", 0}, {"node-b", 2}},
|
||||
},
|
||||
@ -1311,6 +1323,7 @@ func TestPodSpreadCache_addPod(t *testing.T) {
|
||||
st.MakeNode().Name("node-x").Label("zone", "zone2").Label("node", "node-x").Obj(),
|
||||
},
|
||||
want: &evenPodsSpreadMetadata{
|
||||
constraints: []topologySpreadConstraint{zoneConstraint, nodeConstraint},
|
||||
tpKeyToCriticalPaths: map[string]*criticalPaths{
|
||||
"zone": {{"zone2", 0}, {"zone1", 1}},
|
||||
"node": {{"node-x", 0}, {"node-a", 1}},
|
||||
@ -1339,6 +1352,7 @@ func TestPodSpreadCache_addPod(t *testing.T) {
|
||||
st.MakeNode().Name("node-x").Label("zone", "zone2").Label("node", "node-x").Obj(),
|
||||
},
|
||||
want: &evenPodsSpreadMetadata{
|
||||
constraints: []topologySpreadConstraint{zoneConstraint, nodeConstraint},
|
||||
tpKeyToCriticalPaths: map[string]*criticalPaths{
|
||||
"zone": {{"zone1", 1}, {"zone2", 1}},
|
||||
"node": {{"node-a", 1}, {"node-x", 1}},
|
||||
@ -1370,6 +1384,7 @@ func TestPodSpreadCache_addPod(t *testing.T) {
|
||||
st.MakeNode().Name("node-x").Label("zone", "zone2").Label("node", "node-x").Obj(),
|
||||
},
|
||||
want: &evenPodsSpreadMetadata{
|
||||
constraints: []topologySpreadConstraint{zoneConstraint, nodeConstraint},
|
||||
tpKeyToCriticalPaths: map[string]*criticalPaths{
|
||||
"zone": {{"zone2", 1}, {"zone1", 3}},
|
||||
"node": {{"node-a", 1}, {"node-x", 1}},
|
||||
@ -1402,6 +1417,14 @@ func TestPodSpreadCache_addPod(t *testing.T) {
|
||||
st.MakeNode().Name("node-x").Label("zone", "zone2").Label("node", "node-x").Obj(),
|
||||
},
|
||||
want: &evenPodsSpreadMetadata{
|
||||
constraints: []topologySpreadConstraint{
|
||||
zoneConstraint,
|
||||
{
|
||||
maxSkew: 1,
|
||||
topologyKey: "node",
|
||||
selector: mustConvertLabelSelectorAsSelector(t, st.MakeLabelSelector().Exists("bar").Obj()),
|
||||
},
|
||||
},
|
||||
tpKeyToCriticalPaths: map[string]*criticalPaths{
|
||||
"zone": {{"zone2", 1}, {"zone1", 2}},
|
||||
"node": {{"node-a", 0}, {"node-b", 1}},
|
||||
@ -1434,6 +1457,14 @@ func TestPodSpreadCache_addPod(t *testing.T) {
|
||||
st.MakeNode().Name("node-x").Label("zone", "zone2").Label("node", "node-x").Obj(),
|
||||
},
|
||||
want: &evenPodsSpreadMetadata{
|
||||
constraints: []topologySpreadConstraint{
|
||||
zoneConstraint,
|
||||
{
|
||||
maxSkew: 1,
|
||||
topologyKey: "node",
|
||||
selector: mustConvertLabelSelectorAsSelector(t, st.MakeLabelSelector().Exists("bar").Obj()),
|
||||
},
|
||||
},
|
||||
tpKeyToCriticalPaths: map[string]*criticalPaths{
|
||||
"zone": {{"zone1", 1}, {"zone2", 1}},
|
||||
"node": {{"node-a", 1}, {"node-b", 1}},
|
||||
@ -1463,6 +1494,13 @@ func TestPodSpreadCache_addPod(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPodSpreadCache_removePod(t *testing.T) {
|
||||
nodeConstraint := topologySpreadConstraint{
|
||||
maxSkew: 1,
|
||||
topologyKey: "node",
|
||||
selector: mustConvertLabelSelectorAsSelector(t, st.MakeLabelSelector().Exists("foo").Obj()),
|
||||
}
|
||||
zoneConstraint := nodeConstraint
|
||||
zoneConstraint.topologyKey = "zone"
|
||||
tests := []struct {
|
||||
name string
|
||||
preemptor *v1.Pod // preemptor pod
|
||||
@ -1493,6 +1531,7 @@ func TestPodSpreadCache_removePod(t *testing.T) {
|
||||
deletedPodIdx: 0, // remove pod "p-a1"
|
||||
nodeIdx: 0, // node-a
|
||||
want: &evenPodsSpreadMetadata{
|
||||
constraints: []topologySpreadConstraint{zoneConstraint},
|
||||
tpKeyToCriticalPaths: map[string]*criticalPaths{
|
||||
"zone": {{"zone1", 1}, {"zone2", 1}},
|
||||
},
|
||||
@ -1522,6 +1561,7 @@ func TestPodSpreadCache_removePod(t *testing.T) {
|
||||
deletedPodIdx: 0, // remove pod "p-a1"
|
||||
nodeIdx: 0, // node-a
|
||||
want: &evenPodsSpreadMetadata{
|
||||
constraints: []topologySpreadConstraint{zoneConstraint},
|
||||
tpKeyToCriticalPaths: map[string]*criticalPaths{
|
||||
"zone": {{"zone1", 1}, {"zone2", 2}},
|
||||
},
|
||||
@ -1552,6 +1592,7 @@ func TestPodSpreadCache_removePod(t *testing.T) {
|
||||
deletedPodIdx: 0, // remove pod "p-a0"
|
||||
nodeIdx: 0, // node-a
|
||||
want: &evenPodsSpreadMetadata{
|
||||
constraints: []topologySpreadConstraint{zoneConstraint},
|
||||
tpKeyToCriticalPaths: map[string]*criticalPaths{
|
||||
"zone": {{"zone1", 2}, {"zone2", 2}},
|
||||
},
|
||||
@ -1582,6 +1623,7 @@ func TestPodSpreadCache_removePod(t *testing.T) {
|
||||
deletedPod: st.MakePod().Name("p-a0").Node("node-a").Label("bar", "").Obj(),
|
||||
nodeIdx: 0, // node-a
|
||||
want: &evenPodsSpreadMetadata{
|
||||
constraints: []topologySpreadConstraint{zoneConstraint},
|
||||
tpKeyToCriticalPaths: map[string]*criticalPaths{
|
||||
"zone": {{"zone1", 2}, {"zone2", 2}},
|
||||
},
|
||||
@ -1612,6 +1654,7 @@ func TestPodSpreadCache_removePod(t *testing.T) {
|
||||
deletedPodIdx: 3, // remove pod "p-x1"
|
||||
nodeIdx: 2, // node-x
|
||||
want: &evenPodsSpreadMetadata{
|
||||
constraints: []topologySpreadConstraint{zoneConstraint, nodeConstraint},
|
||||
tpKeyToCriticalPaths: map[string]*criticalPaths{
|
||||
"zone": {{"zone2", 1}, {"zone1", 3}},
|
||||
"node": {{"node-b", 1}, {"node-x", 1}},
|
||||
@ -1703,8 +1746,8 @@ var (
|
||||
)
|
||||
|
||||
// sortCriticalPaths is only served for testing purpose.
|
||||
func (c *evenPodsSpreadMetadata) sortCriticalPaths() {
|
||||
for _, paths := range c.tpKeyToCriticalPaths {
|
||||
func (m *evenPodsSpreadMetadata) sortCriticalPaths() {
|
||||
for _, paths := range m.tpKeyToCriticalPaths {
|
||||
// If two paths both hold minimum matching number, and topologyValue is unordered.
|
||||
if paths[0].matchNum == paths[1].matchNum && paths[0].topologyValue > paths[1].topologyValue {
|
||||
// Swap topologyValue to make them sorted alphabetically.
|
||||
@ -1712,3 +1755,12 @@ func (c *evenPodsSpreadMetadata) sortCriticalPaths() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func mustConvertLabelSelectorAsSelector(t *testing.T, ls *metav1.LabelSelector) labels.Selector {
|
||||
t.Helper()
|
||||
s, err := metav1.LabelSelectorAsSelector(ls)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
@ -1642,55 +1642,47 @@ func EvenPodsSpreadPredicate(pod *v1.Pod, meta Metadata, nodeInfo *schedulernode
|
||||
if node == nil {
|
||||
return false, nil, fmt.Errorf("node not found")
|
||||
}
|
||||
constraints := getHardTopologySpreadConstraints(pod)
|
||||
if len(constraints) == 0 {
|
||||
return true, nil, nil
|
||||
}
|
||||
|
||||
var evenPodsSpreadMetadata *evenPodsSpreadMetadata
|
||||
var epsMeta *evenPodsSpreadMetadata
|
||||
if predicateMeta, ok := meta.(*predicateMetadata); ok {
|
||||
evenPodsSpreadMetadata = predicateMeta.evenPodsSpreadMetadata
|
||||
epsMeta = predicateMeta.evenPodsSpreadMetadata
|
||||
} else { // We don't have precomputed metadata. We have to follow a slow path to check spread constraints.
|
||||
// TODO(autoscaler): get it implemented
|
||||
return false, nil, errors.New("metadata not pre-computed for EvenPodsSpreadPredicate")
|
||||
}
|
||||
|
||||
if evenPodsSpreadMetadata == nil || len(evenPodsSpreadMetadata.tpPairToMatchNum) == 0 {
|
||||
if epsMeta == nil || len(epsMeta.tpPairToMatchNum) == 0 || len(epsMeta.constraints) == 0 {
|
||||
return true, nil, nil
|
||||
}
|
||||
|
||||
podLabelSet := labels.Set(pod.Labels)
|
||||
for _, constraint := range constraints {
|
||||
tpKey := constraint.TopologyKey
|
||||
tpVal, ok := node.Labels[constraint.TopologyKey]
|
||||
for _, c := range epsMeta.constraints {
|
||||
tpKey := c.topologyKey
|
||||
tpVal, ok := node.Labels[c.topologyKey]
|
||||
if !ok {
|
||||
klog.V(5).Infof("node '%s' doesn't have required label '%s'", node.Name, tpKey)
|
||||
return false, []PredicateFailureReason{ErrTopologySpreadConstraintsNotMatch}, nil
|
||||
}
|
||||
|
||||
selfMatch, err := PodMatchesSpreadConstraint(podLabelSet, constraint)
|
||||
if err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
selfMatchNum := int32(0)
|
||||
if selfMatch {
|
||||
if c.selector.Matches(podLabelSet) {
|
||||
selfMatchNum = 1
|
||||
}
|
||||
|
||||
pair := topologyPair{key: tpKey, value: tpVal}
|
||||
paths, ok := evenPodsSpreadMetadata.tpKeyToCriticalPaths[tpKey]
|
||||
paths, ok := epsMeta.tpKeyToCriticalPaths[tpKey]
|
||||
if !ok {
|
||||
// error which should not happen
|
||||
klog.Errorf("internal error: get paths from key %q of %#v", tpKey, evenPodsSpreadMetadata.tpKeyToCriticalPaths)
|
||||
klog.Errorf("internal error: get paths from key %q of %#v", tpKey, epsMeta.tpKeyToCriticalPaths)
|
||||
continue
|
||||
}
|
||||
// judging criteria:
|
||||
// 'existing matching num' + 'if self-match (1 or 0)' - 'global min matching num' <= 'maxSkew'
|
||||
minMatchNum := paths[0].matchNum
|
||||
matchNum := evenPodsSpreadMetadata.tpPairToMatchNum[pair]
|
||||
matchNum := epsMeta.tpPairToMatchNum[pair]
|
||||
skew := matchNum + selfMatchNum - minMatchNum
|
||||
if skew > constraint.MaxSkew {
|
||||
klog.V(5).Infof("node '%s' failed spreadConstraint[%s]: matchNum(%d) + selfMatchNum(%d) - minMatchNum(%d) > maxSkew(%d)", node.Name, tpKey, matchNum, selfMatchNum, minMatchNum, constraint.MaxSkew)
|
||||
if skew > c.maxSkew {
|
||||
klog.V(5).Infof("node '%s' failed spreadConstraint[%s]: matchNum(%d) + selfMatchNum(%d) - minMatchNum(%d) > maxSkew(%d)", node.Name, tpKey, matchNum, selfMatchNum, minMatchNum, c.maxSkew)
|
||||
return false, []PredicateFailureReason{ErrTopologySpreadConstraintsNotMatch}, nil
|
||||
}
|
||||
}
|
||||
|
@ -23,12 +23,13 @@ import (
|
||||
"sync/atomic"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
|
||||
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
|
||||
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
|
||||
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
|
||||
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
|
||||
|
||||
"k8s.io/klog"
|
||||
)
|
||||
@ -39,12 +40,21 @@ type topologyPair struct {
|
||||
}
|
||||
|
||||
type podTopologySpreadMap struct {
|
||||
constraints []topologySpreadConstraint
|
||||
// nodeNameSet is a string set holding all node names which have all constraints[*].topologyKey present.
|
||||
nodeNameSet map[string]struct{}
|
||||
// topologyPairToPodCounts is keyed with topologyPair, and valued with the number of matching pods.
|
||||
topologyPairToPodCounts map[topologyPair]*int64
|
||||
}
|
||||
|
||||
// topologySpreadConstraint is an internal version for a soft (ScheduleAnyway
|
||||
// unsatisfiable constraint action) v1.TopologySpreadConstraint and where the
|
||||
// selector is parsed.
|
||||
type topologySpreadConstraint struct {
|
||||
topologyKey string
|
||||
selector labels.Selector
|
||||
}
|
||||
|
||||
func newTopologySpreadConstraintsMap() *podTopologySpreadMap {
|
||||
return &podTopologySpreadMap{
|
||||
nodeNameSet: make(map[string]struct{}),
|
||||
@ -54,19 +64,22 @@ func newTopologySpreadConstraintsMap() *podTopologySpreadMap {
|
||||
|
||||
// buildPodTopologySpreadMap prepares necessary data (podTopologySpreadMap) for incoming pod on the filteredNodes.
|
||||
// Later Priority function will use 'podTopologySpreadMap' to perform the Scoring calculations.
|
||||
func buildPodTopologySpreadMap(pod *v1.Pod, filteredNodes []*v1.Node, allNodes []*schedulernodeinfo.NodeInfo) *podTopologySpreadMap {
|
||||
// return if incoming pod doesn't have soft topology spread constraints.
|
||||
constraints := getSoftTopologySpreadConstraints(pod)
|
||||
if len(constraints) == 0 || len(filteredNodes) == 0 || len(allNodes) == 0 {
|
||||
return nil
|
||||
func buildPodTopologySpreadMap(pod *v1.Pod, filteredNodes []*v1.Node, allNodes []*schedulernodeinfo.NodeInfo) (*podTopologySpreadMap, error) {
|
||||
if len(filteredNodes) == 0 || len(allNodes) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// initialize podTopologySpreadMap which will be used in Score plugin.
|
||||
m := newTopologySpreadConstraintsMap()
|
||||
m.initialize(pod, filteredNodes)
|
||||
err := m.initialize(pod, filteredNodes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// return if incoming pod doesn't have soft topology spread constraints.
|
||||
if m.constraints == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
errCh := schedutil.NewErrorChannel()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
processAllNode := func(i int) {
|
||||
nodeInfo := allNodes[i]
|
||||
node := nodeInfo.Node()
|
||||
@ -76,12 +89,12 @@ func buildPodTopologySpreadMap(pod *v1.Pod, filteredNodes []*v1.Node, allNodes [
|
||||
// (1) `node` should satisfy incoming pod's NodeSelector/NodeAffinity
|
||||
// (2) All topologyKeys need to be present in `node`
|
||||
if !predicates.PodMatchesNodeSelectorAndAffinityTerms(pod, node) ||
|
||||
!predicates.NodeLabelsMatchSpreadConstraints(node.Labels, constraints) {
|
||||
!nodeLabelsMatchSpreadConstraints(node.Labels, m.constraints) {
|
||||
return
|
||||
}
|
||||
|
||||
for _, constraint := range constraints {
|
||||
pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]}
|
||||
for _, c := range m.constraints {
|
||||
pair := topologyPair{key: c.topologyKey, value: node.Labels[c.topologyKey]}
|
||||
// If current topology pair is not associated with any candidate node,
|
||||
// continue to avoid unnecessary calculation.
|
||||
if m.topologyPairToPodCounts[pair] == nil {
|
||||
@ -91,39 +104,37 @@ func buildPodTopologySpreadMap(pod *v1.Pod, filteredNodes []*v1.Node, allNodes [
|
||||
// <matchSum> indicates how many pods (on current node) match the <constraint>.
|
||||
matchSum := int64(0)
|
||||
for _, existingPod := range nodeInfo.Pods() {
|
||||
match, err := predicates.PodMatchesSpreadConstraint(existingPod.Labels, constraint)
|
||||
if err != nil {
|
||||
errCh.SendErrorWithCancel(err, cancel)
|
||||
return
|
||||
}
|
||||
if match {
|
||||
if c.selector.Matches(labels.Set(existingPod.Labels)) {
|
||||
matchSum++
|
||||
}
|
||||
}
|
||||
atomic.AddInt64(m.topologyPairToPodCounts[pair], matchSum)
|
||||
}
|
||||
}
|
||||
workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processAllNode)
|
||||
if err := errCh.ReceiveError(); err != nil {
|
||||
klog.Error(err)
|
||||
return nil
|
||||
}
|
||||
workqueue.ParallelizeUntil(context.Background(), 16, len(allNodes), processAllNode)
|
||||
|
||||
return m
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// initialize iterates "filteredNodes" to filter out the nodes which don't have required topologyKey(s),
|
||||
// and initialize two maps:
|
||||
// 1) m.topologyPairToPodCounts: keyed with both eligible topology pair and node names.
|
||||
// 2) m.nodeNameSet: keyed with node name, and valued with a *int64 pointer for eligible node only.
|
||||
func (m *podTopologySpreadMap) initialize(pod *v1.Pod, filteredNodes []*v1.Node) {
|
||||
constraints := getSoftTopologySpreadConstraints(pod)
|
||||
func (m *podTopologySpreadMap) initialize(pod *v1.Pod, filteredNodes []*v1.Node) error {
|
||||
constraints, err := filterSoftTopologySpreadConstraints(pod.Spec.TopologySpreadConstraints)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if constraints == nil {
|
||||
return nil
|
||||
}
|
||||
m.constraints = constraints
|
||||
for _, node := range filteredNodes {
|
||||
if !predicates.NodeLabelsMatchSpreadConstraints(node.Labels, constraints) {
|
||||
if !nodeLabelsMatchSpreadConstraints(node.Labels, m.constraints) {
|
||||
continue
|
||||
}
|
||||
for _, constraint := range constraints {
|
||||
pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]}
|
||||
for _, constraint := range m.constraints {
|
||||
pair := topologyPair{key: constraint.topologyKey, value: node.Labels[constraint.topologyKey]}
|
||||
if m.topologyPairToPodCounts[pair] == nil {
|
||||
m.topologyPairToPodCounts[pair] = new(int64)
|
||||
}
|
||||
@ -132,6 +143,7 @@ func (m *podTopologySpreadMap) initialize(pod *v1.Pod, filteredNodes []*v1.Node)
|
||||
// For those nodes which don't have all required topologyKeys present, it's intentional to leave
|
||||
// their entries absent in nodeNameSet, so that we're able to score them to 0 afterwards.
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// CalculateEvenPodsSpreadPriorityMap calculate the number of matching pods on the passed-in "node",
|
||||
@ -155,13 +167,12 @@ func CalculateEvenPodsSpreadPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo
|
||||
return framework.NodeScore{Name: node.Name, Score: 0}, nil
|
||||
}
|
||||
|
||||
constraints := getSoftTopologySpreadConstraints(pod)
|
||||
// For each present <pair>, current node gets a credit of <matchSum>.
|
||||
// And we sum up <matchSum> and return it as this node's score.
|
||||
var score int64
|
||||
for _, constraint := range constraints {
|
||||
if tpVal, ok := node.Labels[constraint.TopologyKey]; ok {
|
||||
pair := topologyPair{key: constraint.TopologyKey, value: tpVal}
|
||||
for _, c := range m.constraints {
|
||||
if tpVal, ok := node.Labels[c.topologyKey]; ok {
|
||||
pair := topologyPair{key: c.topologyKey, value: tpVal}
|
||||
matchSum := *m.topologyPairToPodCounts[pair]
|
||||
score += matchSum
|
||||
}
|
||||
@ -228,14 +239,29 @@ func CalculateEvenPodsSpreadPriorityReduce(pod *v1.Pod, meta interface{}, shared
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO(Huang-Wei): combine this with getHardTopologySpreadConstraints() in predicates package
|
||||
func getSoftTopologySpreadConstraints(pod *v1.Pod) (constraints []v1.TopologySpreadConstraint) {
|
||||
if pod != nil {
|
||||
for _, constraint := range pod.Spec.TopologySpreadConstraints {
|
||||
if constraint.WhenUnsatisfiable == v1.ScheduleAnyway {
|
||||
constraints = append(constraints, constraint)
|
||||
func filterSoftTopologySpreadConstraints(constraints []v1.TopologySpreadConstraint) ([]topologySpreadConstraint, error) {
|
||||
var r []topologySpreadConstraint
|
||||
for _, c := range constraints {
|
||||
if c.WhenUnsatisfiable == v1.ScheduleAnyway {
|
||||
selector, err := metav1.LabelSelectorAsSelector(c.LabelSelector)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r = append(r, topologySpreadConstraint{
|
||||
topologyKey: c.TopologyKey,
|
||||
selector: selector,
|
||||
})
|
||||
}
|
||||
}
|
||||
return
|
||||
return r, nil
|
||||
}
|
||||
|
||||
// nodeLabelsMatchSpreadConstraints checks if ALL topology keys in spread constraints are present in node labels.
|
||||
func nodeLabelsMatchSpreadConstraints(nodeLabels map[string]string, constraints []topologySpreadConstraint) bool {
|
||||
for _, c := range constraints {
|
||||
if _, ok := nodeLabels[c.topologyKey]; !ok {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
@ -83,7 +83,9 @@ func Test_podTopologySpreadMap_initialize(t *testing.T) {
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
m := newTopologySpreadConstraintsMap()
|
||||
m.initialize(tt.pod, tt.nodes)
|
||||
if err := m.initialize(tt.pod, tt.nodes); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !reflect.DeepEqual(m.nodeNameSet, tt.wantNodeNameSet) {
|
||||
t.Errorf("initilize().nodeNameSet = %#v, want %#v", m.nodeNameSet, tt.wantNodeNameSet)
|
||||
}
|
||||
@ -436,8 +438,12 @@ func TestCalculateEvenPodsSpreadPriority(t *testing.T) {
|
||||
allNodes = append(allNodes, tt.failedNodes...)
|
||||
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(tt.existingPods, allNodes))
|
||||
|
||||
tpSpreadMap, err := buildPodTopologySpreadMap(tt.pod, tt.nodes, snapshot.NodeInfoList)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
meta := &priorityMetadata{
|
||||
podTopologySpreadMap: buildPodTopologySpreadMap(tt.pod, tt.nodes, snapshot.NodeInfoList),
|
||||
podTopologySpreadMap: tpSpreadMap,
|
||||
}
|
||||
var gotList framework.NodeScoreList
|
||||
for _, n := range tt.nodes {
|
||||
@ -449,7 +455,10 @@ func TestCalculateEvenPodsSpreadPriority(t *testing.T) {
|
||||
gotList = append(gotList, nodeScore)
|
||||
}
|
||||
|
||||
CalculateEvenPodsSpreadPriorityReduce(tt.pod, meta, snapshot, gotList)
|
||||
err = CalculateEvenPodsSpreadPriorityReduce(tt.pod, meta, snapshot, gotList)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !reflect.DeepEqual(gotList, tt.want) {
|
||||
t.Errorf("CalculateEvenPodsSpreadPriorityReduce() = %#v, want %#v", gotList, tt.want)
|
||||
}
|
||||
@ -498,8 +507,12 @@ func BenchmarkTestCalculateEvenPodsSpreadPriority(b *testing.B) {
|
||||
b.Run(tt.name, func(b *testing.B) {
|
||||
existingPods, allNodes, filteredNodes := st.MakeNodesAndPodsForEvenPodsSpread(tt.pod.Labels, tt.existingPodsNum, tt.allNodesNum, tt.filteredNodesNum)
|
||||
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(existingPods, allNodes))
|
||||
tpSpreadMap, err := buildPodTopologySpreadMap(tt.pod, filteredNodes, snapshot.NodeInfoList)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
meta := &priorityMetadata{
|
||||
podTopologySpreadMap: buildPodTopologySpreadMap(tt.pod, filteredNodes, snapshot.NodeInfoList),
|
||||
podTopologySpreadMap: tpSpreadMap,
|
||||
}
|
||||
b.ResetTimer()
|
||||
|
||||
@ -510,7 +523,10 @@ func BenchmarkTestCalculateEvenPodsSpreadPriority(b *testing.B) {
|
||||
nodeScore, _ := CalculateEvenPodsSpreadPriorityMap(tt.pod, meta, snapshot.NodeInfoMap[nodeName])
|
||||
gotList = append(gotList, nodeScore)
|
||||
}
|
||||
CalculateEvenPodsSpreadPriorityReduce(tt.pod, meta, snapshot, gotList)
|
||||
err := CalculateEvenPodsSpreadPriorityReduce(tt.pod, meta, snapshot, gotList)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
appslisters "k8s.io/client-go/listers/apps/v1"
|
||||
corelisters "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/klog"
|
||||
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
|
||||
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
|
||||
)
|
||||
@ -84,6 +85,11 @@ func (pmf *MetadataFactory) PriorityMetadata(
|
||||
allNodes = l
|
||||
}
|
||||
}
|
||||
tpSpreadMap, err := buildPodTopologySpreadMap(pod, filteredNodes, allNodes)
|
||||
if err != nil {
|
||||
klog.Errorf("Error building podTopologySpreadMap: %v", err)
|
||||
return nil
|
||||
}
|
||||
return &priorityMetadata{
|
||||
podLimits: getResourceLimits(pod),
|
||||
podTolerations: getAllTolerationPreferNoSchedule(pod.Spec.Tolerations),
|
||||
@ -92,7 +98,7 @@ func (pmf *MetadataFactory) PriorityMetadata(
|
||||
controllerRef: metav1.GetControllerOf(pod),
|
||||
podFirstServiceSelector: getFirstServiceSelector(pod, pmf.serviceLister),
|
||||
totalNumNodes: totalNumNodes,
|
||||
podTopologySpreadMap: buildPodTopologySpreadMap(pod, filteredNodes, allNodes),
|
||||
podTopologySpreadMap: tpSpreadMap,
|
||||
topologyScore: buildTopologyPairToScore(pod, sharedLister, filteredNodes, pmf.hardPodAffinityWeight),
|
||||
}
|
||||
}
|
||||
|
@ -59,8 +59,12 @@ func BenchmarkTestDefaultEvenPodsSpreadPriority(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
tpSpreadMap, err := buildPodTopologySpreadMap(pod, filteredNodes, snapshot.NodeInfoList)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
meta := &priorityMetadata{
|
||||
podTopologySpreadMap: buildPodTopologySpreadMap(pod, filteredNodes, snapshot.NodeInfoList),
|
||||
podTopologySpreadMap: tpSpreadMap,
|
||||
}
|
||||
var gotList framework.NodeScoreList
|
||||
for _, n := range filteredNodes {
|
||||
@ -70,7 +74,7 @@ func BenchmarkTestDefaultEvenPodsSpreadPriority(b *testing.B) {
|
||||
}
|
||||
gotList = append(gotList, score)
|
||||
}
|
||||
err := CalculateEvenPodsSpreadPriorityReduce(pod, meta, snapshot, gotList)
|
||||
err = CalculateEvenPodsSpreadPriorityReduce(pod, meta, snapshot, gotList)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user