Add support to recompute partial predicate metadata upon adding/removing pods

This commit is contained in:
Bobby (Babak) Salamat 2017-08-14 17:52:13 -07:00
parent acdf625e46
commit 264ca7d158
12 changed files with 592 additions and 81 deletions

View File

@ -17,17 +17,53 @@ limitations under the License.
package predicates package predicates
import ( import (
"fmt"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
schedutil "k8s.io/kubernetes/plugin/pkg/scheduler/util" schedutil "k8s.io/kubernetes/plugin/pkg/scheduler/util"
"sync"
) )
type PredicateMetadataFactory struct { type PredicateMetadataFactory struct {
podLister algorithm.PodLister podLister algorithm.PodLister
} }
// Note that predicateMetadata and matchingPodAntiAffinityTerm need to be declared in the same file
// due to the way declarations are processed in predicate declaration unit tests.
type matchingPodAntiAffinityTerm struct {
term *v1.PodAffinityTerm
node *v1.Node
}
// NOTE: When new fields are added/removed or logic is changed, please make sure
// that RemovePod and AddPod functions are updated to work with the new changes.
type predicateMetadata struct {
pod *v1.Pod
podBestEffort bool
podRequest *schedulercache.Resource
podPorts map[int]bool
//key is a pod full name with the anti-affinity rules.
matchingAntiAffinityTerms map[string][]matchingPodAntiAffinityTerm
serviceAffinityInUse bool
serviceAffinityMatchingPodList []*v1.Pod
serviceAffinityMatchingPodServices []*v1.Service
}
// PredicateMetadataProducer: Helper types/variables...
type PredicateMetadataProducer func(pm *predicateMetadata)
var predicateMetaProducerRegisterLock sync.Mutex
var predicateMetadataProducers map[string]PredicateMetadataProducer = make(map[string]PredicateMetadataProducer)
func RegisterPredicateMetadataProducer(predicateName string, precomp PredicateMetadataProducer) {
predicateMetaProducerRegisterLock.Lock()
defer predicateMetaProducerRegisterLock.Unlock()
predicateMetadataProducers[predicateName] = precomp
}
func NewPredicateMetadataFactory(podLister algorithm.PodLister) algorithm.MetadataProducer { func NewPredicateMetadataFactory(podLister algorithm.PodLister) algorithm.MetadataProducer {
factory := &PredicateMetadataFactory{ factory := &PredicateMetadataFactory{
podLister, podLister,
@ -52,9 +88,72 @@ func (pfactory *PredicateMetadataFactory) GetMetadata(pod *v1.Pod, nodeNameToInf
podPorts: schedutil.GetUsedPorts(pod), podPorts: schedutil.GetUsedPorts(pod),
matchingAntiAffinityTerms: matchingTerms, matchingAntiAffinityTerms: matchingTerms,
} }
for predicateName, precomputeFunc := range predicatePrecomputations { for predicateName, precomputeFunc := range predicateMetadataProducers {
glog.V(10).Infof("Precompute: %v", predicateName) glog.V(10).Infof("Precompute: %v", predicateName)
precomputeFunc(predicateMetadata) precomputeFunc(predicateMetadata)
} }
return predicateMetadata return predicateMetadata
} }
// RemovePod changes predicateMetadata assuming that the given `deletedPod` is
// deleted from the system.
func (meta *predicateMetadata) RemovePod(deletedPod *v1.Pod) error {
deletedPodFullName := schedutil.GetPodFullName(deletedPod)
if deletedPodFullName == schedutil.GetPodFullName(meta.pod) {
return fmt.Errorf("deletedPod and meta.pod must not be the same.")
}
// Delete any anti-affinity rule from the deletedPod.
delete(meta.matchingAntiAffinityTerms, deletedPodFullName)
// All pods in the serviceAffinityMatchingPodList are in the same namespace.
// So, if the namespace of the first one is not the same as the namespace of the
// deletedPod, we don't need to check the list, as deletedPod isn't in the list.
if meta.serviceAffinityInUse &&
len(meta.serviceAffinityMatchingPodList) > 0 &&
deletedPod.Namespace == meta.serviceAffinityMatchingPodList[0].Namespace {
for i, pod := range meta.serviceAffinityMatchingPodList {
if schedutil.GetPodFullName(pod) == deletedPodFullName {
meta.serviceAffinityMatchingPodList = append(
meta.serviceAffinityMatchingPodList[:i],
meta.serviceAffinityMatchingPodList[i+1:]...)
break
}
}
}
return nil
}
// AddPod changes predicateMetadata assuming that `newPod` is added to the
// system.
func (meta *predicateMetadata) AddPod(addedPod *v1.Pod, nodeInfo *schedulercache.NodeInfo) error {
addedPodFullName := schedutil.GetPodFullName(addedPod)
if addedPodFullName == schedutil.GetPodFullName(meta.pod) {
return fmt.Errorf("addedPod and meta.pod must not be the same.")
}
if nodeInfo.Node() == nil {
return fmt.Errorf("Invalid node in nodeInfo.")
}
// Add matching anti-affinity terms of the addedPod to the map.
podMatchingTerms, err := getMatchingAntiAffinityTermsOfExistingPod(meta.pod, addedPod, nodeInfo.Node())
if err != nil {
return err
}
if len(podMatchingTerms) > 0 {
existingTerms, found := meta.matchingAntiAffinityTerms[addedPodFullName]
if found {
meta.matchingAntiAffinityTerms[addedPodFullName] = append(existingTerms,
podMatchingTerms...)
} else {
meta.matchingAntiAffinityTerms[addedPodFullName] = podMatchingTerms
}
}
// If addedPod is in the same namespace as the meta.pod, update the list
// of matching pods if applicable.
if meta.serviceAffinityInUse && addedPod.Namespace == meta.pod.Namespace {
selector := CreateSelectorFromLabels(meta.pod.Labels)
if selector.Matches(labels.Set(addedPod.Labels)) {
meta.serviceAffinityMatchingPodList = append(meta.serviceAffinityMatchingPodList,
addedPod)
}
}
return nil
}

View File

@ -0,0 +1,357 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package predicates
import (
"fmt"
"reflect"
"sort"
"testing"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing"
)
// sortableAntiAffinityTerms lets us to sort anti-affinity terms.
type sortableAntiAffinityTerms []matchingPodAntiAffinityTerm
// Less establishes some ordering between two matchingPodAntiAffinityTerms for
// sorting.
func (s sortableAntiAffinityTerms) Less(i, j int) bool {
t1, t2 := s[i], s[j]
if t1.node.Name != t2.node.Name {
return t1.node.Name < t2.node.Name
}
if len(t1.term.Namespaces) != len(t2.term.Namespaces) {
return len(t1.term.Namespaces) < len(t2.term.Namespaces)
}
if t1.term.TopologyKey != t2.term.TopologyKey {
return t1.term.TopologyKey < t2.term.TopologyKey
}
if len(t1.term.LabelSelector.MatchLabels) != len(t2.term.LabelSelector.MatchLabels) {
return len(t1.term.LabelSelector.MatchLabels) < len(t2.term.LabelSelector.MatchLabels)
}
return false
}
func (s sortableAntiAffinityTerms) Len() int { return len(s) }
func (s sortableAntiAffinityTerms) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
var _ = sort.Interface(sortableAntiAffinityTerms{})
func sortAntiAffinityTerms(terms map[string][]matchingPodAntiAffinityTerm) {
for k, v := range terms {
sortableTerms := sortableAntiAffinityTerms(v)
sort.Sort(sortableTerms)
terms[k] = sortableTerms
}
}
// sortablePods lets us to sort pods.
type sortablePods []*v1.Pod
func (s sortablePods) Less(i, j int) bool {
return s[i].Namespace < s[j].Namespace ||
(s[i].Namespace == s[j].Namespace && s[i].Name < s[j].Name)
}
func (s sortablePods) Len() int { return len(s) }
func (s sortablePods) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
var _ = sort.Interface(&sortablePods{})
// sortableServices allows us to sort services.
type sortableServices []*v1.Service
func (s sortableServices) Less(i, j int) bool {
return s[i].Namespace < s[j].Namespace ||
(s[i].Namespace == s[j].Namespace && s[i].Name < s[j].Name)
}
func (s sortableServices) Len() int { return len(s) }
func (s sortableServices) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
var _ = sort.Interface(&sortableServices{})
// predicateMetadataEquivalent returns true if the two metadata are equivalent.
// Note: this function does not compare podRequest.
func predicateMetadataEquivalent(meta1, meta2 *predicateMetadata) error {
if !reflect.DeepEqual(meta1.pod, meta2.pod) {
return fmt.Errorf("pods are not the same.")
}
if meta1.podBestEffort != meta2.podBestEffort {
return fmt.Errorf("podBestEfforts are not equal.")
}
if meta1.serviceAffinityInUse != meta1.serviceAffinityInUse {
return fmt.Errorf("serviceAffinityInUses are not equal.")
}
if len(meta1.podPorts) != len(meta2.podPorts) {
return fmt.Errorf("podPorts are not equal.")
}
for !reflect.DeepEqual(meta1.podPorts, meta2.podPorts) {
return fmt.Errorf("podPorts are not equal.")
}
sortAntiAffinityTerms(meta1.matchingAntiAffinityTerms)
sortAntiAffinityTerms(meta2.matchingAntiAffinityTerms)
if !reflect.DeepEqual(meta1.matchingAntiAffinityTerms, meta2.matchingAntiAffinityTerms) {
return fmt.Errorf("matchingAntiAffinityTerms are not euqal.")
}
if meta1.serviceAffinityInUse {
sortablePods1 := sortablePods(meta1.serviceAffinityMatchingPodList)
sort.Sort(sortablePods1)
sortablePods2 := sortablePods(meta2.serviceAffinityMatchingPodList)
sort.Sort(sortablePods2)
if !reflect.DeepEqual(sortablePods1, sortablePods2) {
return fmt.Errorf("serviceAffinityMatchingPodLists are not euqal.")
}
sortableServices1 := sortableServices(meta1.serviceAffinityMatchingPodServices)
sort.Sort(sortableServices1)
sortableServices2 := sortableServices(meta2.serviceAffinityMatchingPodServices)
sort.Sort(sortableServices2)
if !reflect.DeepEqual(sortableServices1, sortableServices2) {
return fmt.Errorf("serviceAffinityMatchingPodServices are not euqal.")
}
}
return nil
}
func TestPredicateMetadata_AddRemovePod(t *testing.T) {
var label1 = map[string]string{
"region": "r1",
"zone": "z11",
}
var label2 = map[string]string{
"region": "r1",
"zone": "z12",
}
var label3 = map[string]string{
"region": "r2",
"zone": "z21",
}
selector1 := map[string]string{"foo": "bar"}
antiAffinityFooBar := &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "foo",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"bar"},
},
},
},
TopologyKey: "region",
},
},
}
antiAffinityComplex := &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "foo",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"bar", "buzz"},
},
},
},
TopologyKey: "region",
},
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "service",
Operator: metav1.LabelSelectorOpNotIn,
Values: []string{"bar", "security", "test"},
},
},
},
TopologyKey: "zone",
},
},
}
tests := []struct {
description string
pendingPod *v1.Pod
addedPod *v1.Pod
existingPods []*v1.Pod
nodes []*v1.Node
services []*v1.Service
}{
{
description: "no anti-affinity or service affinity exist",
pendingPod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "pending", Labels: selector1},
},
existingPods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: selector1},
Spec: v1.PodSpec{NodeName: "nodeA"},
},
{ObjectMeta: metav1.ObjectMeta{Name: "p2"},
Spec: v1.PodSpec{NodeName: "nodeC"},
},
},
addedPod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "addedPod", Labels: selector1},
Spec: v1.PodSpec{NodeName: "nodeB"},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "nodeA", Labels: label1}},
{ObjectMeta: metav1.ObjectMeta{Name: "nodeB", Labels: label2}},
{ObjectMeta: metav1.ObjectMeta{Name: "nodeC", Labels: label3}},
},
},
{
description: "metadata anti-affinity terms are updated correctly after adding and removing a pod",
pendingPod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "pending", Labels: selector1},
},
existingPods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: selector1},
Spec: v1.PodSpec{NodeName: "nodeA"},
},
{ObjectMeta: metav1.ObjectMeta{Name: "p2"},
Spec: v1.PodSpec{
NodeName: "nodeC",
Affinity: &v1.Affinity{
PodAntiAffinity: antiAffinityFooBar,
},
},
},
},
addedPod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "addedPod", Labels: selector1},
Spec: v1.PodSpec{
NodeName: "nodeB",
Affinity: &v1.Affinity{
PodAntiAffinity: antiAffinityFooBar,
},
},
},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "nodeA", Labels: label1}},
{ObjectMeta: metav1.ObjectMeta{Name: "nodeB", Labels: label2}},
{ObjectMeta: metav1.ObjectMeta{Name: "nodeC", Labels: label3}},
},
},
{
description: "metadata service-affinity data are updated correctly after adding and removing a pod",
pendingPod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "pending", Labels: selector1},
},
existingPods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: selector1},
Spec: v1.PodSpec{NodeName: "nodeA"},
},
{ObjectMeta: metav1.ObjectMeta{Name: "p2"},
Spec: v1.PodSpec{NodeName: "nodeC"},
},
},
addedPod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "addedPod", Labels: selector1},
Spec: v1.PodSpec{NodeName: "nodeB"},
},
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector1}}},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "nodeA", Labels: label1}},
{ObjectMeta: metav1.ObjectMeta{Name: "nodeB", Labels: label2}},
{ObjectMeta: metav1.ObjectMeta{Name: "nodeC", Labels: label3}},
},
},
{
description: "metadata anti-affinity terms and service affinity data are updated correctly after adding and removing a pod",
pendingPod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "pending", Labels: selector1},
},
existingPods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: selector1},
Spec: v1.PodSpec{NodeName: "nodeA"},
},
{ObjectMeta: metav1.ObjectMeta{Name: "p2"},
Spec: v1.PodSpec{
NodeName: "nodeC",
Affinity: &v1.Affinity{
PodAntiAffinity: antiAffinityFooBar,
},
},
},
},
addedPod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "addedPod", Labels: selector1},
Spec: v1.PodSpec{
NodeName: "nodeA",
Affinity: &v1.Affinity{
PodAntiAffinity: antiAffinityComplex,
},
},
},
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector1}}},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "nodeA", Labels: label1}},
{ObjectMeta: metav1.ObjectMeta{Name: "nodeB", Labels: label2}},
{ObjectMeta: metav1.ObjectMeta{Name: "nodeC", Labels: label3}},
},
},
}
for _, test := range tests {
allPodLister := schedulertesting.FakePodLister(append(test.existingPods, test.addedPod))
// getMeta creates predicate meta data given the list of pods.
getMeta := func(lister schedulertesting.FakePodLister) (*predicateMetadata, map[string]*schedulercache.NodeInfo) {
nodeInfoMap := schedulercache.CreateNodeNameToInfoMap(lister, test.nodes)
// nodeList is a list of non-pointer nodes to feed to FakeNodeListInfo.
nodeList := []v1.Node{}
for _, n := range test.nodes {
nodeList = append(nodeList, *n)
}
_, precompute := NewServiceAffinityPredicate(lister, schedulertesting.FakeServiceLister(test.services), FakeNodeListInfo(nodeList), nil)
RegisterPredicateMetadataProducer("ServiceAffinityMetaProducer", precompute)
pmf := PredicateMetadataFactory{lister}
meta := pmf.GetMetadata(test.pendingPod, nodeInfoMap)
return meta.(*predicateMetadata), nodeInfoMap
}
// allPodsMeta is meta data produced when all pods, including test.addedPod
// are given to the metadata producer.
allPodsMeta, _ := getMeta(allPodLister)
// existingPodsMeta1 is meta data produced for test.existingPods (without test.addedPod).
existingPodsMeta1, nodeInfoMap := getMeta(schedulertesting.FakePodLister(test.existingPods))
// Add test.addedPod to existingPodsMeta1 and make sure meta is equal to allPodsMeta
nodeInfo := nodeInfoMap[test.addedPod.Spec.NodeName]
if err := existingPodsMeta1.AddPod(test.addedPod, nodeInfo); err != nil {
t.Errorf("test [%v]: error adding pod to meta: %v", test.description, err)
}
if err := predicateMetadataEquivalent(allPodsMeta, existingPodsMeta1); err != nil {
t.Errorf("test [%v]: meta data are not equivalent: %v", test.description, err)
}
// Remove the added pod and from existingPodsMeta1 an make sure it is equal
// to meta generated for existing pods.
existingPodsMeta2, _ := getMeta(schedulertesting.FakePodLister(test.existingPods))
if err := existingPodsMeta1.RemovePod(test.addedPod); err != nil {
t.Errorf("test [%v]: error removing pod from meta: %v", test.description, err)
}
if err := predicateMetadataEquivalent(existingPodsMeta1, existingPodsMeta2); err != nil {
t.Errorf("test [%v]: meta data are not equivalent: %v", test.description, err)
}
}
}

View File

@ -44,18 +44,6 @@ import (
"k8s.io/metrics/pkg/client/clientset_generated/clientset" "k8s.io/metrics/pkg/client/clientset_generated/clientset"
) )
// PredicateMetadataModifier: Helper types/variables...
type PredicateMetadataModifier func(pm *predicateMetadata)
var predicatePrecomputeRegisterLock sync.Mutex
var predicatePrecomputations map[string]PredicateMetadataModifier = make(map[string]PredicateMetadataModifier)
func RegisterPredicatePrecomputation(predicateName string, precomp PredicateMetadataModifier) {
predicatePrecomputeRegisterLock.Lock()
defer predicatePrecomputeRegisterLock.Unlock()
predicatePrecomputations[predicateName] = precomp
}
// NodeInfo: Other types for predicate functions... // NodeInfo: Other types for predicate functions...
type NodeInfo interface { type NodeInfo interface {
GetNodeInfo(nodeID string) (*v1.Node, error) GetNodeInfo(nodeID string) (*v1.Node, error)
@ -107,23 +95,6 @@ func (c *CachedNodeInfo) GetNodeInfo(id string) (*v1.Node, error) {
return node, nil return node, nil
} }
// Note that predicateMetadata and matchingPodAntiAffinityTerm need to be declared in the same file
// due to the way declarations are processed in predicate declaration unit tests.
type matchingPodAntiAffinityTerm struct {
term *v1.PodAffinityTerm
node *v1.Node
}
type predicateMetadata struct {
pod *v1.Pod
podBestEffort bool
podRequest *schedulercache.Resource
podPorts map[int]bool
matchingAntiAffinityTerms []matchingPodAntiAffinityTerm
serviceAffinityMatchingPodList []*v1.Pod
serviceAffinityMatchingPodServices []*v1.Service
}
func isVolumeConflict(volume v1.Volume, pod *v1.Pod) bool { func isVolumeConflict(volume v1.Volume, pod *v1.Pod) bool {
// fast path if there is no conflict checking targets. // fast path if there is no conflict checking targets.
if volume.GCEPersistentDisk == nil && volume.AWSElasticBlockStore == nil && volume.RBD == nil && volume.ISCSI == nil { if volume.GCEPersistentDisk == nil && volume.AWSElasticBlockStore == nil && volume.RBD == nil && volume.ISCSI == nil {
@ -758,43 +729,42 @@ type ServiceAffinity struct {
labels []string labels []string
} }
// serviceAffinityPrecomputation should be run once by the scheduler before looping through the Predicate. It is a helper function that // serviceAffinityMetadataProducer should be run once by the scheduler before looping through the Predicate. It is a helper function that
// only should be referenced by NewServiceAffinityPredicate. // only should be referenced by NewServiceAffinityPredicate.
func (s *ServiceAffinity) serviceAffinityPrecomputation(pm *predicateMetadata) { func (s *ServiceAffinity) serviceAffinityMetadataProducer(pm *predicateMetadata) {
if pm.pod == nil { if pm.pod == nil {
glog.Errorf("Cannot precompute service affinity, a pod is required to calculate service affinity.") glog.Errorf("Cannot precompute service affinity, a pod is required to calculate service affinity.")
return return
} }
pm.serviceAffinityInUse = true
var errSvc, errList error var errSvc, errList error
// Store services which match the pod. // Store services which match the pod.
pm.serviceAffinityMatchingPodServices, errSvc = s.serviceLister.GetPodServices(pm.pod) pm.serviceAffinityMatchingPodServices, errSvc = s.serviceLister.GetPodServices(pm.pod)
selector := CreateSelectorFromLabels(pm.pod.Labels) selector := CreateSelectorFromLabels(pm.pod.Labels)
// consider only the pods that belong to the same namespace
allMatches, errList := s.podLister.List(selector) allMatches, errList := s.podLister.List(selector)
// In the future maybe we will return them as part of the function. // In the future maybe we will return them as part of the function.
if errSvc != nil || errList != nil { if errSvc != nil || errList != nil {
glog.Errorf("Some Error were found while precomputing svc affinity: \nservices:%v , \npods:%v", errSvc, errList) glog.Errorf("Some Error were found while precomputing svc affinity: \nservices:%v , \npods:%v", errSvc, errList)
} }
// consider only the pods that belong to the same namespace
pm.serviceAffinityMatchingPodList = FilterPodsByNamespace(allMatches, pm.pod.Namespace) pm.serviceAffinityMatchingPodList = FilterPodsByNamespace(allMatches, pm.pod.Namespace)
} }
func NewServiceAffinityPredicate(podLister algorithm.PodLister, serviceLister algorithm.ServiceLister, nodeInfo NodeInfo, labels []string) (algorithm.FitPredicate, PredicateMetadataModifier) { func NewServiceAffinityPredicate(podLister algorithm.PodLister, serviceLister algorithm.ServiceLister, nodeInfo NodeInfo, labels []string) (algorithm.FitPredicate, PredicateMetadataProducer) {
affinity := &ServiceAffinity{ affinity := &ServiceAffinity{
podLister: podLister, podLister: podLister,
serviceLister: serviceLister, serviceLister: serviceLister,
nodeInfo: nodeInfo, nodeInfo: nodeInfo,
labels: labels, labels: labels,
} }
return affinity.checkServiceAffinity, affinity.serviceAffinityPrecomputation return affinity.checkServiceAffinity, affinity.serviceAffinityMetadataProducer
} }
// checkServiceAffinity is a predicate which matches nodes in such a way to force that // checkServiceAffinity is a predicate which matches nodes in such a way to force that
// ServiceAffinity.labels are homogenous for pods that are scheduled to a node. // ServiceAffinity.labels are homogenous for pods that are scheduled to a node.
// (i.e. it returns true IFF this pod can be added to this node such that all other pods in // (i.e. it returns true IFF this pod can be added to this node such that all other pods in
// the same service are running on nodes with // the same service are running on nodes with the exact same ServiceAffinity.label values).
// the exact same ServiceAffinity.label values).
// //
// For example: // For example:
// If the first pod of a service was scheduled to a node with label "region=foo", // If the first pod of a service was scheduled to a node with label "region=foo",
@ -827,7 +797,7 @@ func (s *ServiceAffinity) checkServiceAffinity(pod *v1.Pod, meta interface{}, no
} else { } else {
// Make the predicate resilient in case metadata is missing. // Make the predicate resilient in case metadata is missing.
pm = &predicateMetadata{pod: pod} pm = &predicateMetadata{pod: pod}
s.serviceAffinityPrecomputation(pm) s.serviceAffinityMetadataProducer(pm)
pods, services = pm.serviceAffinityMatchingPodList, pm.serviceAffinityMatchingPodServices pods, services = pm.serviceAffinityMatchingPodList, pm.serviceAffinityMatchingPodServices
} }
node := nodeInfo.Node() node := nodeInfo.Node()
@ -984,7 +954,7 @@ func (c *PodAffinityChecker) InterPodAffinityMatches(pod *v1.Pod, meta interface
if node == nil { if node == nil {
return false, nil, fmt.Errorf("node not found") return false, nil, fmt.Errorf("node not found")
} }
if !c.satisfiesExistingPodsAntiAffinity(pod, meta, node) { if !c.satisfiesExistingPodsAntiAffinity(pod, meta, nodeInfo) {
return false, []algorithm.PredicateFailureReason{ErrPodAffinityNotMatch}, nil return false, []algorithm.PredicateFailureReason{ErrPodAffinityNotMatch}, nil
} }
@ -993,7 +963,7 @@ func (c *PodAffinityChecker) InterPodAffinityMatches(pod *v1.Pod, meta interface
if affinity == nil || (affinity.PodAffinity == nil && affinity.PodAntiAffinity == nil) { if affinity == nil || (affinity.PodAffinity == nil && affinity.PodAntiAffinity == nil) {
return true, nil, nil return true, nil, nil
} }
if !c.satisfiesPodsAffinityAntiAffinity(pod, node, affinity) { if !c.satisfiesPodsAffinityAntiAffinity(pod, nodeInfo, affinity) {
return false, []algorithm.PredicateFailureReason{ErrPodAffinityNotMatch}, nil return false, []algorithm.PredicateFailureReason{ErrPodAffinityNotMatch}, nil
} }
@ -1062,19 +1032,21 @@ func getPodAntiAffinityTerms(podAntiAffinity *v1.PodAntiAffinity) (terms []v1.Po
return terms return terms
} }
func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*schedulercache.NodeInfo) ([]matchingPodAntiAffinityTerm, error) { func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*schedulercache.NodeInfo) (map[string][]matchingPodAntiAffinityTerm, error) {
allNodeNames := make([]string, 0, len(nodeInfoMap)) allNodeNames := make([]string, 0, len(nodeInfoMap))
for name := range nodeInfoMap { for name := range nodeInfoMap {
allNodeNames = append(allNodeNames, name) allNodeNames = append(allNodeNames, name)
} }
var lock sync.Mutex var lock sync.Mutex
var result []matchingPodAntiAffinityTerm
var firstError error var firstError error
appendResult := func(toAppend []matchingPodAntiAffinityTerm) { result := make(map[string][]matchingPodAntiAffinityTerm)
appendResult := func(toAppend map[string][]matchingPodAntiAffinityTerm) {
lock.Lock() lock.Lock()
defer lock.Unlock() defer lock.Unlock()
result = append(result, toAppend...) for uid, terms := range toAppend {
result[uid] = append(result[uid], terms...)
}
} }
catchError := func(err error) { catchError := func(err error) {
lock.Lock() lock.Lock()
@ -1091,7 +1063,7 @@ func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*scheduler
catchError(fmt.Errorf("node not found")) catchError(fmt.Errorf("node not found"))
return return
} }
var nodeResult []matchingPodAntiAffinityTerm nodeResult := make(map[string][]matchingPodAntiAffinityTerm)
for _, existingPod := range nodeInfo.PodsWithAffinity() { for _, existingPod := range nodeInfo.PodsWithAffinity() {
affinity := existingPod.Spec.Affinity affinity := existingPod.Spec.Affinity
if affinity == nil { if affinity == nil {
@ -1105,7 +1077,10 @@ func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*scheduler
return return
} }
if priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) { if priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
nodeResult = append(nodeResult, matchingPodAntiAffinityTerm{term: &term, node: node}) existingPodFullName := schedutil.GetPodFullName(existingPod)
nodeResult[existingPodFullName] = append(
nodeResult[existingPodFullName],
matchingPodAntiAffinityTerm{term: &term, node: node})
} }
} }
} }
@ -1117,8 +1092,26 @@ func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*scheduler
return result, firstError return result, firstError
} }
func (c *PodAffinityChecker) getMatchingAntiAffinityTerms(pod *v1.Pod, allPods []*v1.Pod) ([]matchingPodAntiAffinityTerm, error) { func getMatchingAntiAffinityTermsOfExistingPod(newPod *v1.Pod, existingPod *v1.Pod, node *v1.Node) ([]matchingPodAntiAffinityTerm, error) {
var result []matchingPodAntiAffinityTerm var result []matchingPodAntiAffinityTerm
affinity := existingPod.Spec.Affinity
if affinity != nil && affinity.PodAntiAffinity != nil {
for _, term := range getPodAntiAffinityTerms(affinity.PodAntiAffinity) {
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(existingPod, &term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
return nil, err
}
if priorityutil.PodMatchesTermsNamespaceAndSelector(newPod, namespaces, selector) {
result = append(result, matchingPodAntiAffinityTerm{term: &term, node: node})
}
}
}
return result, nil
}
func (c *PodAffinityChecker) getMatchingAntiAffinityTerms(pod *v1.Pod, allPods []*v1.Pod) (map[string][]matchingPodAntiAffinityTerm, error) {
result := make(map[string][]matchingPodAntiAffinityTerm)
for _, existingPod := range allPods { for _, existingPod := range allPods {
affinity := existingPod.Spec.Affinity affinity := existingPod.Spec.Affinity
if affinity != nil && affinity.PodAntiAffinity != nil { if affinity != nil && affinity.PodAntiAffinity != nil {
@ -1126,15 +1119,13 @@ func (c *PodAffinityChecker) getMatchingAntiAffinityTerms(pod *v1.Pod, allPods [
if err != nil { if err != nil {
return nil, err return nil, err
} }
for _, term := range getPodAntiAffinityTerms(affinity.PodAntiAffinity) { existingPodMatchingTerms, err := getMatchingAntiAffinityTermsOfExistingPod(pod, existingPod, existingPodNode)
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(existingPod, &term) if err != nil {
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector) return nil, err
if err != nil { }
return nil, err if len(existingPodMatchingTerms) > 0 {
} existingPodFullName := schedutil.GetPodFullName(existingPod)
if priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) { result[existingPodFullName] = existingPodMatchingTerms
result = append(result, matchingPodAntiAffinityTerm{term: &term, node: existingPodNode})
}
} }
} }
} }
@ -1143,30 +1134,39 @@ func (c *PodAffinityChecker) getMatchingAntiAffinityTerms(pod *v1.Pod, allPods [
// Checks if scheduling the pod onto this node would break any anti-affinity // Checks if scheduling the pod onto this node would break any anti-affinity
// rules indicated by the existing pods. // rules indicated by the existing pods.
func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, meta interface{}, node *v1.Node) bool { func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) bool {
var matchingTerms []matchingPodAntiAffinityTerm node := nodeInfo.Node()
if node == nil {
return false
}
var matchingTerms map[string][]matchingPodAntiAffinityTerm
if predicateMeta, ok := meta.(*predicateMetadata); ok { if predicateMeta, ok := meta.(*predicateMetadata); ok {
matchingTerms = predicateMeta.matchingAntiAffinityTerms matchingTerms = predicateMeta.matchingAntiAffinityTerms
} else { } else {
allPods, err := c.podLister.List(labels.Everything()) // Filter out pods whose nodeName is equal to nodeInfo.node.Name, but are not
// present in nodeInfo. Pods on other nodes pass the filter.
filteredPods, err := c.podLister.FilteredList(nodeInfo.Filter, labels.Everything())
if err != nil { if err != nil {
glog.Errorf("Failed to get all pods, %+v", err) glog.Errorf("Failed to get all pods, %+v", err)
return false return false
} }
if matchingTerms, err = c.getMatchingAntiAffinityTerms(pod, allPods); err != nil { if matchingTerms, err = c.getMatchingAntiAffinityTerms(pod, filteredPods); err != nil {
glog.Errorf("Failed to get all terms that pod %+v matches, err: %+v", podName(pod), err) glog.Errorf("Failed to get all terms that pod %+v matches, err: %+v", podName(pod), err)
return false return false
} }
} }
for _, term := range matchingTerms { for _, terms := range matchingTerms {
if len(term.term.TopologyKey) == 0 { for i := range terms {
glog.Error("Empty topologyKey is not allowed except for PreferredDuringScheduling pod anti-affinity") term := &terms[i]
return false if len(term.term.TopologyKey) == 0 {
} glog.Error("Empty topologyKey is not allowed except for PreferredDuringScheduling pod anti-affinity")
if priorityutil.NodesHaveSameTopologyKey(node, term.node, term.term.TopologyKey) { return false
glog.V(10).Infof("Cannot schedule pod %+v onto node %v,because of PodAntiAffinityTerm %v", }
podName(pod), node.Name, term.term) if priorityutil.NodesHaveSameTopologyKey(node, term.node, term.term.TopologyKey) {
return false glog.V(10).Infof("Cannot schedule pod %+v onto node %v,because of PodAntiAffinityTerm %v",
podName(pod), node.Name, term.term)
return false
}
} }
} }
if glog.V(10) { if glog.V(10) {
@ -1179,15 +1179,19 @@ func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, meta
} }
// Checks if scheduling the pod onto this node would break any rules of this pod. // Checks if scheduling the pod onto this node would break any rules of this pod.
func (c *PodAffinityChecker) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod, node *v1.Node, affinity *v1.Affinity) bool { func (c *PodAffinityChecker) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod, nodeInfo *schedulercache.NodeInfo, affinity *v1.Affinity) bool {
allPods, err := c.podLister.List(labels.Everything()) node := nodeInfo.Node()
if node == nil {
return false
}
filteredPods, err := c.podLister.FilteredList(nodeInfo.Filter, labels.Everything())
if err != nil { if err != nil {
return false return false
} }
// Check all affinity terms. // Check all affinity terms.
for _, term := range getPodAffinityTerms(affinity.PodAffinity) { for _, term := range getPodAffinityTerms(affinity.PodAffinity) {
termMatches, matchingPodExists, err := c.anyPodMatchesPodAffinityTerm(pod, allPods, node, &term) termMatches, matchingPodExists, err := c.anyPodMatchesPodAffinityTerm(pod, filteredPods, node, &term)
if err != nil { if err != nil {
glog.Errorf("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v, err: %v", glog.Errorf("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v, err: %v",
podName(pod), node.Name, term, err) podName(pod), node.Name, term, err)
@ -1220,7 +1224,7 @@ func (c *PodAffinityChecker) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod, node
// Check all anti-affinity terms. // Check all anti-affinity terms.
for _, term := range getPodAntiAffinityTerms(affinity.PodAntiAffinity) { for _, term := range getPodAntiAffinityTerms(affinity.PodAntiAffinity) {
termMatches, _, err := c.anyPodMatchesPodAffinityTerm(pod, allPods, node, &term) termMatches, _, err := c.anyPodMatchesPodAffinityTerm(pod, filteredPods, node, &term)
if err != nil || termMatches { if err != nil || termMatches {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAntiAffinityTerm %v, err: %v", glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAntiAffinityTerm %v, err: %v",
podName(pod), node.Name, term, err) podName(pod), node.Name, term, err)
@ -1237,7 +1241,7 @@ func (c *PodAffinityChecker) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod, node
return true return true
} }
// PodToleratesNodeTaints checks if a pod tolertaions can tolerate the node taints // PodToleratesNodeTaints checks if a pod tolerations can tolerate the node taints
func PodToleratesNodeTaints(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) { func PodToleratesNodeTaints(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
return podToleratesNodeTaints(pod, nodeInfo, func(t *v1.Taint) bool { return podToleratesNodeTaints(pod, nodeInfo, func(t *v1.Taint) bool {
// PodToleratesNodeTaints is only interested in NoSchedule and NoExecute taints. // PodToleratesNodeTaints is only interested in NoSchedule and NoExecute taints.
@ -1245,7 +1249,7 @@ func PodToleratesNodeTaints(pod *v1.Pod, meta interface{}, nodeInfo *schedulerca
}) })
} }
// PodToleratesNodeNoExecuteTaints checks if a pod tolertaions can tolerate the node's NoExecute taints // PodToleratesNodeNoExecuteTaints checks if a pod tolerations can tolerate the node's NoExecute taints
func PodToleratesNodeNoExecuteTaints(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) { func PodToleratesNodeNoExecuteTaints(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
return podToleratesNodeTaints(pod, nodeInfo, func(t *v1.Taint) bool { return podToleratesNodeTaints(pod, nodeInfo, func(t *v1.Taint) bool {
return t.Effect == v1.TaintEffectNoExecute return t.Effect == v1.TaintEffectNoExecute
@ -1284,7 +1288,7 @@ func CheckNodeMemoryPressurePredicate(pod *v1.Pod, meta interface{}, nodeInfo *s
return true, nil, nil return true, nil, nil
} }
// check if node is under memory preasure // check if node is under memory pressure
if nodeInfo.MemoryPressureCondition() == v1.ConditionTrue { if nodeInfo.MemoryPressureCondition() == v1.ConditionTrue {
return false, []algorithm.PredicateFailureReason{ErrNodeUnderMemoryPressure}, nil return false, []algorithm.PredicateFailureReason{ErrNodeUnderMemoryPressure}, nil
} }
@ -1294,7 +1298,7 @@ func CheckNodeMemoryPressurePredicate(pod *v1.Pod, meta interface{}, nodeInfo *s
// CheckNodeDiskPressurePredicate checks if a pod can be scheduled on a node // CheckNodeDiskPressurePredicate checks if a pod can be scheduled on a node
// reporting disk pressure condition. // reporting disk pressure condition.
func CheckNodeDiskPressurePredicate(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) { func CheckNodeDiskPressurePredicate(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
// check if node is under disk preasure // check if node is under disk pressure
if nodeInfo.DiskPressureCondition() == v1.ConditionTrue { if nodeInfo.DiskPressureCondition() == v1.ConditionTrue {
return false, []algorithm.PredicateFailureReason{ErrNodeUnderDiskPressure}, nil return false, []algorithm.PredicateFailureReason{ErrNodeUnderDiskPressure}, nil
} }

View File

@ -1590,7 +1590,7 @@ func TestServiceAffinity(t *testing.T) {
// Reimplementing the logic that the scheduler implements: Any time it makes a predicate, it registers any precomputations. // Reimplementing the logic that the scheduler implements: Any time it makes a predicate, it registers any precomputations.
predicate, precompute := NewServiceAffinityPredicate(schedulertesting.FakePodLister(test.pods), schedulertesting.FakeServiceLister(test.services), FakeNodeListInfo(nodes), test.labels) predicate, precompute := NewServiceAffinityPredicate(schedulertesting.FakePodLister(test.pods), schedulertesting.FakeServiceLister(test.services), FakeNodeListInfo(nodes), test.labels)
// Register a precomputation or Rewrite the precomputation to a no-op, depending on the state we want to test. // Register a precomputation or Rewrite the precomputation to a no-op, depending on the state we want to test.
RegisterPredicatePrecomputation("checkServiceAffinity-unitTestPredicate", func(pm *predicateMetadata) { RegisterPredicateMetadataProducer("ServiceAffinityMetaProducer", func(pm *predicateMetadata) {
if !skipPrecompute { if !skipPrecompute {
precompute(pm) precompute(pm)
} }

View File

@ -80,6 +80,9 @@ type PodLister interface {
// We explicitly return []*v1.Pod, instead of v1.PodList, to avoid // We explicitly return []*v1.Pod, instead of v1.PodList, to avoid
// performing expensive copies that are unneeded. // performing expensive copies that are unneeded.
List(labels.Selector) ([]*v1.Pod, error) List(labels.Selector) ([]*v1.Pod, error)
// This is similar to "List()", but the returned slice does not
// contain pods that don't pass `podFilter`.
FilteredList(podFilter schedulercache.PodFilter, selector labels.Selector) ([]*v1.Pod, error)
} }
// ServiceLister interface represents anything that can produce a list of services; the list is consumed by a scheduler. // ServiceLister interface represents anything that can produce a list of services; the list is consumed by a scheduler.

View File

@ -129,7 +129,7 @@ func RegisterCustomFitPredicate(policy schedulerapi.PredicatePolicy) string {
) )
// Once we generate the predicate we should also Register the Precomputation // Once we generate the predicate we should also Register the Precomputation
predicates.RegisterPredicatePrecomputation(policy.Name, precomputationFunction) predicates.RegisterPredicateMetadataProducer(policy.Name, precomputationFunction)
return predicate return predicate
} }
} else if policy.Argument.LabelsPresence != nil { } else if policy.Argument.LabelsPresence != nil {

View File

@ -93,12 +93,17 @@ func (cache *schedulerCache) UpdateNodeNameToInfoMap(nodeNameToInfo map[string]*
} }
func (cache *schedulerCache) List(selector labels.Selector) ([]*v1.Pod, error) { func (cache *schedulerCache) List(selector labels.Selector) ([]*v1.Pod, error) {
alwaysTrue := func(p *v1.Pod) bool { return true }
return cache.FilteredList(alwaysTrue, selector)
}
func (cache *schedulerCache) FilteredList(podFilter PodFilter, selector labels.Selector) ([]*v1.Pod, error) {
cache.mu.Lock() cache.mu.Lock()
defer cache.mu.Unlock() defer cache.mu.Unlock()
var pods []*v1.Pod var pods []*v1.Pod
for _, info := range cache.nodes { for _, info := range cache.nodes {
for _, pod := range info.pods { for _, pod := range info.pods {
if selector.Matches(labels.Set(pod.Labels)) { if podFilter(pod) && selector.Matches(labels.Set(pod.Labels)) {
pods = append(pods, pod) pods = append(pods, pod)
} }
} }

View File

@ -21,6 +21,8 @@ import (
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
) )
type PodFilter func(*v1.Pod) bool
// Cache collects pods' information and provides node-level aggregated information. // Cache collects pods' information and provides node-level aggregated information.
// It's intended for generic scheduler to do efficient lookup. // It's intended for generic scheduler to do efficient lookup.
// Cache's operations are pod centric. It does incremental updates based on pod events. // Cache's operations are pod centric. It does incremental updates based on pod events.
@ -93,4 +95,7 @@ type Cache interface {
// List lists all cached pods (including assumed ones). // List lists all cached pods (including assumed ones).
List(labels.Selector) ([]*v1.Pod, error) List(labels.Selector) ([]*v1.Pod, error)
// FilteredList returns all cached pods that pass the filter.
FilteredList(filter PodFilter, selector labels.Selector) ([]*v1.Pod, error)
} }

View File

@ -26,6 +26,7 @@ import (
clientcache "k8s.io/client-go/tools/cache" clientcache "k8s.io/client-go/tools/cache"
v1helper "k8s.io/kubernetes/pkg/api/v1/helper" v1helper "k8s.io/kubernetes/pkg/api/v1/helper"
priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util" priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
"k8s.io/kubernetes/plugin/pkg/scheduler/util"
) )
var emptyResource = Resource{} var emptyResource = Resource{}
@ -458,3 +459,19 @@ func (n *NodeInfo) RemoveNode(node *v1.Node) error {
func getPodKey(pod *v1.Pod) (string, error) { func getPodKey(pod *v1.Pod) (string, error) {
return clientcache.MetaNamespaceKeyFunc(pod) return clientcache.MetaNamespaceKeyFunc(pod)
} }
// Filter implements PodFilter interface. It returns false only if the pod node name
// matches NodeInfo.node and the pod is not found in the pods list. Otherwise,
// returns true.
func (n *NodeInfo) Filter(pod *v1.Pod) bool {
pFullName := util.GetPodFullName(pod)
if pod.Spec.NodeName != n.node.Name {
return true
}
for _, p := range n.pods {
if util.GetPodFullName(p) == pFullName {
return true
}
}
return false
}

View File

@ -57,3 +57,7 @@ func (f *FakeCache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.N
} }
func (f *FakeCache) List(s labels.Selector) ([]*v1.Pod, error) { return nil, nil } func (f *FakeCache) List(s labels.Selector) ([]*v1.Pod, error) { return nil, nil }
func (f *FakeCache) FilteredList(filter schedulercache.PodFilter, selector labels.Selector) ([]*v1.Pod, error) {
return nil, nil
}

View File

@ -25,6 +25,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
. "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" . "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
) )
var _ NodeLister = &FakeNodeLister{} var _ NodeLister = &FakeNodeLister{}
@ -52,6 +53,15 @@ func (f FakePodLister) List(s labels.Selector) (selected []*v1.Pod, err error) {
return selected, nil return selected, nil
} }
func (f FakePodLister) FilteredList(podFilter schedulercache.PodFilter, s labels.Selector) (selected []*v1.Pod, err error) {
for _, pod := range f {
if podFilter(pod) && s.Matches(labels.Set(pod.Labels)) {
selected = append(selected, pod)
}
}
return selected, nil
}
var _ ServiceLister = &FakeServiceLister{} var _ ServiceLister = &FakeServiceLister{}
// FakeServiceLister implements ServiceLister on []v1.Service for test purposes. // FakeServiceLister implements ServiceLister on []v1.Service for test purposes.

View File

@ -39,3 +39,10 @@ func GetUsedPorts(pods ...*v1.Pod) map[int]bool {
} }
return ports return ports
} }
// GetPodFullName returns a name that uniquely identifies a pod.
func GetPodFullName(pod *v1.Pod) string {
// Use underscore as the delimiter because it is not allowed in pod name
// (DNS subdomain format).
return pod.Name + "_" + pod.Namespace
}