First pod with affinity can schedule only on nodes with matching topology keys

This commit is contained in:
Abdullah Gharaibeh 2020-05-16 10:37:32 -04:00
parent 9d3406c27b
commit 5d2c05408d
2 changed files with 105 additions and 78 deletions

View File

@ -160,6 +160,22 @@ func podMatchesAllAffinityTerms(pod *v1.Pod, terms []framework.AffinityTerm) boo
return true
}
// getMatchingAntiAffinityTopologyPairsOfPod counts, for a single "existingPod"
// on the given node, the required anti-affinity terms of that pod which apply
// to "newPod".
//
// A term is counted when both of the following hold:
// (1) "newPod" matches the term's namespaces and selector, and
// (2) the term's topology key is present in the node's labels.
//
// The result maps each {topology key, node label value} pair to the number of
// terms counted for it. Terms whose topology key is missing from the node's
// labels contribute nothing.
func getMatchingAntiAffinityTopologyPairsOfPod(newPod *v1.Pod, existingPod *framework.PodInfo, node *v1.Node) topologyToMatchedTermCount {
	matched := make(topologyToMatchedTermCount)
	for _, antiTerm := range existingPod.RequiredAntiAffinityTerms {
		// Skip terms that do not select the incoming pod.
		if !schedutil.PodMatchesTermsNamespaceAndSelector(newPod, antiTerm.Namespaces, antiTerm.Selector) {
			continue
		}
		// Only topology keys the node is actually labelled with can be matched.
		labelValue, found := node.Labels[antiTerm.TopologyKey]
		if !found {
			continue
		}
		matched[topologyPair{key: antiTerm.TopologyKey, value: labelValue}]++
	}
	return matched
}
// getTPMapMatchingExistingAntiAffinity calculates the following for each existing pod on each node:
// (1) Whether it has PodAntiAffinity
// (2) Whether any AffinityTerm matches the incoming pod
@ -314,89 +330,61 @@ func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error
// satisfiesExistingPodsAntiAffinity reports whether placing "pod" on the node
// held by "nodeInfo" is compatible with the anti-affinity terms recorded in
// state.topologyToMatchedExistingAntiAffinityTerms.
//
// It returns false as soon as any {label key, label value} pair of the node has
// a positive count in that map, and true otherwise. The returned error is
// always nil.
func (pl *InterPodAffinity) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, state *preFilterState, nodeInfo *framework.NodeInfo) (bool, error) {
	node := nodeInfo.Node()
	matchedTerms := state.topologyToMatchedExistingAntiAffinityTerms
	// Every node label is treated as a candidate topology pair; a single
	// positive count is enough to reject the node.
	for labelKey, labelValue := range node.Labels {
		tp := topologyPair{key: labelKey, value: labelValue}
		if matchedTerms[tp] <= 0 {
			continue
		}
		klog.V(10).Infof("Cannot schedule pod %+v onto node %v", pod.Name, node.Name)
		return false, nil
	}
	return true, nil
}
// nodeMatchesAllAffinityTerms checks whether "nodeInfo" matches all affinity terms of the incoming pod.
func nodeMatchesAllAffinityTerms(nodeInfo *framework.NodeInfo, state *preFilterState) bool {
node := nodeInfo.Node()
for _, term := range state.podInfo.RequiredAffinityTerms {
if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
pair := topologyPair{key: term.TopologyKey, value: topologyValue}
if state.topologyToMatchedAffinityTerms[pair] <= 0 {
func satisfyExistingPodsAntiAffinity(state *preFilterState, nodeInfo *framework.NodeInfo) bool {
if len(state.topologyToMatchedExistingAntiAffinityTerms) > 0 {
// Iterate over topology pairs to get any of the pods being affected by
// the scheduled pod anti-affinity terms
for topologyKey, topologyValue := range nodeInfo.Node().Labels {
tp := topologyPair{key: topologyKey, value: topologyValue}
if state.topologyToMatchedExistingAntiAffinityTerms[tp] > 0 {
return false
}
} else {
return false
}
}
return true
}
// nodeMatchesAnyAntiAffinityTerm checks whether "nodeInfo" matches any of the pod's anti-affinity terms.
func nodeMatchesAnyAntiAffinityTerm(nodeInfo *framework.NodeInfo, state *preFilterState) bool {
node := nodeInfo.Node()
// Checks if the node satisfies the incoming pod's anti-affinity rules.
func satisfyPodAntiAffinity(state *preFilterState, nodeInfo *framework.NodeInfo) bool {
for _, term := range state.podInfo.RequiredAntiAffinityTerms {
if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
pair := topologyPair{key: term.TopologyKey, value: topologyValue}
if state.topologyToMatchedAntiAffinityTerms[pair] > 0 {
return true
if topologyValue, ok := nodeInfo.Node().Labels[term.TopologyKey]; ok {
tp := topologyPair{key: term.TopologyKey, value: topologyValue}
if state.topologyToMatchedAntiAffinityTerms[tp] > 0 {
return false
}
}
}
return false
return true
}
// getMatchingAntiAffinityTopologyPairsOfPod calculates the following for "existingPod" on given node:
// (1) Whether it has PodAntiAffinity
// (2) Whether ANY AffinityTerm matches the incoming pod
func getMatchingAntiAffinityTopologyPairsOfPod(newPod *v1.Pod, existingPod *framework.PodInfo, node *v1.Node) topologyToMatchedTermCount {
topologyMap := make(topologyToMatchedTermCount)
for _, term := range existingPod.RequiredAntiAffinityTerms {
if schedutil.PodMatchesTermsNamespaceAndSelector(newPod, term.Namespaces, term.Selector) {
if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
pair := topologyPair{key: term.TopologyKey, value: topologyValue}
topologyMap[pair]++
// Checks if the node satisfies the incoming pod's affinity rules.
func satisfyPodAffinity(state *preFilterState, nodeInfo *framework.NodeInfo) bool {
podsExist := true
for _, term := range state.podInfo.RequiredAffinityTerms {
if topologyValue, ok := nodeInfo.Node().Labels[term.TopologyKey]; ok {
tp := topologyPair{key: term.TopologyKey, value: topologyValue}
if state.topologyToMatchedAffinityTerms[tp] <= 0 {
podsExist = false
}
} else {
// All topology labels must exist on the node.
return false
}
}
return topologyMap
}
// satisfiesPodsAffinityAntiAffinity checks if scheduling the pod onto this node would break any term of this pod.
// This function returns two boolean flags. The first boolean flag indicates whether the pod matches affinity rules
// or not. The second boolean flag indicates if the pod matches anti-affinity rules.
func (pl *InterPodAffinity) satisfiesPodsAffinityAntiAffinity(state *preFilterState, nodeInfo *framework.NodeInfo) (bool, bool, error) {
// Check all affinity terms.
if !nodeMatchesAllAffinityTerms(nodeInfo, state) {
if !podsExist {
// This pod may be the first pod in a series that have affinity to themselves. In order
// to not leave such pods in pending state forever, we check that if no other pod
// in the cluster matches the namespace and selector of this pod and the pod matches
// its own terms, then we allow the pod to pass the affinity check.
// in the cluster matches the namespace and selector of this pod, the pod matches
// its own terms, and the node has all the requested topologies, then we allow the pod
// to pass the affinity check.
podInfo := state.podInfo
if len(state.topologyToMatchedAffinityTerms) != 0 || !podMatchesAllAffinityTerms(podInfo.Pod, podInfo.RequiredAffinityTerms) {
return false, false, nil
if len(state.topologyToMatchedAffinityTerms) == 0 && podMatchesAllAffinityTerms(podInfo.Pod, podInfo.RequiredAffinityTerms) {
return true
}
return false
}
// Check all anti-affinity terms.
if nodeMatchesAnyAntiAffinityTerm(nodeInfo, state) {
return true, false, nil
}
return true, true, nil
return true
}
// Filter invoked at the filter extension point.
@ -411,25 +399,17 @@ func (pl *InterPodAffinity) Filter(ctx context.Context, cycleState *framework.Cy
return framework.NewStatus(framework.Error, err.Error())
}
if s, err := pl.satisfiesExistingPodsAntiAffinity(pod, state, nodeInfo); !s || err != nil {
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
return framework.NewStatus(framework.Unschedulable, ErrReasonAffinityNotMatch, ErrReasonExistingAntiAffinityRulesNotMatch)
if !satisfyPodAffinity(state, nodeInfo) {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonAffinityNotMatch, ErrReasonAffinityRulesNotMatch)
}
// Now check if <pod> requirements will be satisfied on this node.
if satisfiesAffinity, satisfiesAntiAffinity, err := pl.satisfiesPodsAffinityAntiAffinity(state, nodeInfo); err != nil || !satisfiesAffinity || !satisfiesAntiAffinity {
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
if !satisfiesAffinity {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonAffinityNotMatch, ErrReasonAffinityRulesNotMatch)
}
if !satisfyPodAntiAffinity(state, nodeInfo) {
return framework.NewStatus(framework.Unschedulable, ErrReasonAffinityNotMatch, ErrReasonAntiAffinityRulesNotMatch)
}
if !satisfyExistingPodsAntiAffinity(state, nodeInfo) {
return framework.NewStatus(framework.Unschedulable, ErrReasonAffinityNotMatch, ErrReasonExistingAntiAffinityRulesNotMatch)
}
return nil
}

View File

@ -707,7 +707,7 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
wantStatus: framework.NewStatus(
framework.Unschedulable,
ErrReasonAffinityNotMatch,
ErrReasonExistingAntiAffinityRulesNotMatch,
ErrReasonAntiAffinityRulesNotMatch,
),
name: "PodAntiAffinity symmetry check b1: incoming pod and existing pod partially match each other on AffinityTerms",
},
@ -768,7 +768,7 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
wantStatus: framework.NewStatus(
framework.Unschedulable,
ErrReasonAffinityNotMatch,
ErrReasonExistingAntiAffinityRulesNotMatch,
ErrReasonAntiAffinityRulesNotMatch,
),
name: "PodAntiAffinity symmetry check b2: incoming pod and existing pod partially match each other on AffinityTerms",
},
@ -888,6 +888,53 @@ func TestRequiredAffinityMultipleNodes(t *testing.T) {
name: "The affinity rule is to schedule all of the pods of this collection to the same zone. The first pod of the collection " +
"should not be blocked from being scheduled onto any node, even there's no existing pod that matches the rule anywhere.",
},
{
pod: createPodWithAffinityTerms(defaultNamespace, "", map[string]string{"foo": "bar", "service": "securityscan"},
[]v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "foo",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"bar"},
},
},
},
TopologyKey: "zone",
},
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "service",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"securityscan"},
},
},
},
TopologyKey: "zone",
},
}, nil),
pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "nodeA"}, ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: map[string]string{"foo": "bar"}}}},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "nodeA", Labels: map[string]string{"zoneLabel": "az1", "hostname": "h1"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "nodeB", Labels: map[string]string{"zoneLabel": "az2", "hostname": "h2"}}},
},
wantStatuses: []*framework.Status{
framework.NewStatus(
framework.UnschedulableAndUnresolvable,
ErrReasonAffinityNotMatch,
ErrReasonAffinityRulesNotMatch,
),
framework.NewStatus(
framework.UnschedulableAndUnresolvable,
ErrReasonAffinityNotMatch,
ErrReasonAffinityRulesNotMatch,
),
},
name: "The first pod of the collection can only be scheduled on nodes labelled with the requested topology keys",
},
{
pod: createPodWithAffinityTerms(defaultNamespace, "", nil, nil,
[]v1.PodAffinityTerm{