From 4aa92bac73f33dd88aa4135fb5c4699cd452569f Mon Sep 17 00:00:00 2001 From: Gavin Date: Thu, 2 Nov 2017 15:08:38 +0800 Subject: [PATCH 1/3] Refactoring of priority function(CaculateSpreadPriority) by using map/reduce pattern --- .../priorities/selector_spreading.go | 192 ++++++++---------- 1 file changed, 88 insertions(+), 104 deletions(-) diff --git a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go index 721531f7e32..940813f2cb9 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go +++ b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go @@ -17,12 +17,10 @@ limitations under the License. package priorities import ( - "sync" + "fmt" "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/client-go/util/workqueue" utilnode "k8s.io/kubernetes/pkg/util/node" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" @@ -46,149 +44,135 @@ func NewSelectorSpreadPriority( serviceLister algorithm.ServiceLister, controllerLister algorithm.ControllerLister, replicaSetLister algorithm.ReplicaSetLister, - statefulSetLister algorithm.StatefulSetLister) algorithm.PriorityFunction { + statefulSetLister algorithm.StatefulSetLister) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) { selectorSpread := &SelectorSpread{ serviceLister: serviceLister, controllerLister: controllerLister, replicaSetLister: replicaSetLister, statefulSetLister: statefulSetLister, } - return selectorSpread.CalculateSpreadPriority + return selectorSpread.CalculateSpreadPriorityMap, selectorSpread.CalculateSpreadPriorityReduce } -// Returns selectors of services, RCs and RSs matching the given pod. -func getSelectors(pod *v1.Pod, sl algorithm.ServiceLister, cl algorithm.ControllerLister, rsl algorithm.ReplicaSetLister, ssl algorithm.StatefulSetLister) []labels.Selector { - var selectors []labels.Selector - if services, err := sl.GetPodServices(pod); err == nil { - for _, service := range services { - selectors = append(selectors, labels.SelectorFromSet(service.Spec.Selector)) - } - } - if rcs, err := cl.GetPodControllers(pod); err == nil { - for _, rc := range rcs { - selectors = append(selectors, labels.SelectorFromSet(rc.Spec.Selector)) - } - } - if rss, err := rsl.GetPodReplicaSets(pod); err == nil { - for _, rs := range rss { - if selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector); err == nil { - selectors = append(selectors, selector) - } - } - } - if sss, err := ssl.GetPodStatefulSets(pod); err == nil { - for _, ss := range sss { - if selector, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector); err == nil { - selectors = append(selectors, selector) - } - } - } - return selectors -} - -func (s *SelectorSpread) getSelectors(pod *v1.Pod) []labels.Selector { - return getSelectors(pod, s.serviceLister, s.controllerLister, s.replicaSetLister, s.statefulSetLister) -} - -// CalculateSpreadPriority spreads pods across hosts and zones, considering pods belonging to the same service or replication controller. +// CalculateSpreadPriorityMap spreads pods across hosts, considering pods belonging to the same service or replication controller. // When a pod is scheduled, it looks for services, RCs or RSs that match the pod, then finds existing pods that match those selectors. // It favors nodes that have fewer existing matching pods. // i.e. 
it pushes the scheduler towards a node where there's the smallest number of // pods which match the same service, RC or RS selectors as the pod being scheduled. -// Where zone information is included on the nodes, it favors nodes in zones with fewer existing matching pods. -func (s *SelectorSpread) CalculateSpreadPriority(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) { - selectors := s.getSelectors(pod) +func (s *SelectorSpread) CalculateSpreadPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) { + var selectors []labels.Selector + node := nodeInfo.Node() + if node == nil { + return schedulerapi.HostPriority{}, fmt.Errorf("node not found") + } - // Count similar pods by node - countsByNodeName := make(map[string]float64, len(nodes)) - countsByZone := make(map[string]float64, 10) - maxCountByNodeName := float64(0) - countsByNodeNameLock := sync.Mutex{} + priorityMeta, ok := meta.(*priorityMetadata) + if ok { + selectors = priorityMeta.podSelectors + } else { + selectors = getSelectors(pod, s.serviceLister, s.controllerLister, s.replicaSetLister, s.statefulSetLister) + } + + if len(selectors) == 0 { + return schedulerapi.HostPriority{ + Host: node.Name, + Score: int(0), + }, nil + } + + count := float64(0) + for _, nodePod := range nodeInfo.Pods() { + if pod.Namespace != nodePod.Namespace { + continue + } + // When we are replacing a failed pod, we often see the previous + // deleted version while scheduling the replacement. + // Ignore the previous deleted version for spreading purposes + // (it can still be considered for resource restrictions etc.) + if nodePod.DeletionTimestamp != nil { + glog.V(4).Infof("skipping pending-deleted pod: %s/%s", nodePod.Namespace, nodePod.Name) + continue + } + matches := false + for _, selector := range selectors { + if selector.Matches(labels.Set(nodePod.ObjectMeta.Labels)) { + matches = true + break + } + } + if matches { + count++ + } + } + return schedulerapi.HostPriority{ + Host: node.Name, + Score: int(count), + }, nil +} + +// CalculateSpreadPriorityReduce calculates the source of each node based on the number of existing matching pods on the node +// where zone information is included on the nodes, it favors nodes in zones with fewer existing matching pods. +func (s *SelectorSpread) CalculateSpreadPriorityReduce(pod *v1.Pod, meta interface{}, nodeNameToInfo map[string]*schedulercache.NodeInfo, result schedulerapi.HostPriorityList) error { + var selectors []labels.Selector + countsByZone := make(map[string]int, 10) + maxCountByZone := int(0) + maxCountByNodeName := int(0) + + priorityMeta, ok := meta.(*priorityMetadata) + if ok { + selectors = priorityMeta.podSelectors + } else { + selectors = getSelectors(pod, s.serviceLister, s.controllerLister, s.replicaSetLister, s.statefulSetLister) + } if len(selectors) > 0 { - processNodeFunc := func(i int) { - nodeName := nodes[i].Name - count := float64(0) - for _, nodePod := range nodeNameToInfo[nodeName].Pods() { - if pod.Namespace != nodePod.Namespace { - continue - } - // When we are replacing a failed pod, we often see the previous - // deleted version while scheduling the replacement. - // Ignore the previous deleted version for spreading purposes - // (it can still be considered for resource restrictions etc.) 
- if nodePod.DeletionTimestamp != nil { - glog.V(4).Infof("skipping pending-deleted pod: %s/%s", nodePod.Namespace, nodePod.Name) - continue - } - matches := false - for _, selector := range selectors { - if selector.Matches(labels.Set(nodePod.ObjectMeta.Labels)) { - matches = true - break - } - } - if matches { - count++ - } + for i := range result { + if result[i].Score > maxCountByNodeName { + maxCountByNodeName = result[i].Score } - zoneId := utilnode.GetZoneKey(nodes[i]) - - countsByNodeNameLock.Lock() - defer countsByNodeNameLock.Unlock() - countsByNodeName[nodeName] = count - if count > maxCountByNodeName { - maxCountByNodeName = count - } - if zoneId != "" { - countsByZone[zoneId] += count + zoneId := utilnode.GetZoneKey(nodeNameToInfo[result[i].Host].Node()) + if zoneId == "" { + continue } + countsByZone[zoneId] += result[i].Score + } + } + + for zoneId := range countsByZone { + if countsByZone[zoneId] > maxCountByZone { + maxCountByZone = countsByZone[zoneId] } - workqueue.Parallelize(16, len(nodes), processNodeFunc) } - // Aggregate by-zone information - // Compute the maximum number of pods hosted in any zone haveZones := len(countsByZone) != 0 - maxCountByZone := float64(0) - for _, count := range countsByZone { - if count > maxCountByZone { - maxCountByZone = count - } - } - result := make(schedulerapi.HostPriorityList, 0, len(nodes)) - //score int - scale of 0-maxPriority - // 0 being the lowest priority and maxPriority being the highest - for _, node := range nodes { + for i := range result { // initializing to the default/max node score of maxPriority fScore := float64(schedulerapi.MaxPriority) if maxCountByNodeName > 0 { - fScore = float64(schedulerapi.MaxPriority) * ((maxCountByNodeName - countsByNodeName[node.Name]) / maxCountByNodeName) + fScore = float64(schedulerapi.MaxPriority) * (float64(maxCountByNodeName-result[i].Score) / float64(maxCountByNodeName)) } - // If there is zone information present, incorporate it if haveZones { - zoneId := utilnode.GetZoneKey(node) + zoneId := utilnode.GetZoneKey(nodeNameToInfo[result[i].Host].Node()) if zoneId != "" { zoneScore := float64(schedulerapi.MaxPriority) if maxCountByZone > 0 { - zoneScore = float64(schedulerapi.MaxPriority) * ((maxCountByZone - countsByZone[zoneId]) / maxCountByZone) + zoneScore = float64(schedulerapi.MaxPriority) * (float64(maxCountByZone-countsByZone[zoneId]) / float64(maxCountByZone)) } fScore = (fScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore) } } - - result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)}) + result[i].Score = int(fScore) if glog.V(10) { // We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is // not logged. There is visible performance gain from it. 
glog.V(10).Infof( - "%v -> %v: SelectorSpreadPriority, Score: (%d)", pod.Name, node.Name, int(fScore), + "%v -> %v: SelectorSpreadPriority, Score: (%d)", pod.Name, result[i].Host, int(fScore), ) } } - return result, nil + return nil } type ServiceAntiAffinity struct { From bed435deca66d436c11aab05906cdecc8e8298e7 Mon Sep 17 00:00:00 2001 From: Gavin Date: Thu, 2 Nov 2017 15:09:06 +0800 Subject: [PATCH 2/3] compute pod selectors in priority meta data producer --- .../balanced_resource_allocation_test.go | 2 +- .../priorities/image_locality_test.go | 2 +- .../priorities/least_requested_test.go | 2 +- .../algorithm/priorities/metadata.go | 55 ++++++++++++++++++- .../algorithm/priorities/metadata_test.go | 10 +++- .../priorities/most_requested_test.go | 2 +- .../priorities/node_affinity_test.go | 2 +- .../algorithm/priorities/node_label_test.go | 6 +- .../priorities/node_prefer_avoid_pods_test.go | 2 +- .../priorities/selector_spreading_test.go | 34 +++++++++--- .../priorities/taint_toleration_test.go | 2 +- .../algorithm/priorities/test_util.go | 6 +- .../algorithmprovider/defaults/defaults.go | 8 +-- .../scheduler/core/generic_scheduler_test.go | 26 ++++++--- 14 files changed, 125 insertions(+), 34 deletions(-) diff --git a/plugin/pkg/scheduler/algorithm/priorities/balanced_resource_allocation_test.go b/plugin/pkg/scheduler/algorithm/priorities/balanced_resource_allocation_test.go index 777be1b1499..381ff05307a 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/balanced_resource_allocation_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/balanced_resource_allocation_test.go @@ -253,7 +253,7 @@ func TestBalancedResourceAllocation(t *testing.T) { for _, test := range tests { nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes) - list, err := priorityFunction(BalancedResourceAllocationMap, nil)(test.pod, nodeNameToInfo, test.nodes) + list, err := priorityFunction(BalancedResourceAllocationMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/plugin/pkg/scheduler/algorithm/priorities/image_locality_test.go b/plugin/pkg/scheduler/algorithm/priorities/image_locality_test.go index c2e0feabbdc..f957e86375b 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/image_locality_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/image_locality_test.go @@ -161,7 +161,7 @@ func TestImageLocalityPriority(t *testing.T) { for _, test := range tests { nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes) - list, err := priorityFunction(ImageLocalityPriorityMap, nil)(test.pod, nodeNameToInfo, test.nodes) + list, err := priorityFunction(ImageLocalityPriorityMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/plugin/pkg/scheduler/algorithm/priorities/least_requested_test.go b/plugin/pkg/scheduler/algorithm/priorities/least_requested_test.go index 08e083361b4..f71ef43d1bc 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/least_requested_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/least_requested_test.go @@ -253,7 +253,7 @@ func TestLeastRequested(t *testing.T) { for _, test := range tests { nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes) - list, err := priorityFunction(LeastRequestedPriorityMap, nil)(test.pod, nodeNameToInfo, test.nodes) + list, err := priorityFunction(LeastRequestedPriorityMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes) if err != nil { 
t.Errorf("unexpected error: %v", err) } diff --git a/plugin/pkg/scheduler/algorithm/priorities/metadata.go b/plugin/pkg/scheduler/algorithm/priorities/metadata.go index 6f3818eb530..1e16c4aad4d 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/metadata.go +++ b/plugin/pkg/scheduler/algorithm/priorities/metadata.go @@ -18,26 +18,79 @@ package priorities import ( "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) +type PriorityMetadataFactory struct { + serviceLister algorithm.ServiceLister + controllerLister algorithm.ControllerLister + replicaSetLister algorithm.ReplicaSetLister + statefulSetLister algorithm.StatefulSetLister +} + +func NewPriorityMetadataFactory(serviceLister algorithm.ServiceLister, controllerLister algorithm.ControllerLister, replicaSetLister algorithm.ReplicaSetLister, statefulSetLister algorithm.StatefulSetLister) algorithm.MetadataProducer { + factory := &PriorityMetadataFactory{ + serviceLister: serviceLister, + controllerLister: controllerLister, + replicaSetLister: replicaSetLister, + statefulSetLister: statefulSetLister, + } + return factory.PriorityMetadata +} + // priorityMetadata is a type that is passed as metadata for priority functions type priorityMetadata struct { nonZeroRequest *schedulercache.Resource podTolerations []v1.Toleration affinity *v1.Affinity + podSelectors []labels.Selector } // PriorityMetadata is a MetadataProducer. Node info can be nil. -func PriorityMetadata(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) interface{} { +func (pmf *PriorityMetadataFactory) PriorityMetadata(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) interface{} { // If we cannot compute metadata, just return nil if pod == nil { return nil } tolerationsPreferNoSchedule := getAllTolerationPreferNoSchedule(pod.Spec.Tolerations) + podSelectors := getSelectors(pod, pmf.serviceLister, pmf.controllerLister, pmf.replicaSetLister, pmf.statefulSetLister) return &priorityMetadata{ nonZeroRequest: getNonZeroRequests(pod), podTolerations: tolerationsPreferNoSchedule, affinity: pod.Spec.Affinity, + podSelectors: podSelectors, } } + +// getSelectors returns selectors of services, RCs and RSs matching the given pod. 
+func getSelectors(pod *v1.Pod, sl algorithm.ServiceLister, cl algorithm.ControllerLister, rsl algorithm.ReplicaSetLister, ssl algorithm.StatefulSetLister) []labels.Selector { + var selectors []labels.Selector + if services, err := sl.GetPodServices(pod); err == nil { + for _, service := range services { + selectors = append(selectors, labels.SelectorFromSet(service.Spec.Selector)) + } + } + if rcs, err := cl.GetPodControllers(pod); err == nil { + for _, rc := range rcs { + selectors = append(selectors, labels.SelectorFromSet(rc.Spec.Selector)) + } + } + if rss, err := rsl.GetPodReplicaSets(pod); err == nil { + for _, rs := range rss { + if selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector); err == nil { + selectors = append(selectors, selector) + } + } + } + if sss, err := ssl.GetPodStatefulSets(pod); err == nil { + for _, ss := range sss { + if selector, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector); err == nil { + selectors = append(selectors, selector) + } + } + } + return selectors +} diff --git a/plugin/pkg/scheduler/algorithm/priorities/metadata_test.go b/plugin/pkg/scheduler/algorithm/priorities/metadata_test.go index b8fd653ba24..0258d6e0100 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/metadata_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/metadata_test.go @@ -20,11 +20,14 @@ import ( "reflect" "testing" + apps "k8s.io/api/apps/v1beta1" "k8s.io/api/core/v1" + extensions "k8s.io/api/extensions/v1beta1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" + schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing" ) func TestPriorityMetadata(t *testing.T) { @@ -123,8 +126,13 @@ func TestPriorityMetadata(t *testing.T) { test: "Produce a priorityMetadata with specified requests", }, } + mataDataProducer := NewPriorityMetadataFactory( + schedulertesting.FakeServiceLister([]*v1.Service{}), + schedulertesting.FakeControllerLister([]*v1.ReplicationController{}), + schedulertesting.FakeReplicaSetLister([]*extensions.ReplicaSet{}), + schedulertesting.FakeStatefulSetLister([]*apps.StatefulSet{})) for _, test := range tests { - ptData := PriorityMetadata(test.pod, nil) + ptData := mataDataProducer(test.pod, nil) if !reflect.DeepEqual(test.expected, ptData) { t.Errorf("%s: expected %#v, got %#v", test.test, test.expected, ptData) } diff --git a/plugin/pkg/scheduler/algorithm/priorities/most_requested_test.go b/plugin/pkg/scheduler/algorithm/priorities/most_requested_test.go index a77692b4af9..0cffea5a33b 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/most_requested_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/most_requested_test.go @@ -210,7 +210,7 @@ func TestMostRequested(t *testing.T) { for _, test := range tests { nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes) - list, err := priorityFunction(MostRequestedPriorityMap, nil)(test.pod, nodeNameToInfo, test.nodes) + list, err := priorityFunction(MostRequestedPriorityMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/plugin/pkg/scheduler/algorithm/priorities/node_affinity_test.go b/plugin/pkg/scheduler/algorithm/priorities/node_affinity_test.go index 9d425661a92..f5474134364 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/node_affinity_test.go +++ 
b/plugin/pkg/scheduler/algorithm/priorities/node_affinity_test.go @@ -167,7 +167,7 @@ func TestNodeAffinityPriority(t *testing.T) { for _, test := range tests { nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(nil, test.nodes) - nap := priorityFunction(CalculateNodeAffinityPriorityMap, CalculateNodeAffinityPriorityReduce) + nap := priorityFunction(CalculateNodeAffinityPriorityMap, CalculateNodeAffinityPriorityReduce, nil) list, err := nap(test.pod, nodeNameToInfo, test.nodes) if err != nil { t.Errorf("unexpected error: %v", err) diff --git a/plugin/pkg/scheduler/algorithm/priorities/node_label_test.go b/plugin/pkg/scheduler/algorithm/priorities/node_label_test.go index fbced34e336..7acc6ea7076 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/node_label_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/node_label_test.go @@ -108,7 +108,11 @@ func TestNewNodeLabelPriority(t *testing.T) { for _, test := range tests { nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(nil, test.nodes) - list, err := priorityFunction(NewNodeLabelPriority(test.label, test.presence))(nil, nodeNameToInfo, test.nodes) + labelPrioritizer := &NodeLabelPrioritizer{ + label: test.label, + presence: test.presence, + } + list, err := priorityFunction(labelPrioritizer.CalculateNodeLabelPriorityMap, nil, nil)(nil, nodeNameToInfo, test.nodes) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/plugin/pkg/scheduler/algorithm/priorities/node_prefer_avoid_pods_test.go b/plugin/pkg/scheduler/algorithm/priorities/node_prefer_avoid_pods_test.go index a18ddcc03d6..0766b9e5488 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/node_prefer_avoid_pods_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/node_prefer_avoid_pods_test.go @@ -142,7 +142,7 @@ func TestNodePreferAvoidPriority(t *testing.T) { for _, test := range tests { nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(nil, test.nodes) - list, err := priorityFunction(CalculateNodePreferAvoidPodsPriorityMap, nil)(test.pod, nodeNameToInfo, test.nodes) + list, err := priorityFunction(CalculateNodePreferAvoidPodsPriorityMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go index bec4afe7808..0e7ed19ff9f 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go @@ -338,17 +338,26 @@ func TestSelectorSpreadPriority(t *testing.T) { }, } - for _, test := range tests { - nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nil) + for i, test := range tests { + nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, makeNodeList(test.nodes)) selectorSpread := SelectorSpread{ serviceLister: schedulertesting.FakeServiceLister(test.services), controllerLister: schedulertesting.FakeControllerLister(test.rcs), replicaSetLister: schedulertesting.FakeReplicaSetLister(test.rss), statefulSetLister: schedulertesting.FakeStatefulSetLister(test.sss), } - list, err := selectorSpread.CalculateSpreadPriority(test.pod, nodeNameToInfo, makeNodeList(test.nodes)) + + mataDataProducer := NewPriorityMetadataFactory( + schedulertesting.FakeServiceLister(test.services), + schedulertesting.FakeControllerLister(test.rcs), + schedulertesting.FakeReplicaSetLister(test.rss), + schedulertesting.FakeStatefulSetLister(test.sss)) + 
mataData := mataDataProducer(test.pod, nodeNameToInfo) + + ttp := priorityFunction(selectorSpread.CalculateSpreadPriorityMap, selectorSpread.CalculateSpreadPriorityReduce, mataData) + list, err := ttp(test.pod, nodeNameToInfo, makeNodeList(test.nodes)) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Errorf("unexpected error: %v index : %d\n", err, i) } if !reflect.DeepEqual(test.expectedList, list) { t.Errorf("%s: expected %#v, got %#v", test.test, test.expectedList, list) @@ -544,6 +553,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) { buildPod(nodeMachine1Zone2, labels1, controllerRef("ReplicationController", "name", "abc123")), buildPod(nodeMachine1Zone3, labels1, controllerRef("ReplicationController", "name", "abc123")), }, + //nodes: []string{nodeMachine1Zone3, nodeMachine1Zone2, nodeMachine1Zone3}, rcs: []*v1.ReplicationController{{Spec: v1.ReplicationControllerSpec{Selector: labels1}}}, expectedList: []schedulerapi.HostPriority{ // Note that because we put two pods on the same node (nodeMachine1Zone3), @@ -564,17 +574,25 @@ func TestZoneSelectorSpreadPriority(t *testing.T) { }, } - for _, test := range tests { - nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nil) + for i, test := range tests { + nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, makeLabeledNodeList(labeledNodes)) selectorSpread := SelectorSpread{ serviceLister: schedulertesting.FakeServiceLister(test.services), controllerLister: schedulertesting.FakeControllerLister(test.rcs), replicaSetLister: schedulertesting.FakeReplicaSetLister(test.rss), statefulSetLister: schedulertesting.FakeStatefulSetLister(test.sss), } - list, err := selectorSpread.CalculateSpreadPriority(test.pod, nodeNameToInfo, makeLabeledNodeList(labeledNodes)) + + mataDataProducer := NewPriorityMetadataFactory( + schedulertesting.FakeServiceLister(test.services), + schedulertesting.FakeControllerLister(test.rcs), + schedulertesting.FakeReplicaSetLister(test.rss), + schedulertesting.FakeStatefulSetLister(test.sss)) + mataData := mataDataProducer(test.pod, nodeNameToInfo) + ttp := priorityFunction(selectorSpread.CalculateSpreadPriorityMap, selectorSpread.CalculateSpreadPriorityReduce, mataData) + list, err := ttp(test.pod, nodeNameToInfo, makeLabeledNodeList(labeledNodes)) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Errorf("unexpected error: %v index : %d", err, i) } // sort the two lists to avoid failures on account of different ordering sort.Sort(test.expectedList) diff --git a/plugin/pkg/scheduler/algorithm/priorities/taint_toleration_test.go b/plugin/pkg/scheduler/algorithm/priorities/taint_toleration_test.go index 50e0b4d36f9..f54ce45613c 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/taint_toleration_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/taint_toleration_test.go @@ -227,7 +227,7 @@ func TestTaintAndToleration(t *testing.T) { } for _, test := range tests { nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(nil, test.nodes) - ttp := priorityFunction(ComputeTaintTolerationPriorityMap, ComputeTaintTolerationPriorityReduce) + ttp := priorityFunction(ComputeTaintTolerationPriorityMap, ComputeTaintTolerationPriorityReduce, nil) list, err := ttp(test.pod, nodeNameToInfo, test.nodes) if err != nil { t.Errorf("%s, unexpected error: %v", test.test, err) diff --git a/plugin/pkg/scheduler/algorithm/priorities/test_util.go b/plugin/pkg/scheduler/algorithm/priorities/test_util.go index 9eb26f2d93c..312c7619410 100644 --- 
a/plugin/pkg/scheduler/algorithm/priorities/test_util.go +++ b/plugin/pkg/scheduler/algorithm/priorities/test_util.go @@ -41,18 +41,18 @@ func makeNode(node string, milliCPU, memory int64) *v1.Node { } } -func priorityFunction(mapFn algorithm.PriorityMapFunction, reduceFn algorithm.PriorityReduceFunction) algorithm.PriorityFunction { +func priorityFunction(mapFn algorithm.PriorityMapFunction, reduceFn algorithm.PriorityReduceFunction, mataData interface{}) algorithm.PriorityFunction { return func(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) { result := make(schedulerapi.HostPriorityList, 0, len(nodes)) for i := range nodes { - hostResult, err := mapFn(pod, nil, nodeNameToInfo[nodes[i].Name]) + hostResult, err := mapFn(pod, mataData, nodeNameToInfo[nodes[i].Name]) if err != nil { return nil, err } result = append(result, hostResult) } if reduceFn != nil { - if err := reduceFn(pod, nil, nodeNameToInfo, result); err != nil { + if err := reduceFn(pod, mataData, nodeNameToInfo, result); err != nil { return nil, err } } diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go index f6aca517eb5..d9242cf172f 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go @@ -46,7 +46,7 @@ func init() { }) factory.RegisterPriorityMetadataProducerFactory( func(args factory.PluginFactoryArgs) algorithm.MetadataProducer { - return priorities.PriorityMetadata + return priorities.NewPriorityMetadataFactory(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister) }) registerAlgorithmProvider(defaultPredicates(), defaultPriorities()) @@ -90,13 +90,12 @@ func init() { factory.RegisterPriorityConfigFactory( "ServiceSpreadingPriority", factory.PriorityConfigFactory{ - Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction { + MapReduceFunction: func(args factory.PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) { return priorities.NewSelectorSpreadPriority(args.ServiceLister, algorithm.EmptyControllerLister{}, algorithm.EmptyReplicaSetLister{}, algorithm.EmptyStatefulSetLister{}) }, Weight: 1, }, ) - // EqualPriority is a prioritizer function that gives an equal weight of one to all nodes // Register the priority function so that its available // but do not include it as part of the default priorities @@ -213,12 +212,13 @@ func defaultPriorities() sets.String { factory.RegisterPriorityConfigFactory( "SelectorSpreadPriority", factory.PriorityConfigFactory{ - Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction { + MapReduceFunction: func(args factory.PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) { return priorities.NewSelectorSpreadPriority(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister) }, Weight: 1, }, ), + // pods should be placed in the same topological domain (e.g. same node, same rack, same zone, same power domain, etc.) // as some other pods, or, conversely, should not be placed in the same topological domain as some other pods. 
factory.RegisterPriorityConfigFactory( diff --git a/plugin/pkg/scheduler/core/generic_scheduler_test.go b/plugin/pkg/scheduler/core/generic_scheduler_test.go index ce4ff1300dc..e5890daa38e 100644 --- a/plugin/pkg/scheduler/core/generic_scheduler_test.go +++ b/plugin/pkg/scheduler/core/generic_scheduler_test.go @@ -522,18 +522,26 @@ func TestZeroRequest(t *testing.T) { priorityConfigs := []algorithm.PriorityConfig{ {Map: algorithmpriorities.LeastRequestedPriorityMap, Weight: 1}, {Map: algorithmpriorities.BalancedResourceAllocationMap, Weight: 1}, - { - Function: algorithmpriorities.NewSelectorSpreadPriority( - schedulertesting.FakeServiceLister([]*v1.Service{}), - schedulertesting.FakeControllerLister([]*v1.ReplicationController{}), - schedulertesting.FakeReplicaSetLister([]*extensions.ReplicaSet{}), - schedulertesting.FakeStatefulSetLister([]*apps.StatefulSet{})), - Weight: 1, - }, } + selectorSpreadPriorityMap, selectorSpreadPriorityReduce := algorithmpriorities.NewSelectorSpreadPriority( + schedulertesting.FakeServiceLister([]*v1.Service{}), + schedulertesting.FakeControllerLister([]*v1.ReplicationController{}), + schedulertesting.FakeReplicaSetLister([]*extensions.ReplicaSet{}), + schedulertesting.FakeStatefulSetLister([]*apps.StatefulSet{})) + pc := algorithm.PriorityConfig{Map: selectorSpreadPriorityMap, Reduce: selectorSpreadPriorityReduce, Weight: 1} + priorityConfigs = append(priorityConfigs, pc) + nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes) + + mataDataProducer := algorithmpriorities.NewPriorityMetadataFactory( + schedulertesting.FakeServiceLister([]*v1.Service{}), + schedulertesting.FakeControllerLister([]*v1.ReplicationController{}), + schedulertesting.FakeReplicaSetLister([]*extensions.ReplicaSet{}), + schedulertesting.FakeStatefulSetLister([]*apps.StatefulSet{})) + mataData := mataDataProducer(test.pod, nodeNameToInfo) + list, err := PrioritizeNodes( - test.pod, nodeNameToInfo, algorithm.EmptyMetadataProducer, priorityConfigs, + test.pod, nodeNameToInfo, mataData, priorityConfigs, schedulertesting.FakeNodeLister(test.nodes), []algorithm.SchedulerExtender{}) if err != nil { t.Errorf("unexpected error: %v", err) From 8fa59aa9b08b3a8b46376a35dfa955d6450a14a8 Mon Sep 17 00:00:00 2001 From: Gavin Date: Sun, 19 Nov 2017 11:08:02 +0800 Subject: [PATCH 3/3] address review comments --- .../priorities/selector_spreading.go | 54 +++++++++---------- .../priorities/selector_spreading_test.go | 1 - .../algorithmprovider/defaults/defaults.go | 1 - 3 files changed, 26 insertions(+), 30 deletions(-) diff --git a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go index 940813f2cb9..aa195b0e4f8 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go +++ b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go @@ -54,11 +54,13 @@ func NewSelectorSpreadPriority( return selectorSpread.CalculateSpreadPriorityMap, selectorSpread.CalculateSpreadPriorityReduce } -// CalculateSpreadPriorityMap spreads pods across hosts, considering pods belonging to the same service or replication controller. -// When a pod is scheduled, it looks for services, RCs or RSs that match the pod, then finds existing pods that match those selectors. +// CalculateSpreadPriorityMap spreads pods across hosts, considering pods +// belonging to the same service,RC,RS or StatefulSet. 
+// When a pod is scheduled, it looks for services, RCs,RSs and StatefulSets that match the pod, +// then finds existing pods that match those selectors. // It favors nodes that have fewer existing matching pods. // i.e. it pushes the scheduler towards a node where there's the smallest number of -// pods which match the same service, RC or RS selectors as the pod being scheduled. +// pods which match the same service, RC,RSs or StatefulSets selectors as the pod being scheduled. func (s *SelectorSpread) CalculateSpreadPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) { var selectors []labels.Selector node := nodeInfo.Node() @@ -80,7 +82,7 @@ func (s *SelectorSpread) CalculateSpreadPriorityMap(pod *v1.Pod, meta interface{ }, nil } - count := float64(0) + count := int(0) for _, nodePod := range nodeInfo.Pods() { if pod.Namespace != nodePod.Namespace { continue @@ -110,32 +112,24 @@ func (s *SelectorSpread) CalculateSpreadPriorityMap(pod *v1.Pod, meta interface{ }, nil } -// CalculateSpreadPriorityReduce calculates the source of each node based on the number of existing matching pods on the node -// where zone information is included on the nodes, it favors nodes in zones with fewer existing matching pods. +// CalculateSpreadPriorityReduce calculates the source of each node +// based on the number of existing matching pods on the node +// where zone information is included on the nodes, it favors nodes +// in zones with fewer existing matching pods. func (s *SelectorSpread) CalculateSpreadPriorityReduce(pod *v1.Pod, meta interface{}, nodeNameToInfo map[string]*schedulercache.NodeInfo, result schedulerapi.HostPriorityList) error { - var selectors []labels.Selector countsByZone := make(map[string]int, 10) maxCountByZone := int(0) maxCountByNodeName := int(0) - priorityMeta, ok := meta.(*priorityMetadata) - if ok { - selectors = priorityMeta.podSelectors - } else { - selectors = getSelectors(pod, s.serviceLister, s.controllerLister, s.replicaSetLister, s.statefulSetLister) - } - - if len(selectors) > 0 { - for i := range result { - if result[i].Score > maxCountByNodeName { - maxCountByNodeName = result[i].Score - } - zoneId := utilnode.GetZoneKey(nodeNameToInfo[result[i].Host].Node()) - if zoneId == "" { - continue - } - countsByZone[zoneId] += result[i].Score + for i := range result { + if result[i].Score > maxCountByNodeName { + maxCountByNodeName = result[i].Score } + zoneId := utilnode.GetZoneKey(nodeNameToInfo[result[i].Host].Node()) + if zoneId == "" { + continue + } + countsByZone[zoneId] += result[i].Score } for zoneId := range countsByZone { @@ -146,19 +140,23 @@ func (s *SelectorSpread) CalculateSpreadPriorityReduce(pod *v1.Pod, meta interfa haveZones := len(countsByZone) != 0 + maxCountByNodeNameFloat64 := float64(maxCountByNodeName) + maxCountByZoneFloat64 := float64(maxCountByZone) + MaxPriorityFloat64 := float64(schedulerapi.MaxPriority) + for i := range result { // initializing to the default/max node score of maxPriority - fScore := float64(schedulerapi.MaxPriority) + fScore := MaxPriorityFloat64 if maxCountByNodeName > 0 { - fScore = float64(schedulerapi.MaxPriority) * (float64(maxCountByNodeName-result[i].Score) / float64(maxCountByNodeName)) + fScore = MaxPriorityFloat64 * (float64(maxCountByNodeName-result[i].Score) / maxCountByNodeNameFloat64) } // If there is zone information present, incorporate it if haveZones { zoneId := utilnode.GetZoneKey(nodeNameToInfo[result[i].Host].Node()) if zoneId != "" { - zoneScore 
:= float64(schedulerapi.MaxPriority) + zoneScore := MaxPriorityFloat64 if maxCountByZone > 0 { - zoneScore = float64(schedulerapi.MaxPriority) * (float64(maxCountByZone-countsByZone[zoneId]) / float64(maxCountByZone)) + zoneScore = MaxPriorityFloat64 * (float64(maxCountByZone-countsByZone[zoneId]) / maxCountByZoneFloat64) } fScore = (fScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore) } diff --git a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go index 0e7ed19ff9f..d3cb19cb635 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go @@ -553,7 +553,6 @@ func TestZoneSelectorSpreadPriority(t *testing.T) { buildPod(nodeMachine1Zone2, labels1, controllerRef("ReplicationController", "name", "abc123")), buildPod(nodeMachine1Zone3, labels1, controllerRef("ReplicationController", "name", "abc123")), }, - //nodes: []string{nodeMachine1Zone3, nodeMachine1Zone2, nodeMachine1Zone3}, rcs: []*v1.ReplicationController{{Spec: v1.ReplicationControllerSpec{Selector: labels1}}}, expectedList: []schedulerapi.HostPriority{ // Note that because we put two pods on the same node (nodeMachine1Zone3), diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go index d9242cf172f..99d80e566d7 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go @@ -218,7 +218,6 @@ func defaultPriorities() sets.String { Weight: 1, }, ), - // pods should be placed in the same topological domain (e.g. same node, same rack, same zone, same power domain, etc.) // as some other pods, or, conversely, should not be placed in the same topological domain as some other pods. factory.RegisterPriorityConfigFactory(
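
For readers following the refactor, the sketch below shows how the map/reduce split introduced in PATCH 1/3 (and tightened in PATCH 3/3) fits together outside the scheduler's own packages: the map step counts matching pods on a single node, and the reduce step turns those raw counts into 0..MaxPriority scores, blending in a per-zone score when zone labels are present. The pod/nodeInfo/hostPriority types are simplified stand-ins for v1.Pod, schedulercache.NodeInfo and schedulerapi.HostPriority, and maxPriority = 10 and zoneWeighting = 2/3 are assumed values standing in for the upstream constants; this is an illustration of the pattern, not the real scheduler API.

package main

import "fmt"

// Simplified stand-ins for the types the patch works with (v1.Pod,
// schedulercache.NodeInfo, schedulerapi.HostPriority).
type pod struct {
	labels map[string]string
}

type nodeInfo struct {
	name string
	zone string
	pods []pod
}

type hostPriority struct {
	host  string
	score int
}

// Assumed values standing in for schedulerapi.MaxPriority and the package's
// zoneWeighting constant.
const (
	maxPriority   = 10
	zoneWeighting = 2.0 / 3.0
)

// mapPriority mirrors CalculateSpreadPriorityMap: score one node independently
// by counting the pods on it that match the incoming pod's selector.
func mapPriority(sel map[string]string, ni nodeInfo) hostPriority {
	count := 0
	for _, p := range ni.pods {
		matches := true
		for k, v := range sel {
			if p.labels[k] != v {
				matches = false
				break
			}
		}
		if matches {
			count++
		}
	}
	return hostPriority{host: ni.name, score: count}
}

// reducePriority mirrors CalculateSpreadPriorityReduce: normalize raw counts to
// 0..maxPriority, blending in a per-zone score when zone labels are known.
func reducePriority(nodes map[string]nodeInfo, result []hostPriority) {
	maxCountByNode, maxCountByZone := 0, 0
	countsByZone := map[string]int{}
	for _, r := range result {
		if r.score > maxCountByNode {
			maxCountByNode = r.score
		}
		if zone := nodes[r.host].zone; zone != "" {
			countsByZone[zone] += r.score
		}
	}
	for _, c := range countsByZone {
		if c > maxCountByZone {
			maxCountByZone = c
		}
	}
	for i := range result {
		fScore := float64(maxPriority)
		if maxCountByNode > 0 {
			fScore = float64(maxPriority) * float64(maxCountByNode-result[i].score) / float64(maxCountByNode)
		}
		if zone := nodes[result[i].host].zone; zone != "" {
			zoneScore := float64(maxPriority)
			if maxCountByZone > 0 {
				zoneScore = float64(maxPriority) * float64(maxCountByZone-countsByZone[zone]) / float64(maxCountByZone)
			}
			fScore = fScore*(1.0-zoneWeighting) + zoneWeighting*zoneScore
		}
		result[i].score = int(fScore)
	}
}

func main() {
	sel := map[string]string{"app": "web"}
	nodes := map[string]nodeInfo{
		"node1": {name: "node1", zone: "zone-a", pods: []pod{{labels: sel}}},
		"node2": {name: "node2", zone: "zone-b"},
	}
	result := []hostPriority{}
	for _, ni := range nodes {
		result = append(result, mapPriority(sel, ni))
	}
	reducePriority(nodes, result)
	fmt.Println(result) // the empty node in the other zone ends up with the higher score
}

Compared with the removed CalculateSpreadPriority, each map call here needs only its own node's info, so the countsByNodeName map, its mutex and the workqueue.Parallelize call disappear; the single reduce pass does the cross-node and cross-zone aggregation instead.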
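
PATCH 2/3 moves the selector lookup into the priority metadata producer so that getSelectors runs once per pod being scheduled rather than once per node, with the map/reduce functions falling back to a direct lookup when the metadata is absent. Below is a minimal sketch of that factory-returns-a-producer pattern, using hypothetical lister and metadata types in place of algorithm.ServiceLister and the unexported priorityMetadata; it is an illustration under those assumptions, not the scheduler's real API.

package main

import "fmt"

// Hypothetical stand-ins for the scheduler's listers and selector type.
type selector map[string]string

type serviceLister interface {
	selectorsForPod(podName string) []selector
}

type fakeServiceLister map[string][]selector

func (f fakeServiceLister) selectorsForPod(podName string) []selector {
	return f[podName]
}

// priorityMetadata carries data that should be computed once per scheduling
// cycle and then shared by every per-node map call.
type priorityMetadata struct {
	podSelectors []selector
}

type metadataFactory struct {
	services serviceLister
}

// newMetadataProducer mirrors NewPriorityMetadataFactory: it captures the
// listers and hands back the factory's producer method as a closure.
func newMetadataProducer(services serviceLister) func(podName string) *priorityMetadata {
	factory := &metadataFactory{services: services}
	return factory.produce
}

func (m *metadataFactory) produce(podName string) *priorityMetadata {
	return &priorityMetadata{podSelectors: m.services.selectorsForPod(podName)}
}

func main() {
	produce := newMetadataProducer(fakeServiceLister{"web-0": {{"app": "web"}}})
	meta := produce("web-0") // selectors resolved once per pod ...
	for _, node := range []string{"node1", "node2"} {
		// ... and reused for every node instead of hitting the listers again.
		fmt.Println(node, meta.podSelectors)
	}
}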