Merging selectors for spreading into one

Signed-off-by: Aldo Culquicondor <acondor@google.com>
This commit is contained in:
Aldo Culquicondor 2019-11-11 16:02:36 -05:00
parent bcb171b375
commit c35fe2c801
6 changed files with 48 additions and 44 deletions

View File

@ -92,6 +92,7 @@ go_test(
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
"//vendor/github.com/google/go-cmp/cmp:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
],
)

View File

@ -58,7 +58,7 @@ type priorityMetadata struct {
podLimits *schedulernodeinfo.Resource
podTolerations []v1.Toleration
affinity *v1.Affinity
podSelectors []labels.Selector
podSelector labels.Selector
controllerRef *metav1.OwnerReference
podFirstServiceSelector labels.Selector
totalNumNodes int
@ -88,7 +88,7 @@ func (pmf *MetadataFactory) PriorityMetadata(
podLimits: getResourceLimits(pod),
podTolerations: getAllTolerationPreferNoSchedule(pod.Spec.Tolerations),
affinity: pod.Spec.Affinity,
podSelectors: getSelectors(pod, pmf.serviceLister, pmf.controllerLister, pmf.replicaSetLister, pmf.statefulSetLister),
podSelector: getSelector(pod, pmf.serviceLister, pmf.controllerLister, pmf.replicaSetLister, pmf.statefulSetLister),
controllerRef: metav1.GetControllerOf(pod),
podFirstServiceSelector: getFirstServiceSelector(pod, pmf.serviceLister),
totalNumNodes: totalNumNodes,
@ -105,37 +105,48 @@ func getFirstServiceSelector(pod *v1.Pod, sl corelisters.ServiceLister) (firstSe
return nil
}
// getSelectors returns selectors of services, RCs and RSs matching the given pod.
func getSelectors(pod *v1.Pod, sl corelisters.ServiceLister, cl corelisters.ReplicationControllerLister, rsl appslisters.ReplicaSetLister, ssl appslisters.StatefulSetLister) []labels.Selector {
var selectors []labels.Selector
// getSelector returns a selector for the services, RCs, RSs, and SSs matching the given pod.
func getSelector(pod *v1.Pod, sl corelisters.ServiceLister, cl corelisters.ReplicationControllerLister, rsl appslisters.ReplicaSetLister, ssl appslisters.StatefulSetLister) labels.Selector {
labelSet := make(labels.Set)
// Since services, RCs, RSs and SSs match the pod, they won't have conflicting
// labels. Merging is safe.
if services, err := sl.GetPodServices(pod); err == nil {
for _, service := range services {
selectors = append(selectors, labels.SelectorFromSet(service.Spec.Selector))
labelSet = labels.Merge(labelSet, service.Spec.Selector)
}
}
if rcs, err := cl.GetPodControllers(pod); err == nil {
for _, rc := range rcs {
selectors = append(selectors, labels.SelectorFromSet(rc.Spec.Selector))
labelSet = labels.Merge(labelSet, rc.Spec.Selector)
}
}
selector := labels.NewSelector()
if len(labelSet) != 0 {
selector = labelSet.AsSelector()
}
if rss, err := rsl.GetPodReplicaSets(pod); err == nil {
for _, rs := range rss {
if selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector); err == nil {
selectors = append(selectors, selector)
if other, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector); err == nil {
if r, ok := other.Requirements(); ok {
selector = selector.Add(r...)
}
}
}
}
if sss, err := ssl.GetPodStatefulSets(pod); err == nil {
for _, ss := range sss {
if selector, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector); err == nil {
selectors = append(selectors, selector)
if other, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector); err == nil {
if r, ok := other.Requirements(); ok {
selector = selector.Add(r...)
}
}
}
}
return selectors
return selector
}

View File

@ -23,6 +23,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
@ -140,6 +141,7 @@ func TestPriorityMetadata(t *testing.T) {
podLimits: nonPodLimits,
podTolerations: tolerations,
affinity: podAffinity,
podSelector: labels.NewSelector(),
},
name: "Produce a priorityMetadata with default requests",
},
@ -149,8 +151,9 @@ func TestPriorityMetadata(t *testing.T) {
podLimits: nonPodLimits,
podTolerations: tolerations,
affinity: nil,
podSelector: labels.NewSelector(),
},
name: "Produce a priorityMetadata with specified requests",
name: "Produce a priorityMetadata with tolerations and requests",
},
{
pod: podWithAffinityAndRequests,
@ -158,8 +161,9 @@ func TestPriorityMetadata(t *testing.T) {
podLimits: specifiedPodLimits,
podTolerations: nil,
affinity: podAffinity,
podSelector: labels.NewSelector(),
},
name: "Produce a priorityMetadata with specified requests",
name: "Produce a priorityMetadata with affinity and requests",
},
}
client := clientsetfake.NewSimpleClientset()

View File

@ -66,7 +66,7 @@ func NewSelectorSpreadPriority(
// i.e. it pushes the scheduler towards a node where there's the smallest number of
// pods which match the same service, RC,RSs or StatefulSets selectors as the pod being scheduled.
func (s *SelectorSpread) CalculateSpreadPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
var selectors []labels.Selector
var selector labels.Selector
node := nodeInfo.Node()
if node == nil {
return framework.NodeScore{}, fmt.Errorf("node not found")
@ -74,20 +74,12 @@ func (s *SelectorSpread) CalculateSpreadPriorityMap(pod *v1.Pod, meta interface{
priorityMeta, ok := meta.(*priorityMetadata)
if ok {
selectors = priorityMeta.podSelectors
selector = priorityMeta.podSelector
} else {
selectors = getSelectors(pod, s.serviceLister, s.controllerLister, s.replicaSetLister, s.statefulSetLister)
selector = getSelector(pod, s.serviceLister, s.controllerLister, s.replicaSetLister, s.statefulSetLister)
}
if len(selectors) == 0 {
return framework.NodeScore{
Name: node.Name,
Score: 0,
}, nil
}
count := countMatchingPods(pod.Namespace, selectors, nodeInfo)
count := countMatchingPods(pod.Namespace, selector, nodeInfo)
return framework.NodeScore{
Name: node.Name,
Score: int64(count),
@ -179,9 +171,9 @@ func NewServiceAntiAffinityPriority(podLister schedulerlisters.PodLister, servic
return antiAffinity.CalculateAntiAffinityPriorityMap, antiAffinity.CalculateAntiAffinityPriorityReduce
}
// countMatchingPods cout pods based on namespace and matching all selectors
func countMatchingPods(namespace string, selectors []labels.Selector, nodeInfo *schedulernodeinfo.NodeInfo) int {
if nodeInfo.Pods() == nil || len(nodeInfo.Pods()) == 0 || len(selectors) == 0 {
// countMatchingPods counts pods based on namespace and matching all selectors
func countMatchingPods(namespace string, selector labels.Selector, nodeInfo *schedulernodeinfo.NodeInfo) int {
if nodeInfo.Pods() == nil || len(nodeInfo.Pods()) == 0 || selector.Empty() {
return 0
}
count := 0
@ -189,14 +181,7 @@ func countMatchingPods(namespace string, selectors []labels.Selector, nodeInfo *
// Ignore pods being deleted for spreading purposes
// Similar to how it is done for SelectorSpreadPriority
if namespace == pod.Namespace && pod.DeletionTimestamp == nil {
matches := true
for _, selector := range selectors {
if !selector.Matches(labels.Set(pod.Labels)) {
matches = false
break
}
}
if matches {
if selector.Matches(labels.Set(pod.Labels)) {
count++
}
}
@ -219,12 +204,14 @@ func (s *ServiceAntiAffinity) CalculateAntiAffinityPriorityMap(pod *v1.Pod, meta
} else {
firstServiceSelector = getFirstServiceSelector(pod, s.serviceLister)
}
//pods matched namespace,selector on current node
var selectors []labels.Selector
// Pods matched namespace,selector on current node.
var selector labels.Selector
if firstServiceSelector != nil {
selectors = append(selectors, firstServiceSelector)
selector = firstServiceSelector
} else {
selector = labels.NewSelector()
}
score := countMatchingPods(pod.Namespace, selectors, nodeInfo)
score := countMatchingPods(pod.Namespace, selector, nodeInfo)
return framework.NodeScore{
Name: node.Name,

View File

@ -20,6 +20,7 @@ import (
"reflect"
"testing"
"github.com/google/go-cmp/cmp"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -359,8 +360,8 @@ func TestSelectorSpreadPriority(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %v \n", err)
}
if !reflect.DeepEqual(test.expectedList, list) {
t.Errorf("expected %#v, got %#v", test.expectedList, list)
if diff := cmp.Diff(test.expectedList, list); diff != "" {
t.Errorf("wrong priorities produced (-want, +got): %s", diff)
}
})
}

View File

@ -96,7 +96,7 @@ func BenchmarkTestSelectorSpreadPriority(b *testing.B) {
for i := 0; i < b.N; i++ {
meta := &priorityMetadata{
podSelectors: getSelectors(pod, ss.serviceLister, ss.controllerLister, ss.replicaSetLister, ss.statefulSetLister),
podSelector: getSelector(pod, ss.serviceLister, ss.controllerLister, ss.replicaSetLister, ss.statefulSetLister),
}
_, err := runMapReducePriority(ss.CalculateSpreadPriorityMap, ss.CalculateSpreadPriorityReduce, meta, pod, snapshot, filteredNodes)
if err != nil {