From 7331ec7b0212458622c98d2ef47bff369109f64d Mon Sep 17 00:00:00 2001 From: Abdullah Gharaibeh Date: Fri, 13 Dec 2019 12:46:29 -0500 Subject: [PATCH] Move service affinity predicate logic to its plugin. --- .../algorithm/predicates/metadata.go | 83 +----- .../algorithm/predicates/metadata_test.go | 192 -------------- .../algorithm/predicates/predicates.go | 110 -------- .../algorithm/predicates/predicates_test.go | 166 ------------ pkg/scheduler/algorithm_factory.go | 11 +- .../apis/config/testing/compatibility_test.go | 19 ++ .../framework/plugins/default_registry.go | 1 + .../framework/plugins/serviceaffinity/BUILD | 5 +- .../serviceaffinity/service_affinity.go | 240 ++++++++++++++++-- .../serviceaffinity/service_affinity_test.go | 175 ++++++++++++- 10 files changed, 410 insertions(+), 592 deletions(-) diff --git a/pkg/scheduler/algorithm/predicates/metadata.go b/pkg/scheduler/algorithm/predicates/metadata.go index deb7f1b97b4..fdcc2359885 100644 --- a/pkg/scheduler/algorithm/predicates/metadata.go +++ b/pkg/scheduler/algorithm/predicates/metadata.go @@ -18,17 +18,15 @@ package predicates import ( "context" - "fmt" "math" "sync" - "k8s.io/klog" - v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/workqueue" + "k8s.io/klog" priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" @@ -129,56 +127,6 @@ type topologySpreadConstraint struct { selector labels.Selector } -type serviceAffinityMetadata struct { - matchingPodList []*v1.Pod - matchingPodServices []*v1.Service -} - -func (m *serviceAffinityMetadata) addPod(addedPod *v1.Pod, pod *v1.Pod, node *v1.Node) { - // If addedPod is in the same namespace as the pod, update the list - // of matching pods if applicable. 
- if m == nil || addedPod.Namespace != pod.Namespace { - return - } - - selector := CreateSelectorFromLabels(pod.Labels) - if selector.Matches(labels.Set(addedPod.Labels)) { - m.matchingPodList = append(m.matchingPodList, addedPod) - } -} - -func (m *serviceAffinityMetadata) removePod(deletedPod *v1.Pod, node *v1.Node) { - deletedPodFullName := schedutil.GetPodFullName(deletedPod) - - if m == nil || - len(m.matchingPodList) == 0 || - deletedPod.Namespace != m.matchingPodList[0].Namespace { - return - } - - for i, pod := range m.matchingPodList { - if schedutil.GetPodFullName(pod) == deletedPodFullName { - m.matchingPodList = append(m.matchingPodList[:i], m.matchingPodList[i+1:]...) - break - } - } -} - -func (m *serviceAffinityMetadata) clone() *serviceAffinityMetadata { - if m == nil { - return nil - } - - copy := serviceAffinityMetadata{} - - copy.matchingPodServices = append([]*v1.Service(nil), - m.matchingPodServices...) - copy.matchingPodList = append([]*v1.Pod(nil), - m.matchingPodList...) - - return &copy -} - // PodAffinityMetadata pre-computed state for inter-pod affinity predicate. type PodAffinityMetadata struct { // A map of topology pairs to the number of existing pods that has anti-affinity terms that match the "pod". @@ -283,10 +231,8 @@ func (m *PodAffinityMetadata) Clone() *PodAffinityMetadata { // NOTE: When new fields are added/removed or logic is changed, please make sure that // RemovePod, AddPod, and ShallowCopy functions are updated to work with the new changes. +// TODO(ahg-g): remove, not use anymore. type predicateMetadata struct { - pod *v1.Pod - - serviceAffinityMetadata *serviceAffinityMetadata } - // Ensure that predicateMetadata implements algorithm.Metadata. 
@@ -318,9 +264,7 @@ func (f *MetadataProducerFactory) GetPredicateMetadata(pod *v1.Pod, sharedLister return nil } - predicateMetadata := &predicateMetadata{ - pod: pod, - } + predicateMetadata := &predicateMetadata{} for predicateName, precomputeFunc := range predicateMetadataProducers { klog.V(10).Infof("Precompute: %v", predicateName) precomputeFunc(predicateMetadata) @@ -519,38 +463,19 @@ func (m *PodTopologySpreadMetadata) Clone() *PodTopologySpreadMetadata { // RemovePod changes predicateMetadata assuming that the given `deletedPod` is // deleted from the system. func (meta *predicateMetadata) RemovePod(deletedPod *v1.Pod, node *v1.Node) error { - deletedPodFullName := schedutil.GetPodFullName(deletedPod) - if deletedPodFullName == schedutil.GetPodFullName(meta.pod) { - return fmt.Errorf("deletedPod and meta.pod must not be the same") - } - meta.serviceAffinityMetadata.removePod(deletedPod, node) - return nil } // AddPod changes predicateMetadata assuming that the given `addedPod` is added to the // system. func (meta *predicateMetadata) AddPod(addedPod *v1.Pod, node *v1.Node) error { - addedPodFullName := schedutil.GetPodFullName(addedPod) - if addedPodFullName == schedutil.GetPodFullName(meta.pod) { - return fmt.Errorf("addedPod and meta.pod must not be the same") - } - if node == nil { - return fmt.Errorf("node not found") - } - - meta.serviceAffinityMetadata.addPod(addedPod, meta.pod, node) - return nil } // ShallowCopy copies a metadata struct into a new struct and creates a copy of // its maps and slices, but it does not copy the contents of pointer values. 
func (meta *predicateMetadata) ShallowCopy() Metadata { - newPredMeta := &predicateMetadata{ - pod: meta.pod, - } - newPredMeta.serviceAffinityMetadata = meta.serviceAffinityMetadata.clone() + newPredMeta := &predicateMetadata{} return (Metadata)(newPredMeta) } diff --git a/pkg/scheduler/algorithm/predicates/metadata_test.go b/pkg/scheduler/algorithm/predicates/metadata_test.go index adf73050740..5e026ef8224 100644 --- a/pkg/scheduler/algorithm/predicates/metadata_test.go +++ b/pkg/scheduler/algorithm/predicates/metadata_test.go @@ -17,181 +17,16 @@ limitations under the License. package predicates import ( - "fmt" "reflect" - "sort" "testing" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - fakelisters "k8s.io/kubernetes/pkg/scheduler/listers/fake" - schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot" st "k8s.io/kubernetes/pkg/scheduler/testing" ) -// sortablePods lets us to sort pods. -type sortablePods []*v1.Pod - -func (s sortablePods) Less(i, j int) bool { - return s[i].Namespace < s[j].Namespace || - (s[i].Namespace == s[j].Namespace && s[i].Name < s[j].Name) -} -func (s sortablePods) Len() int { return len(s) } -func (s sortablePods) Swap(i, j int) { s[i], s[j] = s[j], s[i] } - -var _ sort.Interface = &sortablePods{} - -// sortableServices allows us to sort services. -type sortableServices []*v1.Service - -func (s sortableServices) Less(i, j int) bool { - return s[i].Namespace < s[j].Namespace || - (s[i].Namespace == s[j].Namespace && s[i].Name < s[j].Name) -} -func (s sortableServices) Len() int { return len(s) } -func (s sortableServices) Swap(i, j int) { s[i], s[j] = s[j], s[i] } - -var _ sort.Interface = &sortableServices{} - -// predicateMetadataEquivalent returns true if the two metadata are equivalent. -// Note: this function does not compare podRequest. 
-func predicateMetadataEquivalent(meta1, meta2 *predicateMetadata) error { - if !reflect.DeepEqual(meta1.pod, meta2.pod) { - return fmt.Errorf("pods are not the same") - } - if meta1.serviceAffinityMetadata != nil { - sortablePods1 := sortablePods(meta1.serviceAffinityMetadata.matchingPodList) - sort.Sort(sortablePods1) - sortablePods2 := sortablePods(meta2.serviceAffinityMetadata.matchingPodList) - sort.Sort(sortablePods2) - if !reflect.DeepEqual(sortablePods1, sortablePods2) { - return fmt.Errorf("serviceAffinityMatchingPodLists are not euqal") - } - - sortableServices1 := sortableServices(meta1.serviceAffinityMetadata.matchingPodServices) - sort.Sort(sortableServices1) - sortableServices2 := sortableServices(meta2.serviceAffinityMetadata.matchingPodServices) - sort.Sort(sortableServices2) - if !reflect.DeepEqual(sortableServices1, sortableServices2) { - return fmt.Errorf("serviceAffinityMatchingPodServices are not euqal") - } - } - return nil -} - -func TestPredicateMetadata_AddRemovePod(t *testing.T) { - var label1 = map[string]string{ - "region": "r1", - "zone": "z11", - } - var label2 = map[string]string{ - "region": "r1", - "zone": "z12", - } - var label3 = map[string]string{ - "region": "r2", - "zone": "z21", - } - selector1 := map[string]string{"foo": "bar"} - - tests := []struct { - name string - pendingPod *v1.Pod - addedPod *v1.Pod - existingPods []*v1.Pod - nodes []*v1.Node - services []*v1.Service - }{ - { - name: "no anti-affinity or service affinity exist", - pendingPod: &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{Name: "pending", Labels: selector1}, - }, - existingPods: []*v1.Pod{ - {ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: selector1}, - Spec: v1.PodSpec{NodeName: "nodeA"}, - }, - {ObjectMeta: metav1.ObjectMeta{Name: "p2"}, - Spec: v1.PodSpec{NodeName: "nodeC"}, - }, - }, - addedPod: &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{Name: "addedPod", Labels: selector1}, - Spec: v1.PodSpec{NodeName: "nodeB"}, - }, - nodes: []*v1.Node{ - {ObjectMeta: 
metav1.ObjectMeta{Name: "nodeA", Labels: label1}}, - {ObjectMeta: metav1.ObjectMeta{Name: "nodeB", Labels: label2}}, - {ObjectMeta: metav1.ObjectMeta{Name: "nodeC", Labels: label3}}, - }, - }, - { - name: "metadata service-affinity data are updated correctly after adding and removing a pod", - pendingPod: &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{Name: "pending", Labels: selector1}, - }, - existingPods: []*v1.Pod{ - {ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: selector1}, - Spec: v1.PodSpec{NodeName: "nodeA"}, - }, - {ObjectMeta: metav1.ObjectMeta{Name: "p2"}, - Spec: v1.PodSpec{NodeName: "nodeC"}, - }, - }, - addedPod: &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{Name: "addedPod", Labels: selector1}, - Spec: v1.PodSpec{NodeName: "nodeB"}, - }, - services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector1}}}, - nodes: []*v1.Node{ - {ObjectMeta: metav1.ObjectMeta{Name: "nodeA", Labels: label1}}, - {ObjectMeta: metav1.ObjectMeta{Name: "nodeB", Labels: label2}}, - {ObjectMeta: metav1.ObjectMeta{Name: "nodeC", Labels: label3}}, - }, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - allPodLister := fakelisters.PodLister(append(test.existingPods, test.addedPod)) - // getMeta creates predicate meta data given the list of pods. - getMeta := func(pods []*v1.Pod) (*predicateMetadata, map[string]*schedulernodeinfo.NodeInfo) { - s := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(pods, test.nodes)) - _, precompute := NewServiceAffinityPredicate(s.NodeInfos(), s.Pods(), fakelisters.ServiceLister(test.services), nil) - RegisterPredicateMetadataProducer("ServiceAffinityMetaProducer", precompute) - factory := &MetadataProducerFactory{} - meta := factory.GetPredicateMetadata(test.pendingPod, s) - return meta.(*predicateMetadata), s.NodeInfoMap - } - - // allPodsMeta is meta data produced when all pods, including test.addedPod - // are given to the metadata producer. 
- allPodsMeta, _ := getMeta(allPodLister) - // existingPodsMeta1 is meta data produced for test.existingPods (without test.addedPod). - existingPodsMeta1, nodeInfoMap := getMeta(test.existingPods) - // Add test.addedPod to existingPodsMeta1 and make sure meta is equal to allPodsMeta - nodeInfo := nodeInfoMap[test.addedPod.Spec.NodeName] - if err := existingPodsMeta1.AddPod(test.addedPod, nodeInfo.Node()); err != nil { - t.Errorf("error adding pod to meta: %v", err) - } - if err := predicateMetadataEquivalent(allPodsMeta, existingPodsMeta1); err != nil { - t.Errorf("meta data are not equivalent: %v", err) - } - // Remove the added pod and from existingPodsMeta1 an make sure it is equal - // to meta generated for existing pods. - existingPodsMeta2, _ := getMeta(fakelisters.PodLister(test.existingPods)) - if err := existingPodsMeta1.RemovePod(test.addedPod, nodeInfo.Node()); err != nil { - t.Errorf("error removing pod from meta: %v", err) - } - if err := predicateMetadataEquivalent(existingPodsMeta1, existingPodsMeta2); err != nil { - t.Errorf("meta data are not equivalent: %v", err) - } - }) - } -} - func TestPodAffinityMetadata_Clone(t *testing.T) { source := &PodAffinityMetadata{ topologyToMatchedExistingAntiAffinityTerms: topologyToMatchedTermCount{ @@ -217,33 +52,6 @@ func TestPodAffinityMetadata_Clone(t *testing.T) { } } -// TestPredicateMetadata_ShallowCopy tests the ShallowCopy function. It is based -// on the idea that shallow-copy should produce an object that is deep-equal to the original -// object. 
-func TestPredicateMetadata_ShallowCopy(t *testing.T) { - source := predicateMetadata{ - pod: &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - Namespace: "testns", - }, - }, - serviceAffinityMetadata: &serviceAffinityMetadata{ - matchingPodList: []*v1.Pod{ - {ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}, - {ObjectMeta: metav1.ObjectMeta{Name: "pod2"}}, - }, - matchingPodServices: []*v1.Service{ - {ObjectMeta: metav1.ObjectMeta{Name: "service1"}}, - }, - }, - } - - if !reflect.DeepEqual(source.ShallowCopy().(*predicateMetadata), &source) { - t.Errorf("Copy is not equal to source!") - } -} - // TestGetTPMapMatchingIncomingAffinityAntiAffinity tests against method getTPMapMatchingIncomingAffinityAntiAffinity // on Anti Affinity cases func TestGetTPMapMatchingIncomingAffinityAntiAffinity(t *testing.T) { diff --git a/pkg/scheduler/algorithm/predicates/predicates.go b/pkg/scheduler/algorithm/predicates/predicates.go index ecd14da960e..cc0393a0af2 100644 --- a/pkg/scheduler/algorithm/predicates/predicates.go +++ b/pkg/scheduler/algorithm/predicates/predicates.go @@ -985,116 +985,6 @@ func (n *NodeLabelChecker) CheckNodeLabelPresence(pod *v1.Pod, meta Metadata, no return false, []PredicateFailureReason{ErrNodeLabelPresenceViolated}, nil } -// ServiceAffinity defines a struct used for creating service affinity predicates. -type ServiceAffinity struct { - nodeInfoLister schedulerlisters.NodeInfoLister - podLister schedulerlisters.PodLister - serviceLister corelisters.ServiceLister - labels []string -} - -// serviceAffinityMetadataProducer should be run once by the scheduler before looping through the Predicate. It is a helper function that -// only should be referenced by NewServiceAffinityPredicate. -func (s *ServiceAffinity) serviceAffinityMetadataProducer(pm *predicateMetadata) { - if pm.pod == nil { - klog.Errorf("Cannot precompute service affinity, a pod is required to calculate service affinity.") - return - } - // Store services which match the pod. 
- matchingPodServices, err := schedulerlisters.GetPodServices(s.serviceLister, pm.pod) - if err != nil { - klog.Errorf("Error precomputing service affinity: could not list services: %v", err) - } - selector := CreateSelectorFromLabels(pm.pod.Labels) - allMatches, err := s.podLister.List(selector) - if err != nil { - klog.Errorf("Error precomputing service affinity: could not list pods: %v", err) - } - - // consider only the pods that belong to the same namespace - matchingPodList := FilterPodsByNamespace(allMatches, pm.pod.Namespace) - pm.serviceAffinityMetadata = &serviceAffinityMetadata{ - matchingPodList: matchingPodList, - matchingPodServices: matchingPodServices, - } -} - -// NewServiceAffinityPredicate creates a ServiceAffinity. -func NewServiceAffinityPredicate(nodeInfoLister schedulerlisters.NodeInfoLister, podLister schedulerlisters.PodLister, serviceLister corelisters.ServiceLister, labels []string) (FitPredicate, predicateMetadataProducer) { - affinity := &ServiceAffinity{ - nodeInfoLister: nodeInfoLister, - podLister: podLister, - serviceLister: serviceLister, - labels: labels, - } - return affinity.checkServiceAffinity, affinity.serviceAffinityMetadataProducer -} - -// checkServiceAffinity is a predicate which matches nodes in such a way to force that -// ServiceAffinity.labels are homogeneous for pods that are scheduled to a node. -// (i.e. it returns true IFF this pod can be added to this node such that all other pods in -// the same service are running on nodes with the exact same ServiceAffinity.label values). -// -// For example: -// If the first pod of a service was scheduled to a node with label "region=foo", -// all the other subsequent pods belong to the same service will be schedule on -// nodes with the same "region=foo" label. 
-// -// Details: -// -// If (the svc affinity labels are not a subset of pod's label selectors ) -// The pod has all information necessary to check affinity, the pod's label selector is sufficient to calculate -// the match. -// Otherwise: -// Create an "implicit selector" which guarantees pods will land on nodes with similar values -// for the affinity labels. -// -// To do this, we "reverse engineer" a selector by introspecting existing pods running under the same service+namespace. -// These backfilled labels in the selector "L" are defined like so: -// - L is a label that the ServiceAffinity object needs as a matching constraint. -// - L is not defined in the pod itself already. -// - and SOME pod, from a service, in the same namespace, ALREADY scheduled onto a node, has a matching value. -// -// WARNING: This Predicate is NOT guaranteed to work if some of the predicateMetadata data isn't precomputed... -// For that reason it is not exported, i.e. it is highly coupled to the implementation of the FitPredicate construction. -func (s *ServiceAffinity) checkServiceAffinity(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) { - var services []*v1.Service - var pods []*v1.Pod - if pm, ok := meta.(*predicateMetadata); ok && pm.serviceAffinityMetadata != nil && (pm.serviceAffinityMetadata.matchingPodList != nil || pm.serviceAffinityMetadata.matchingPodServices != nil) { - services = pm.serviceAffinityMetadata.matchingPodServices - pods = pm.serviceAffinityMetadata.matchingPodList - } else { - // Make the predicate resilient in case metadata is missing. 
- pm = &predicateMetadata{pod: pod} - s.serviceAffinityMetadataProducer(pm) - pods, services = pm.serviceAffinityMetadata.matchingPodList, pm.serviceAffinityMetadata.matchingPodServices - } - filteredPods := nodeInfo.FilterOutPods(pods) - node := nodeInfo.Node() - if node == nil { - return false, nil, fmt.Errorf("node not found") - } - // check if the pod being scheduled has the affinity labels specified in its NodeSelector - affinityLabels := FindLabelsInSet(s.labels, labels.Set(pod.Spec.NodeSelector)) - // Step 1: If we don't have all constraints, introspect nodes to find the missing constraints. - if len(s.labels) > len(affinityLabels) { - if len(services) > 0 { - if len(filteredPods) > 0 { - nodeWithAffinityLabels, err := s.nodeInfoLister.Get(filteredPods[0].Spec.NodeName) - if err != nil { - return false, nil, err - } - AddUnsetLabelsToMap(affinityLabels, s.labels, labels.Set(nodeWithAffinityLabels.Node().Labels)) - } - } - } - // Step 2: Finally complete the affinity predicate based on whatever set of predicates we were able to find. - if CreateSelectorFromLabels(affinityLabels).Matches(labels.Set(node.Labels)) { - return true, nil, nil - } - return false, []PredicateFailureReason{ErrServiceAffinityViolated}, nil -} - // PodFitsHostPorts is a wrapper around PodFitsHostPortsPredicate. This is needed until // we are able to get rid of the FitPredicate function signature. // TODO(#85822): remove this function once predicate registration logic is deleted. diff --git a/pkg/scheduler/algorithm/predicates/predicates_test.go b/pkg/scheduler/algorithm/predicates/predicates_test.go index abcf5294560..74580dd1c45 100644 --- a/pkg/scheduler/algorithm/predicates/predicates_test.go +++ b/pkg/scheduler/algorithm/predicates/predicates_test.go @@ -17,7 +17,6 @@ limitations under the License. 
package predicates import ( - "fmt" "os" "reflect" "strconv" @@ -36,7 +35,6 @@ import ( "k8s.io/kubernetes/pkg/features" fakelisters "k8s.io/kubernetes/pkg/scheduler/listers/fake" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" - nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot" ) var ( @@ -1724,170 +1722,6 @@ func TestNodeLabelPresence(t *testing.T) { } } -func TestServiceAffinity(t *testing.T) { - selector := map[string]string{"foo": "bar"} - labels1 := map[string]string{ - "region": "r1", - "zone": "z11", - } - labels2 := map[string]string{ - "region": "r1", - "zone": "z12", - } - labels3 := map[string]string{ - "region": "r2", - "zone": "z21", - } - labels4 := map[string]string{ - "region": "r2", - "zone": "z22", - } - node1 := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labels1}} - node2 := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine2", Labels: labels2}} - node3 := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine3", Labels: labels3}} - node4 := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine4", Labels: labels4}} - node5 := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine5", Labels: labels4}} - tests := []struct { - pod *v1.Pod - pods []*v1.Pod - services []*v1.Service - node *v1.Node - labels []string - fits bool - name string - }{ - { - pod: new(v1.Pod), - node: &node1, - fits: true, - labels: []string{"region"}, - name: "nothing scheduled", - }, - { - pod: &v1.Pod{Spec: v1.PodSpec{NodeSelector: map[string]string{"region": "r1"}}}, - node: &node1, - fits: true, - labels: []string{"region"}, - name: "pod with region label match", - }, - { - pod: &v1.Pod{Spec: v1.PodSpec{NodeSelector: map[string]string{"region": "r2"}}}, - node: &node1, - fits: false, - labels: []string{"region"}, - name: "pod with region label mismatch", - }, - { - pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: selector}}, - pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: 
metav1.ObjectMeta{Labels: selector}}}, - node: &node1, - services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector}}}, - fits: true, - labels: []string{"region"}, - name: "service pod on same node", - }, - { - pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: selector}}, - pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: selector}}}, - node: &node1, - services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector}}}, - fits: true, - labels: []string{"region"}, - name: "service pod on different node, region match", - }, - { - pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: selector}}, - pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine3"}, ObjectMeta: metav1.ObjectMeta{Labels: selector}}}, - node: &node1, - services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector}}}, - fits: false, - labels: []string{"region"}, - name: "service pod on different node, region mismatch", - }, - { - pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: selector, Namespace: "ns1"}}, - pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine3"}, ObjectMeta: metav1.ObjectMeta{Labels: selector, Namespace: "ns1"}}}, - node: &node1, - services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector}, ObjectMeta: metav1.ObjectMeta{Namespace: "ns2"}}}, - fits: true, - labels: []string{"region"}, - name: "service in different namespace, region mismatch", - }, - { - pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: selector, Namespace: "ns1"}}, - pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine3"}, ObjectMeta: metav1.ObjectMeta{Labels: selector, Namespace: "ns2"}}}, - node: &node1, - services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector}, ObjectMeta: metav1.ObjectMeta{Namespace: "ns1"}}}, - fits: true, - labels: []string{"region"}, - name: "pod in different namespace, region mismatch", - }, - { - pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: selector, Namespace: "ns1"}}, - pods: []*v1.Pod{{Spec: 
v1.PodSpec{NodeName: "machine3"}, ObjectMeta: metav1.ObjectMeta{Labels: selector, Namespace: "ns1"}}}, - node: &node1, - services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector}, ObjectMeta: metav1.ObjectMeta{Namespace: "ns1"}}}, - fits: false, - labels: []string{"region"}, - name: "service and pod in same namespace, region mismatch", - }, - { - pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: selector}}, - pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine2"}, ObjectMeta: metav1.ObjectMeta{Labels: selector}}}, - node: &node1, - services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector}}}, - fits: false, - labels: []string{"region", "zone"}, - name: "service pod on different node, multiple labels, not all match", - }, - { - pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: selector}}, - pods: []*v1.Pod{{Spec: v1.PodSpec{NodeName: "machine5"}, ObjectMeta: metav1.ObjectMeta{Labels: selector}}}, - node: &node4, - services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector}}}, - fits: true, - labels: []string{"region", "zone"}, - name: "service pod on different node, multiple labels, all match", - }, - } - expectedFailureReasons := []PredicateFailureReason{ErrServiceAffinityViolated} - for _, test := range tests { - testIt := func(skipPrecompute bool) { - t.Run(fmt.Sprintf("%v/skipPrecompute/%v", test.name, skipPrecompute), func(t *testing.T) { - nodes := []*v1.Node{&node1, &node2, &node3, &node4, &node5} - s := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, nodes)) - - // Reimplementing the logic that the scheduler implements: Any time it makes a predicate, it registers any precomputations. - predicate, precompute := NewServiceAffinityPredicate(s.NodeInfos(), s.Pods(), fakelisters.ServiceLister(test.services), test.labels) - // Register a precomputation or Rewrite the precomputation to a no-op, depending on the state we want to test. 
- RegisterPredicateMetadataProducer("ServiceAffinityMetaProducer", func(pm *predicateMetadata) { - if !skipPrecompute { - precompute(pm) - } - }) - factory := &MetadataProducerFactory{} - if pmeta, ok := (factory.GetPredicateMetadata(test.pod, s)).(*predicateMetadata); ok { - fits, reasons, err := predicate(test.pod, pmeta, s.NodeInfoMap[test.node.Name]) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - if !fits && !reflect.DeepEqual(reasons, expectedFailureReasons) { - t.Errorf("unexpected failure reasons: %v, want: %v", reasons, expectedFailureReasons) - } - if fits != test.fits { - t.Errorf("expected: %v got %v", test.fits, fits) - } - } else { - t.Errorf("Error casting.") - } - }) - } - - testIt(false) // Confirm that the predicate works without precomputed data (resilience) - testIt(true) // Confirm that the predicate works with the precomputed data (better performance) - } -} - func newPodWithPort(hostPorts ...int) *v1.Pod { networkPorts := []v1.ContainerPort{} for _, port := range hostPorts { diff --git a/pkg/scheduler/algorithm_factory.go b/pkg/scheduler/algorithm_factory.go index c101897436a..abd5f837c2b 100644 --- a/pkg/scheduler/algorithm_factory.go +++ b/pkg/scheduler/algorithm_factory.go @@ -264,16 +264,7 @@ func RegisterCustomFitPredicate(policy schedulerapi.PredicatePolicy, pluginArgs pluginArgs.ServiceAffinityArgs.AffinityLabels = append(pluginArgs.ServiceAffinityArgs.AffinityLabels, policy.Argument.ServiceAffinity.Labels...) 
predicateFactory = func(args AlgorithmFactoryArgs) predicates.FitPredicate { - predicate, precomputationFunction := predicates.NewServiceAffinityPredicate( - args.SharedLister.NodeInfos(), - args.SharedLister.Pods(), - args.InformerFactory.Core().V1().Services().Lister(), - pluginArgs.ServiceAffinityArgs.AffinityLabels, - ) - - // Once we generate the predicate we should also Register the Precomputation - predicates.RegisterPredicateMetadataProducer(policyName, precomputationFunction) - return predicate + return nil } } else if policy.Argument.LabelsPresence != nil { // We use the CheckNodeLabelPresencePred predicate name for all kNodeLabel custom predicates. diff --git a/pkg/scheduler/apis/config/testing/compatibility_test.go b/pkg/scheduler/apis/config/testing/compatibility_test.go index 369a0f65621..a2100122042 100644 --- a/pkg/scheduler/apis/config/testing/compatibility_test.go +++ b/pkg/scheduler/apis/config/testing/compatibility_test.go @@ -117,6 +117,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { "PodFitsPorts", ), wantPlugins: map[string][]config.Plugin{ + "PreFilterPlugin": { + {Name: "ServiceAffinity"}, + }, "FilterPlugin": { {Name: "NodeUnschedulable"}, {Name: "NodeAffinity"}, @@ -160,6 +163,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { }`, wantPredicates: sets.NewString(), wantPlugins: map[string][]config.Plugin{ + "PreFilterPlugin": { + {Name: "ServiceAffinity"}, + }, "FilterPlugin": { {Name: "NodeUnschedulable"}, {Name: "NodeName"}, @@ -212,6 +218,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { }`, wantPredicates: sets.NewString(), wantPlugins: map[string][]config.Plugin{ + "PreFilterPlugin": { + {Name: "ServiceAffinity"}, + }, "FilterPlugin": { {Name: "NodeUnschedulable"}, {Name: "NodeName"}, @@ -274,6 +283,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { wantPredicates: sets.NewString(), wantPlugins: map[string][]config.Plugin{ "PreFilterPlugin": { + {Name: "ServiceAffinity"}, {Name: "InterPodAffinity"}, }, 
"FilterPlugin": { @@ -341,6 +351,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { wantPredicates: sets.NewString(), wantPlugins: map[string][]config.Plugin{ "PreFilterPlugin": { + {Name: "ServiceAffinity"}, {Name: "InterPodAffinity"}, }, "FilterPlugin": { @@ -419,6 +430,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { wantPredicates: sets.NewString(), wantPlugins: map[string][]config.Plugin{ "PreFilterPlugin": { + {Name: "ServiceAffinity"}, {Name: "InterPodAffinity"}, }, "FilterPlugin": { @@ -508,6 +520,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { wantPredicates: sets.NewString(), wantPlugins: map[string][]config.Plugin{ "PreFilterPlugin": { + {Name: "ServiceAffinity"}, {Name: "InterPodAffinity"}, }, "FilterPlugin": { @@ -598,6 +611,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { wantPredicates: sets.NewString(), wantPlugins: map[string][]config.Plugin{ "PreFilterPlugin": { + {Name: "ServiceAffinity"}, {Name: "InterPodAffinity"}, }, "FilterPlugin": { @@ -692,6 +706,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { wantPredicates: sets.NewString(), wantPlugins: map[string][]config.Plugin{ "PreFilterPlugin": { + {Name: "ServiceAffinity"}, {Name: "InterPodAffinity"}, }, "FilterPlugin": { @@ -798,6 +813,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { wantPredicates: sets.NewString(), wantPlugins: map[string][]config.Plugin{ "PreFilterPlugin": { + {Name: "ServiceAffinity"}, {Name: "InterPodAffinity"}, }, "FilterPlugin": { @@ -906,6 +922,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { wantPredicates: sets.NewString(), wantPlugins: map[string][]config.Plugin{ "PreFilterPlugin": { + {Name: "ServiceAffinity"}, {Name: "InterPodAffinity"}, }, "FilterPlugin": { @@ -1014,6 +1031,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { wantPredicates: sets.NewString(), wantPlugins: map[string][]config.Plugin{ "PreFilterPlugin": { + {Name: "ServiceAffinity"}, {Name: "InterPodAffinity"}, }, "FilterPlugin": { @@ -1127,6 
+1145,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { wantPredicates: sets.NewString(), wantPlugins: map[string][]config.Plugin{ "PreFilterPlugin": { + {Name: "ServiceAffinity"}, {Name: "InterPodAffinity"}, }, "FilterPlugin": { diff --git a/pkg/scheduler/framework/plugins/default_registry.go b/pkg/scheduler/framework/plugins/default_registry.go index 05eafbd8e1f..28dec853577 100644 --- a/pkg/scheduler/framework/plugins/default_registry.go +++ b/pkg/scheduler/framework/plugins/default_registry.go @@ -223,6 +223,7 @@ func NewDefaultConfigProducerRegistry() *ConfigProducerRegistry { func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) { plugins.Filter = appendToPluginSet(plugins.Filter, serviceaffinity.Name, nil) pluginConfig = append(pluginConfig, makePluginConfig(serviceaffinity.Name, args.ServiceAffinityArgs)) + plugins.PreFilter = appendToPluginSet(plugins.PreFilter, serviceaffinity.Name, nil) return }) diff --git a/pkg/scheduler/framework/plugins/serviceaffinity/BUILD b/pkg/scheduler/framework/plugins/serviceaffinity/BUILD index ab27433dffb..4c207077ef9 100644 --- a/pkg/scheduler/framework/plugins/serviceaffinity/BUILD +++ b/pkg/scheduler/framework/plugins/serviceaffinity/BUILD @@ -10,9 +10,13 @@ go_library( "//pkg/scheduler/algorithm/priorities:go_default_library", "//pkg/scheduler/framework/plugins/migration:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", + "//pkg/scheduler/listers:go_default_library", "//pkg/scheduler/nodeinfo:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", + "//vendor/k8s.io/klog:go_default_library", ], ) @@ -21,7 +25,6 @@ go_test( srcs = ["service_affinity_test.go"], embed = [":go_default_library"], deps = [ - 
"//pkg/scheduler/algorithm/predicates:go_default_library", "//pkg/scheduler/algorithm/priorities:go_default_library", "//pkg/scheduler/framework/plugins/migration:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", diff --git a/pkg/scheduler/framework/plugins/serviceaffinity/service_affinity.go b/pkg/scheduler/framework/plugins/serviceaffinity/service_affinity.go index 496451c6ca8..c6edfabfb33 100644 --- a/pkg/scheduler/framework/plugins/serviceaffinity/service_affinity.go +++ b/pkg/scheduler/framework/plugins/serviceaffinity/service_affinity.go @@ -20,17 +20,27 @@ import ( "context" "fmt" - v1 "k8s.io/api/core/v1" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/klog" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" + schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers" "k8s.io/kubernetes/pkg/scheduler/nodeinfo" ) -// Name is the name of the plugin used in the plugin registry and configurations. -const Name = "ServiceAffinity" +const ( + // Name is the name of the plugin used in the plugin registry and configurations. + Name = "ServiceAffinity" + + // preFilterStateKey is the key in CycleState to InterPodAffinity pre-computed data. + // Using the name of the plugin will likely help us avoid collisions with other plugins. + preFilterStateKey = "PreFilter" + Name +) // Args holds the args that are used to configure the plugin. type Args struct { @@ -42,39 +52,58 @@ type Args struct { AntiAffinityLabelsPreference []string `json:"antiAffinityLabelsPreference,omitempty"` } +// preFilterState computed at PreFilter and used at Filter. 
+type preFilterState struct { + matchingPodList []*v1.Pod + matchingPodServices []*v1.Service +} + +// Clone the prefilter state. +func (s *preFilterState) Clone() framework.StateData { + if s == nil { + return nil + } + + copy := preFilterState{} + copy.matchingPodServices = append([]*v1.Service(nil), + s.matchingPodServices...) + copy.matchingPodList = append([]*v1.Pod(nil), + s.matchingPodList...) + + return &copy +} + // New initializes a new plugin and returns it. func New(plArgs *runtime.Unknown, handle framework.FrameworkHandle) (framework.Plugin, error) { - args := &Args{} - if err := framework.DecodeInto(plArgs, args); err != nil { + args := Args{} + if err := framework.DecodeInto(plArgs, &args); err != nil { return nil, err } informerFactory := handle.SharedInformerFactory() - nodeInfoLister := handle.SnapshotSharedLister().NodeInfos() podLister := handle.SnapshotSharedLister().Pods() serviceLister := informerFactory.Core().V1().Services().Lister() - fitPredicate, predicateMetadataProducer := predicates.NewServiceAffinityPredicate(nodeInfoLister, podLister, serviceLister, args.AffinityLabels) - // Once we generate the predicate we should also Register the Precomputation - predicates.RegisterPredicateMetadataProducer(predicates.CheckServiceAffinityPred, predicateMetadataProducer) - priorityMapFunction, priorityReduceFunction := priorities.NewServiceAntiAffinityPriority(podLister, serviceLister, args.AntiAffinityLabelsPreference) + priorityMapFunction, priorityReduceFunction := priorities.NewServiceAntiAffinityPriority(podLister, serviceLister, args.AntiAffinityLabelsPreference) return &ServiceAffinity{ - handle: handle, - predicate: fitPredicate, + sharedLister: handle.SnapshotSharedLister(), + serviceLister: serviceLister, priorityMapFunction: priorityMapFunction, priorityReduceFunction: priorityReduceFunction, + args: args, }, nil } // ServiceAffinity is a plugin that checks service affinity. 
type ServiceAffinity struct { - handle framework.FrameworkHandle - predicate predicates.FitPredicate + args Args + sharedLister schedulerlisters.SharedLister + serviceLister corelisters.ServiceLister priorityMapFunction priorities.PriorityMapFunction priorityReduceFunction priorities.PriorityReduceFunction } +var _ framework.PreFilterPlugin = &ServiceAffinity{} var _ framework.FilterPlugin = &ServiceAffinity{} var _ framework.ScorePlugin = &ServiceAffinity{} @@ -83,19 +112,184 @@ func (pl *ServiceAffinity) Name() string { return Name } -// Filter invoked at the filter extension point. -func (pl *ServiceAffinity) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status { - meta, ok := migration.CovertStateRefToPredMeta(migration.PredicateMetadata(cycleState)) - if !ok { - return framework.NewStatus(framework.Error, "looking up Metadata") +func (pl *ServiceAffinity) createPreFilterState(pod *v1.Pod) (*preFilterState, error) { + if pod == nil { + return nil, fmt.Errorf("a pod is required to calculate service affinity preFilterState") } - _, reasons, err := pl.predicate(pod, meta, nodeInfo) - return migration.PredicateResultToFrameworkStatus(reasons, err) + // Store services which match the pod. + matchingPodServices, err := schedulerlisters.GetPodServices(pl.serviceLister, pod) + if err != nil { + return nil, fmt.Errorf("listing pod services: %v", err.Error()) + } + selector := predicates.CreateSelectorFromLabels(pod.Labels) + allMatches, err := pl.sharedLister.Pods().List(selector) + if err != nil { + return nil, fmt.Errorf("listing pods: %v", err.Error()) + } + + // consider only the pods that belong to the same namespace + matchingPodList := predicates.FilterPodsByNamespace(allMatches, pod.Namespace) + + return &preFilterState{ + matchingPodList: matchingPodList, + matchingPodServices: matchingPodServices, + }, nil +} + +// PreFilter invoked at the prefilter extension point. 
+func (pl *ServiceAffinity) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status { + s, err := pl.createPreFilterState(pod) + if err != nil { + return framework.NewStatus(framework.Error, fmt.Sprintf("could not create preFilterState: %v", err)) + + } + cycleState.Write(preFilterStateKey, s) + return nil +} + +// PreFilterExtensions returns prefilter extensions, pod add and remove. +func (pl *ServiceAffinity) PreFilterExtensions() framework.PreFilterExtensions { + return pl +} + +// AddPod from pre-computed data in cycleState. +func (pl *ServiceAffinity) AddPod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status { + s, err := getPreFilterState(cycleState) + if err != nil { + return framework.NewStatus(framework.Error, err.Error()) + } + + // If addedPod is in the same namespace as the pod, update the list + // of matching pods if applicable. + if s == nil || podToAdd.Namespace != podToSchedule.Namespace { + return nil + } + + selector := predicates.CreateSelectorFromLabels(podToSchedule.Labels) + if selector.Matches(labels.Set(podToAdd.Labels)) { + s.matchingPodList = append(s.matchingPodList, podToAdd) + } + + return nil +} + +// RemovePod from pre-computed data in cycleState. +func (pl *ServiceAffinity) RemovePod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podToRemove *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status { + s, err := getPreFilterState(cycleState) + if err != nil { + return framework.NewStatus(framework.Error, err.Error()) + } + + if s == nil || + len(s.matchingPodList) == 0 || + podToRemove.Namespace != s.matchingPodList[0].Namespace { + return nil + } + + for i, pod := range s.matchingPodList { + if pod.Name == podToRemove.Name && pod.Namespace == podToRemove.Namespace { + s.matchingPodList = append(s.matchingPodList[:i], s.matchingPodList[i+1:]...) 
+ break + } + } + + return nil +} + +func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error) { + c, err := cycleState.Read(preFilterStateKey) + if err != nil { + // The metadata wasn't pre-computed in prefilter. We ignore the error for now since + // Filter is able to handle that by computing it again. + klog.Error(fmt.Sprintf("reading %q from cycleState: %v", preFilterStateKey, err)) + return nil, nil + } + + if c == nil { + return nil, nil + } + + s, ok := c.(*preFilterState) + if !ok { + return nil, fmt.Errorf("%+v convert to interpodaffinity.state error", c) + } + return s, nil +} + +// Filter matches nodes in such a way to force that +// ServiceAffinity.labels are homogeneous for pods that are scheduled to a node. +// (i.e. it returns true IFF this pod can be added to this node such that all other pods in +// the same service are running on nodes with the exact same ServiceAffinity.label values). +// +// For example: +// If the first pod of a service was scheduled to a node with label "region=foo", +// all the other subsequent pods belong to the same service will be schedule on +// nodes with the same "region=foo" label. +// +// Details: +// +// If (the svc affinity labels are not a subset of pod's label selectors ) +// The pod has all information necessary to check affinity, the pod's label selector is sufficient to calculate +// the match. +// Otherwise: +// Create an "implicit selector" which guarantees pods will land on nodes with similar values +// for the affinity labels. +// +// To do this, we "reverse engineer" a selector by introspecting existing pods running under the same service+namespace. +// These backfilled labels in the selector "L" are defined like so: +// - L is a label that the ServiceAffinity object needs as a matching constraint. +// - L is not defined in the pod itself already. +// - and SOME pod, from a service, in the same namespace, ALREADY scheduled onto a node, has a matching value. 
+func (pl *ServiceAffinity) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status { + if len(pl.args.AffinityLabels) == 0 { + return nil + } + + node := nodeInfo.Node() + if node == nil { + return framework.NewStatus(framework.Error, "node not found") + } + + s, err := getPreFilterState(cycleState) + if err != nil { + return framework.NewStatus(framework.Error, err.Error()) + } + if s == nil { + // Make the filter resilient in case preFilterState is missing. + s, err = pl.createPreFilterState(pod) + if err != nil { + return framework.NewStatus(framework.Error, fmt.Sprintf("could not create preFilterState: %v", err)) + + } + } + + pods, services := s.matchingPodList, s.matchingPodServices + filteredPods := nodeInfo.FilterOutPods(pods) + // check if the pod being scheduled has the affinity labels specified in its NodeSelector + affinityLabels := predicates.FindLabelsInSet(pl.args.AffinityLabels, labels.Set(pod.Spec.NodeSelector)) + // Step 1: If we don't have all constraints, introspect nodes to find the missing constraints. + if len(pl.args.AffinityLabels) > len(affinityLabels) { + if len(services) > 0 { + if len(filteredPods) > 0 { + nodeWithAffinityLabels, err := pl.sharedLister.NodeInfos().Get(filteredPods[0].Spec.NodeName) + if err != nil { + return framework.NewStatus(framework.Error, "node not found") + } + predicates.AddUnsetLabelsToMap(affinityLabels, pl.args.AffinityLabels, labels.Set(nodeWithAffinityLabels.Node().Labels)) + } + } + } + // Step 2: Finally complete the affinity predicate based on whatever set of predicates we were able to find. + if predicates.CreateSelectorFromLabels(affinityLabels).Matches(labels.Set(node.Labels)) { + return nil + } + + return migration.PredicateResultToFrameworkStatus([]predicates.PredicateFailureReason{predicates.ErrServiceAffinityViolated}, nil) } // Score invoked at the Score extension point. 
func (pl *ServiceAffinity) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { - nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) + nodeInfo, err := pl.sharedLister.NodeInfos().Get(nodeName) if err != nil { return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err)) } @@ -107,7 +301,7 @@ func (pl *ServiceAffinity) Score(ctx context.Context, state *framework.CycleStat // NormalizeScore invoked after scoring all nodes. func (pl *ServiceAffinity) NormalizeScore(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status { // Note that priorityReduceFunction doesn't use priority metadata, hence passing nil here. - err := pl.priorityReduceFunction(pod, nil, pl.handle.SnapshotSharedLister(), scores) + err := pl.priorityReduceFunction(pod, nil, pl.sharedLister, scores) return migration.ErrorToFrameworkStatus(err) } diff --git a/pkg/scheduler/framework/plugins/serviceaffinity/service_affinity_test.go b/pkg/scheduler/framework/plugins/serviceaffinity/service_affinity_test.go index f7fa458f36c..51f51c4453f 100644 --- a/pkg/scheduler/framework/plugins/serviceaffinity/service_affinity_test.go +++ b/pkg/scheduler/framework/plugins/serviceaffinity/service_affinity_test.go @@ -25,7 +25,6 @@ import ( apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" @@ -165,18 +164,18 @@ func TestServiceAffinity(t *testing.T) { nodes := []*v1.Node{&node1, &node2, &node3, &node4, &node5} snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, nodes)) - predicate, precompute := 
predicates.NewServiceAffinityPredicate(snapshot.NodeInfos(), snapshot.Pods(), fakelisters.ServiceLister(test.services), test.labels) - predicates.RegisterPredicateMetadataProducer("ServiceAffinityMetaProducer", precompute) - p := &ServiceAffinity{ - predicate: predicate, + sharedLister: snapshot, + serviceLister: fakelisters.ServiceLister(test.services), + args: Args{ + AffinityLabels: test.labels, + }, } - factory := &predicates.MetadataProducerFactory{} - meta := factory.GetPredicateMetadata(test.pod, snapshot) state := framework.NewCycleState() - state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta}) - + if s := p.PreFilter(context.Background(), state, test.pod); !s.IsSuccess() { + t.Errorf("PreFilter failed: %v", s.Message()) + } status := p.Filter(context.Background(), state, test.pod, snapshot.NodeInfoMap[test.node.Name]) if status.Code() != test.res { t.Errorf("Status mismatch. got: %v, want: %v", status.Code(), test.res) @@ -391,12 +390,12 @@ func TestServiceAffinityScore(t *testing.T) { t.Run(test.name, func(t *testing.T) { nodes := makeLabeledNodeList(test.nodes) snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, nodes)) - fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot)) serviceLister := fakelisters.ServiceLister(test.services) priorityMapFunction, priorityReduceFunction := priorities.NewServiceAntiAffinityPriority(snapshot.Pods(), serviceLister, test.labels) p := &ServiceAffinity{ - handle: fh, + sharedLister: snapshot, + serviceLister: serviceLister, priorityMapFunction: priorityMapFunction, priorityReduceFunction: priorityReduceFunction, } @@ -434,6 +433,160 @@ func TestServiceAffinityScore(t *testing.T) { } } +func TestPreFilterStateAddRemovePod(t *testing.T) { + var label1 = map[string]string{ + "region": "r1", + "zone": "z11", + } + var label2 = map[string]string{ + "region": "r1", + "zone": "z12", + } + var label3 = map[string]string{ 
+ "region": "r2", + "zone": "z21", + } + selector1 := map[string]string{"foo": "bar"} + + tests := []struct { + name string + pendingPod *v1.Pod + addedPod *v1.Pod + existingPods []*v1.Pod + nodes []*v1.Node + services []*v1.Service + }{ + { + name: "no anti-affinity or service affinity exist", + pendingPod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "pending", Labels: selector1}, + }, + existingPods: []*v1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: selector1}, + Spec: v1.PodSpec{NodeName: "nodeA"}, + }, + {ObjectMeta: metav1.ObjectMeta{Name: "p2"}, + Spec: v1.PodSpec{NodeName: "nodeC"}, + }, + }, + addedPod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "addedPod", Labels: selector1}, + Spec: v1.PodSpec{NodeName: "nodeB"}, + }, + nodes: []*v1.Node{ + {ObjectMeta: metav1.ObjectMeta{Name: "nodeA", Labels: label1}}, + {ObjectMeta: metav1.ObjectMeta{Name: "nodeB", Labels: label2}}, + {ObjectMeta: metav1.ObjectMeta{Name: "nodeC", Labels: label3}}, + }, + }, + { + name: "metadata service-affinity data are updated correctly after adding and removing a pod", + pendingPod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "pending", Labels: selector1}, + }, + existingPods: []*v1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: selector1}, + Spec: v1.PodSpec{NodeName: "nodeA"}, + }, + {ObjectMeta: metav1.ObjectMeta{Name: "p2"}, + Spec: v1.PodSpec{NodeName: "nodeC"}, + }, + }, + addedPod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "addedPod", Labels: selector1}, + Spec: v1.PodSpec{NodeName: "nodeB"}, + }, + services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector1}}}, + nodes: []*v1.Node{ + {ObjectMeta: metav1.ObjectMeta{Name: "nodeA", Labels: label1}}, + {ObjectMeta: metav1.ObjectMeta{Name: "nodeB", Labels: label2}}, + {ObjectMeta: metav1.ObjectMeta{Name: "nodeC", Labels: label3}}, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // getMeta creates predicate meta data given the list of pods. 
+ getState := func(pods []*v1.Pod) (*ServiceAffinity, *framework.CycleState, *preFilterState, *nodeinfosnapshot.Snapshot) { + snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(pods, test.nodes)) + + p := &ServiceAffinity{ + sharedLister: snapshot, + serviceLister: fakelisters.ServiceLister(test.services), + } + cycleState := framework.NewCycleState() + preFilterStatus := p.PreFilter(context.Background(), cycleState, test.pendingPod) + if !preFilterStatus.IsSuccess() { + t.Errorf("prefilter failed with status: %v", preFilterStatus) + } + + plState, err := getPreFilterState(cycleState) + if err != nil { + t.Errorf("failed to get metadata from cycleState: %v", err) + } + + return p, cycleState, plState, snapshot + } + + sortState := func(plState *preFilterState) *preFilterState { + sort.SliceStable(plState.matchingPodList, func(i, j int) bool { + return plState.matchingPodList[i].Name < plState.matchingPodList[j].Name + }) + sort.SliceStable(plState.matchingPodServices, func(i, j int) bool { + return plState.matchingPodServices[i].Name < plState.matchingPodServices[j].Name + }) + return plState + } + + // allPodsState is the state produced when all pods, including test.addedPod are given to prefilter. + _, _, plStateAllPods, _ := getState(append(test.existingPods, test.addedPod)) + + // state is produced for test.existingPods (without test.addedPod). + ipa, state, plState, snapshot := getState(test.existingPods) + // clone the state so that we can compare it later when performing Remove. + plStateOriginal, _ := plState.Clone().(*preFilterState) + + // Add test.addedPod to state1 and verify it is equal to allPodsState. 
+ if err := ipa.AddPod(context.Background(), state, test.pendingPod, test.addedPod, snapshot.NodeInfoMap[test.addedPod.Spec.NodeName]); err != nil { + t.Errorf("error adding pod to preFilterState: %v", err) + } + + if !reflect.DeepEqual(sortState(plStateAllPods), sortState(plState)) { + t.Errorf("State is not equal, got: %v, want: %v", plState, plStateAllPods) + } + + // Remove the added pod and make sure the state is equal to the original state. + if err := ipa.RemovePod(context.Background(), state, test.pendingPod, test.addedPod, snapshot.NodeInfoMap[test.addedPod.Spec.NodeName]); err != nil { + t.Errorf("error removing pod from preFilterState: %v", err) + } + if !reflect.DeepEqual(sortState(plStateOriginal), sortState(plState)) { + t.Errorf("State is not equal, got: %v, want: %v", plState, plStateOriginal) + } + }) + } +} + +func TestPreFilterStateClone(t *testing.T) { + source := &preFilterState{ + matchingPodList: []*v1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "pod2"}}, + }, + matchingPodServices: []*v1.Service{ + {ObjectMeta: metav1.ObjectMeta{Name: "service1"}}, + }, + } + + clone := source.Clone() + if clone == source { + t.Errorf("Clone returned the exact same object!") + } + if !reflect.DeepEqual(clone, source) { + t.Errorf("Copy is not equal to source!") + } +} + func makeLabeledNodeList(nodeMap map[string]map[string]string) []*v1.Node { nodes := make([]*v1.Node, 0, len(nodeMap)) for nodeName, labels := range nodeMap {