diff --git a/plugin/pkg/scheduler/algorithm/priorities/balanced_resource_allocation_test.go b/plugin/pkg/scheduler/algorithm/priorities/balanced_resource_allocation_test.go index 777be1b1499..381ff05307a 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/balanced_resource_allocation_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/balanced_resource_allocation_test.go @@ -253,7 +253,7 @@ func TestBalancedResourceAllocation(t *testing.T) { for _, test := range tests { nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes) - list, err := priorityFunction(BalancedResourceAllocationMap, nil)(test.pod, nodeNameToInfo, test.nodes) + list, err := priorityFunction(BalancedResourceAllocationMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/plugin/pkg/scheduler/algorithm/priorities/image_locality_test.go b/plugin/pkg/scheduler/algorithm/priorities/image_locality_test.go index c2e0feabbdc..f957e86375b 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/image_locality_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/image_locality_test.go @@ -161,7 +161,7 @@ func TestImageLocalityPriority(t *testing.T) { for _, test := range tests { nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes) - list, err := priorityFunction(ImageLocalityPriorityMap, nil)(test.pod, nodeNameToInfo, test.nodes) + list, err := priorityFunction(ImageLocalityPriorityMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/plugin/pkg/scheduler/algorithm/priorities/least_requested_test.go b/plugin/pkg/scheduler/algorithm/priorities/least_requested_test.go index 08e083361b4..f71ef43d1bc 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/least_requested_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/least_requested_test.go @@ -253,7 +253,7 @@ func TestLeastRequested(t *testing.T) { for _, test := range tests { nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes) - list, err := priorityFunction(LeastRequestedPriorityMap, nil)(test.pod, nodeNameToInfo, test.nodes) + list, err := priorityFunction(LeastRequestedPriorityMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/plugin/pkg/scheduler/algorithm/priorities/metadata.go b/plugin/pkg/scheduler/algorithm/priorities/metadata.go index 6f3818eb530..1e16c4aad4d 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/metadata.go +++ b/plugin/pkg/scheduler/algorithm/priorities/metadata.go @@ -18,26 +18,79 @@ package priorities import ( "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) +type PriorityMetadataFactory struct { + serviceLister algorithm.ServiceLister + controllerLister algorithm.ControllerLister + replicaSetLister algorithm.ReplicaSetLister + statefulSetLister algorithm.StatefulSetLister +} + +func NewPriorityMetadataFactory(serviceLister algorithm.ServiceLister, controllerLister algorithm.ControllerLister, replicaSetLister algorithm.ReplicaSetLister, statefulSetLister algorithm.StatefulSetLister) algorithm.MetadataProducer { + factory := &PriorityMetadataFactory{ + serviceLister: serviceLister, + controllerLister: controllerLister, + replicaSetLister: replicaSetLister, + statefulSetLister: statefulSetLister, 
+ } + return factory.PriorityMetadata +} + // priorityMetadata is a type that is passed as metadata for priority functions type priorityMetadata struct { nonZeroRequest *schedulercache.Resource podTolerations []v1.Toleration affinity *v1.Affinity + podSelectors []labels.Selector } // PriorityMetadata is a MetadataProducer. Node info can be nil. -func PriorityMetadata(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) interface{} { +func (pmf *PriorityMetadataFactory) PriorityMetadata(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) interface{} { // If we cannot compute metadata, just return nil if pod == nil { return nil } tolerationsPreferNoSchedule := getAllTolerationPreferNoSchedule(pod.Spec.Tolerations) + podSelectors := getSelectors(pod, pmf.serviceLister, pmf.controllerLister, pmf.replicaSetLister, pmf.statefulSetLister) return &priorityMetadata{ nonZeroRequest: getNonZeroRequests(pod), podTolerations: tolerationsPreferNoSchedule, affinity: pod.Spec.Affinity, + podSelectors: podSelectors, } } + +// getSelectors returns selectors of services, RCs and RSs matching the given pod. +func getSelectors(pod *v1.Pod, sl algorithm.ServiceLister, cl algorithm.ControllerLister, rsl algorithm.ReplicaSetLister, ssl algorithm.StatefulSetLister) []labels.Selector { + var selectors []labels.Selector + if services, err := sl.GetPodServices(pod); err == nil { + for _, service := range services { + selectors = append(selectors, labels.SelectorFromSet(service.Spec.Selector)) + } + } + if rcs, err := cl.GetPodControllers(pod); err == nil { + for _, rc := range rcs { + selectors = append(selectors, labels.SelectorFromSet(rc.Spec.Selector)) + } + } + if rss, err := rsl.GetPodReplicaSets(pod); err == nil { + for _, rs := range rss { + if selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector); err == nil { + selectors = append(selectors, selector) + } + } + } + if sss, err := ssl.GetPodStatefulSets(pod); err == nil { + for _, ss := range sss { + if selector, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector); err == nil { + selectors = append(selectors, selector) + } + } + } + return selectors +} diff --git a/plugin/pkg/scheduler/algorithm/priorities/metadata_test.go b/plugin/pkg/scheduler/algorithm/priorities/metadata_test.go index 5cea6b6ce79..dbcb562598a 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/metadata_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/metadata_test.go @@ -20,11 +20,14 @@ import ( "reflect" "testing" + apps "k8s.io/api/apps/v1beta1" "k8s.io/api/core/v1" + extensions "k8s.io/api/extensions/v1beta1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" + schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing" ) func TestPriorityMetadata(t *testing.T) { @@ -150,8 +153,13 @@ func TestPriorityMetadata(t *testing.T) { test: "Produce a priorityMetadata with specified requests", }, } + mataDataProducer := NewPriorityMetadataFactory( + schedulertesting.FakeServiceLister([]*v1.Service{}), + schedulertesting.FakeControllerLister([]*v1.ReplicationController{}), + schedulertesting.FakeReplicaSetLister([]*extensions.ReplicaSet{}), + schedulertesting.FakeStatefulSetLister([]*apps.StatefulSet{})) for _, test := range tests { - ptData := PriorityMetadata(test.pod, nil) + ptData := mataDataProducer(test.pod, nil) if !reflect.DeepEqual(test.expected, ptData) { 
t.Errorf("%s: expected %#v, got %#v", test.test, test.expected, ptData) } diff --git a/plugin/pkg/scheduler/algorithm/priorities/most_requested_test.go b/plugin/pkg/scheduler/algorithm/priorities/most_requested_test.go index a77692b4af9..0cffea5a33b 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/most_requested_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/most_requested_test.go @@ -210,7 +210,7 @@ func TestMostRequested(t *testing.T) { for _, test := range tests { nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes) - list, err := priorityFunction(MostRequestedPriorityMap, nil)(test.pod, nodeNameToInfo, test.nodes) + list, err := priorityFunction(MostRequestedPriorityMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/plugin/pkg/scheduler/algorithm/priorities/node_affinity_test.go b/plugin/pkg/scheduler/algorithm/priorities/node_affinity_test.go index 9d425661a92..f5474134364 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/node_affinity_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/node_affinity_test.go @@ -167,7 +167,7 @@ func TestNodeAffinityPriority(t *testing.T) { for _, test := range tests { nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(nil, test.nodes) - nap := priorityFunction(CalculateNodeAffinityPriorityMap, CalculateNodeAffinityPriorityReduce) + nap := priorityFunction(CalculateNodeAffinityPriorityMap, CalculateNodeAffinityPriorityReduce, nil) list, err := nap(test.pod, nodeNameToInfo, test.nodes) if err != nil { t.Errorf("unexpected error: %v", err) diff --git a/plugin/pkg/scheduler/algorithm/priorities/node_label_test.go b/plugin/pkg/scheduler/algorithm/priorities/node_label_test.go index fbced34e336..7acc6ea7076 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/node_label_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/node_label_test.go @@ -108,7 +108,11 @@ func TestNewNodeLabelPriority(t *testing.T) { for _, test := range tests { nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(nil, test.nodes) - list, err := priorityFunction(NewNodeLabelPriority(test.label, test.presence))(nil, nodeNameToInfo, test.nodes) + labelPrioritizer := &NodeLabelPrioritizer{ + label: test.label, + presence: test.presence, + } + list, err := priorityFunction(labelPrioritizer.CalculateNodeLabelPriorityMap, nil, nil)(nil, nodeNameToInfo, test.nodes) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/plugin/pkg/scheduler/algorithm/priorities/node_prefer_avoid_pods_test.go b/plugin/pkg/scheduler/algorithm/priorities/node_prefer_avoid_pods_test.go index a18ddcc03d6..0766b9e5488 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/node_prefer_avoid_pods_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/node_prefer_avoid_pods_test.go @@ -142,7 +142,7 @@ func TestNodePreferAvoidPriority(t *testing.T) { for _, test := range tests { nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(nil, test.nodes) - list, err := priorityFunction(CalculateNodePreferAvoidPodsPriorityMap, nil)(test.pod, nodeNameToInfo, test.nodes) + list, err := priorityFunction(CalculateNodePreferAvoidPodsPriorityMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go index 721531f7e32..aa195b0e4f8 100644 --- 
a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go +++ b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go @@ -17,12 +17,10 @@ limitations under the License. package priorities import ( - "sync" + "fmt" "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/client-go/util/workqueue" utilnode "k8s.io/kubernetes/pkg/util/node" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" @@ -46,149 +44,133 @@ func NewSelectorSpreadPriority( serviceLister algorithm.ServiceLister, controllerLister algorithm.ControllerLister, replicaSetLister algorithm.ReplicaSetLister, - statefulSetLister algorithm.StatefulSetLister) algorithm.PriorityFunction { + statefulSetLister algorithm.StatefulSetLister) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) { selectorSpread := &SelectorSpread{ serviceLister: serviceLister, controllerLister: controllerLister, replicaSetLister: replicaSetLister, statefulSetLister: statefulSetLister, } - return selectorSpread.CalculateSpreadPriority + return selectorSpread.CalculateSpreadPriorityMap, selectorSpread.CalculateSpreadPriorityReduce } -// Returns selectors of services, RCs and RSs matching the given pod. -func getSelectors(pod *v1.Pod, sl algorithm.ServiceLister, cl algorithm.ControllerLister, rsl algorithm.ReplicaSetLister, ssl algorithm.StatefulSetLister) []labels.Selector { - var selectors []labels.Selector - if services, err := sl.GetPodServices(pod); err == nil { - for _, service := range services { - selectors = append(selectors, labels.SelectorFromSet(service.Spec.Selector)) - } - } - if rcs, err := cl.GetPodControllers(pod); err == nil { - for _, rc := range rcs { - selectors = append(selectors, labels.SelectorFromSet(rc.Spec.Selector)) - } - } - if rss, err := rsl.GetPodReplicaSets(pod); err == nil { - for _, rs := range rss { - if selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector); err == nil { - selectors = append(selectors, selector) - } - } - } - if sss, err := ssl.GetPodStatefulSets(pod); err == nil { - for _, ss := range sss { - if selector, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector); err == nil { - selectors = append(selectors, selector) - } - } - } - return selectors -} - -func (s *SelectorSpread) getSelectors(pod *v1.Pod) []labels.Selector { - return getSelectors(pod, s.serviceLister, s.controllerLister, s.replicaSetLister, s.statefulSetLister) -} - -// CalculateSpreadPriority spreads pods across hosts and zones, considering pods belonging to the same service or replication controller. -// When a pod is scheduled, it looks for services, RCs or RSs that match the pod, then finds existing pods that match those selectors. +// CalculateSpreadPriorityMap spreads pods across hosts, considering pods +// belonging to the same service,RC,RS or StatefulSet. +// When a pod is scheduled, it looks for services, RCs,RSs and StatefulSets that match the pod, +// then finds existing pods that match those selectors. // It favors nodes that have fewer existing matching pods. // i.e. it pushes the scheduler towards a node where there's the smallest number of -// pods which match the same service, RC or RS selectors as the pod being scheduled. -// Where zone information is included on the nodes, it favors nodes in zones with fewer existing matching pods. 
-func (s *SelectorSpread) CalculateSpreadPriority(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) { - selectors := s.getSelectors(pod) - - // Count similar pods by node - countsByNodeName := make(map[string]float64, len(nodes)) - countsByZone := make(map[string]float64, 10) - maxCountByNodeName := float64(0) - countsByNodeNameLock := sync.Mutex{} - - if len(selectors) > 0 { - processNodeFunc := func(i int) { - nodeName := nodes[i].Name - count := float64(0) - for _, nodePod := range nodeNameToInfo[nodeName].Pods() { - if pod.Namespace != nodePod.Namespace { - continue - } - // When we are replacing a failed pod, we often see the previous - // deleted version while scheduling the replacement. - // Ignore the previous deleted version for spreading purposes - // (it can still be considered for resource restrictions etc.) - if nodePod.DeletionTimestamp != nil { - glog.V(4).Infof("skipping pending-deleted pod: %s/%s", nodePod.Namespace, nodePod.Name) - continue - } - matches := false - for _, selector := range selectors { - if selector.Matches(labels.Set(nodePod.ObjectMeta.Labels)) { - matches = true - break - } - } - if matches { - count++ - } - } - zoneId := utilnode.GetZoneKey(nodes[i]) - - countsByNodeNameLock.Lock() - defer countsByNodeNameLock.Unlock() - countsByNodeName[nodeName] = count - if count > maxCountByNodeName { - maxCountByNodeName = count - } - if zoneId != "" { - countsByZone[zoneId] += count - } - } - workqueue.Parallelize(16, len(nodes), processNodeFunc) +// pods which match the same service, RC,RSs or StatefulSets selectors as the pod being scheduled. +func (s *SelectorSpread) CalculateSpreadPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) { + var selectors []labels.Selector + node := nodeInfo.Node() + if node == nil { + return schedulerapi.HostPriority{}, fmt.Errorf("node not found") + } + + priorityMeta, ok := meta.(*priorityMetadata) + if ok { + selectors = priorityMeta.podSelectors + } else { + selectors = getSelectors(pod, s.serviceLister, s.controllerLister, s.replicaSetLister, s.statefulSetLister) + } + + if len(selectors) == 0 { + return schedulerapi.HostPriority{ + Host: node.Name, + Score: int(0), + }, nil + } + + count := int(0) + for _, nodePod := range nodeInfo.Pods() { + if pod.Namespace != nodePod.Namespace { + continue + } + // When we are replacing a failed pod, we often see the previous + // deleted version while scheduling the replacement. + // Ignore the previous deleted version for spreading purposes + // (it can still be considered for resource restrictions etc.) + if nodePod.DeletionTimestamp != nil { + glog.V(4).Infof("skipping pending-deleted pod: %s/%s", nodePod.Namespace, nodePod.Name) + continue + } + matches := false + for _, selector := range selectors { + if selector.Matches(labels.Set(nodePod.ObjectMeta.Labels)) { + matches = true + break + } + } + if matches { + count++ + } + } + return schedulerapi.HostPriority{ + Host: node.Name, + Score: int(count), + }, nil +} + +// CalculateSpreadPriorityReduce calculates the source of each node +// based on the number of existing matching pods on the node +// where zone information is included on the nodes, it favors nodes +// in zones with fewer existing matching pods. 
+func (s *SelectorSpread) CalculateSpreadPriorityReduce(pod *v1.Pod, meta interface{}, nodeNameToInfo map[string]*schedulercache.NodeInfo, result schedulerapi.HostPriorityList) error { + countsByZone := make(map[string]int, 10) + maxCountByZone := int(0) + maxCountByNodeName := int(0) + + for i := range result { + if result[i].Score > maxCountByNodeName { + maxCountByNodeName = result[i].Score + } + zoneId := utilnode.GetZoneKey(nodeNameToInfo[result[i].Host].Node()) + if zoneId == "" { + continue + } + countsByZone[zoneId] += result[i].Score + } + + for zoneId := range countsByZone { + if countsByZone[zoneId] > maxCountByZone { + maxCountByZone = countsByZone[zoneId] + } } - // Aggregate by-zone information - // Compute the maximum number of pods hosted in any zone haveZones := len(countsByZone) != 0 - maxCountByZone := float64(0) - for _, count := range countsByZone { - if count > maxCountByZone { - maxCountByZone = count - } - } - result := make(schedulerapi.HostPriorityList, 0, len(nodes)) - //score int - scale of 0-maxPriority - // 0 being the lowest priority and maxPriority being the highest - for _, node := range nodes { + maxCountByNodeNameFloat64 := float64(maxCountByNodeName) + maxCountByZoneFloat64 := float64(maxCountByZone) + MaxPriorityFloat64 := float64(schedulerapi.MaxPriority) + + for i := range result { // initializing to the default/max node score of maxPriority - fScore := float64(schedulerapi.MaxPriority) + fScore := MaxPriorityFloat64 if maxCountByNodeName > 0 { - fScore = float64(schedulerapi.MaxPriority) * ((maxCountByNodeName - countsByNodeName[node.Name]) / maxCountByNodeName) + fScore = MaxPriorityFloat64 * (float64(maxCountByNodeName-result[i].Score) / maxCountByNodeNameFloat64) } - // If there is zone information present, incorporate it if haveZones { - zoneId := utilnode.GetZoneKey(node) + zoneId := utilnode.GetZoneKey(nodeNameToInfo[result[i].Host].Node()) if zoneId != "" { - zoneScore := float64(schedulerapi.MaxPriority) + zoneScore := MaxPriorityFloat64 if maxCountByZone > 0 { - zoneScore = float64(schedulerapi.MaxPriority) * ((maxCountByZone - countsByZone[zoneId]) / maxCountByZone) + zoneScore = MaxPriorityFloat64 * (float64(maxCountByZone-countsByZone[zoneId]) / maxCountByZoneFloat64) } fScore = (fScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore) } } - - result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)}) + result[i].Score = int(fScore) if glog.V(10) { // We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is // not logged. There is visible performance gain from it. 
glog.V(10).Infof( - "%v -> %v: SelectorSpreadPriority, Score: (%d)", pod.Name, node.Name, int(fScore), + "%v -> %v: SelectorSpreadPriority, Score: (%d)", pod.Name, result[i].Host, int(fScore), ) } } - return result, nil + return nil } type ServiceAntiAffinity struct { diff --git a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go index bec4afe7808..d3cb19cb635 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go @@ -338,17 +338,26 @@ func TestSelectorSpreadPriority(t *testing.T) { }, } - for _, test := range tests { - nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nil) + for i, test := range tests { + nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, makeNodeList(test.nodes)) selectorSpread := SelectorSpread{ serviceLister: schedulertesting.FakeServiceLister(test.services), controllerLister: schedulertesting.FakeControllerLister(test.rcs), replicaSetLister: schedulertesting.FakeReplicaSetLister(test.rss), statefulSetLister: schedulertesting.FakeStatefulSetLister(test.sss), } - list, err := selectorSpread.CalculateSpreadPriority(test.pod, nodeNameToInfo, makeNodeList(test.nodes)) + + mataDataProducer := NewPriorityMetadataFactory( + schedulertesting.FakeServiceLister(test.services), + schedulertesting.FakeControllerLister(test.rcs), + schedulertesting.FakeReplicaSetLister(test.rss), + schedulertesting.FakeStatefulSetLister(test.sss)) + mataData := mataDataProducer(test.pod, nodeNameToInfo) + + ttp := priorityFunction(selectorSpread.CalculateSpreadPriorityMap, selectorSpread.CalculateSpreadPriorityReduce, mataData) + list, err := ttp(test.pod, nodeNameToInfo, makeNodeList(test.nodes)) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Errorf("unexpected error: %v index : %d\n", err, i) } if !reflect.DeepEqual(test.expectedList, list) { t.Errorf("%s: expected %#v, got %#v", test.test, test.expectedList, list) @@ -564,17 +573,25 @@ func TestZoneSelectorSpreadPriority(t *testing.T) { }, } - for _, test := range tests { - nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nil) + for i, test := range tests { + nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, makeLabeledNodeList(labeledNodes)) selectorSpread := SelectorSpread{ serviceLister: schedulertesting.FakeServiceLister(test.services), controllerLister: schedulertesting.FakeControllerLister(test.rcs), replicaSetLister: schedulertesting.FakeReplicaSetLister(test.rss), statefulSetLister: schedulertesting.FakeStatefulSetLister(test.sss), } - list, err := selectorSpread.CalculateSpreadPriority(test.pod, nodeNameToInfo, makeLabeledNodeList(labeledNodes)) + + mataDataProducer := NewPriorityMetadataFactory( + schedulertesting.FakeServiceLister(test.services), + schedulertesting.FakeControllerLister(test.rcs), + schedulertesting.FakeReplicaSetLister(test.rss), + schedulertesting.FakeStatefulSetLister(test.sss)) + mataData := mataDataProducer(test.pod, nodeNameToInfo) + ttp := priorityFunction(selectorSpread.CalculateSpreadPriorityMap, selectorSpread.CalculateSpreadPriorityReduce, mataData) + list, err := ttp(test.pod, nodeNameToInfo, makeLabeledNodeList(labeledNodes)) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Errorf("unexpected error: %v index : %d", err, i) } // sort the two lists to avoid failures on account of different ordering 
sort.Sort(test.expectedList) diff --git a/plugin/pkg/scheduler/algorithm/priorities/taint_toleration_test.go b/plugin/pkg/scheduler/algorithm/priorities/taint_toleration_test.go index 50e0b4d36f9..f54ce45613c 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/taint_toleration_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/taint_toleration_test.go @@ -227,7 +227,7 @@ func TestTaintAndToleration(t *testing.T) { } for _, test := range tests { nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(nil, test.nodes) - ttp := priorityFunction(ComputeTaintTolerationPriorityMap, ComputeTaintTolerationPriorityReduce) + ttp := priorityFunction(ComputeTaintTolerationPriorityMap, ComputeTaintTolerationPriorityReduce, nil) list, err := ttp(test.pod, nodeNameToInfo, test.nodes) if err != nil { t.Errorf("%s, unexpected error: %v", test.test, err) diff --git a/plugin/pkg/scheduler/algorithm/priorities/test_util.go b/plugin/pkg/scheduler/algorithm/priorities/test_util.go index 9eb26f2d93c..312c7619410 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/test_util.go +++ b/plugin/pkg/scheduler/algorithm/priorities/test_util.go @@ -41,18 +41,18 @@ func makeNode(node string, milliCPU, memory int64) *v1.Node { } } -func priorityFunction(mapFn algorithm.PriorityMapFunction, reduceFn algorithm.PriorityReduceFunction) algorithm.PriorityFunction { +func priorityFunction(mapFn algorithm.PriorityMapFunction, reduceFn algorithm.PriorityReduceFunction, mataData interface{}) algorithm.PriorityFunction { return func(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) { result := make(schedulerapi.HostPriorityList, 0, len(nodes)) for i := range nodes { - hostResult, err := mapFn(pod, nil, nodeNameToInfo[nodes[i].Name]) + hostResult, err := mapFn(pod, mataData, nodeNameToInfo[nodes[i].Name]) if err != nil { return nil, err } result = append(result, hostResult) } if reduceFn != nil { - if err := reduceFn(pod, nil, nodeNameToInfo, result); err != nil { + if err := reduceFn(pod, mataData, nodeNameToInfo, result); err != nil { return nil, err } } diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go index f6aca517eb5..99d80e566d7 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go @@ -46,7 +46,7 @@ func init() { }) factory.RegisterPriorityMetadataProducerFactory( func(args factory.PluginFactoryArgs) algorithm.MetadataProducer { - return priorities.PriorityMetadata + return priorities.NewPriorityMetadataFactory(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister) }) registerAlgorithmProvider(defaultPredicates(), defaultPriorities()) @@ -90,13 +90,12 @@ func init() { factory.RegisterPriorityConfigFactory( "ServiceSpreadingPriority", factory.PriorityConfigFactory{ - Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction { + MapReduceFunction: func(args factory.PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) { return priorities.NewSelectorSpreadPriority(args.ServiceLister, algorithm.EmptyControllerLister{}, algorithm.EmptyReplicaSetLister{}, algorithm.EmptyStatefulSetLister{}) }, Weight: 1, }, ) - // EqualPriority is a prioritizer function that gives an equal weight of one to all nodes // Register the priority function so that its available // but do not include it as part of the default 
priorities @@ -213,7 +212,7 @@ func defaultPriorities() sets.String { factory.RegisterPriorityConfigFactory( "SelectorSpreadPriority", factory.PriorityConfigFactory{ - Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction { + MapReduceFunction: func(args factory.PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) { return priorities.NewSelectorSpreadPriority(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister) }, Weight: 1, diff --git a/plugin/pkg/scheduler/core/generic_scheduler_test.go b/plugin/pkg/scheduler/core/generic_scheduler_test.go index ce4ff1300dc..e5890daa38e 100644 --- a/plugin/pkg/scheduler/core/generic_scheduler_test.go +++ b/plugin/pkg/scheduler/core/generic_scheduler_test.go @@ -522,18 +522,26 @@ func TestZeroRequest(t *testing.T) { priorityConfigs := []algorithm.PriorityConfig{ {Map: algorithmpriorities.LeastRequestedPriorityMap, Weight: 1}, {Map: algorithmpriorities.BalancedResourceAllocationMap, Weight: 1}, - { - Function: algorithmpriorities.NewSelectorSpreadPriority( - schedulertesting.FakeServiceLister([]*v1.Service{}), - schedulertesting.FakeControllerLister([]*v1.ReplicationController{}), - schedulertesting.FakeReplicaSetLister([]*extensions.ReplicaSet{}), - schedulertesting.FakeStatefulSetLister([]*apps.StatefulSet{})), - Weight: 1, - }, } + selectorSpreadPriorityMap, selectorSpreadPriorityReduce := algorithmpriorities.NewSelectorSpreadPriority( + schedulertesting.FakeServiceLister([]*v1.Service{}), + schedulertesting.FakeControllerLister([]*v1.ReplicationController{}), + schedulertesting.FakeReplicaSetLister([]*extensions.ReplicaSet{}), + schedulertesting.FakeStatefulSetLister([]*apps.StatefulSet{})) + pc := algorithm.PriorityConfig{Map: selectorSpreadPriorityMap, Reduce: selectorSpreadPriorityReduce, Weight: 1} + priorityConfigs = append(priorityConfigs, pc) + nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes) + + mataDataProducer := algorithmpriorities.NewPriorityMetadataFactory( + schedulertesting.FakeServiceLister([]*v1.Service{}), + schedulertesting.FakeControllerLister([]*v1.ReplicationController{}), + schedulertesting.FakeReplicaSetLister([]*extensions.ReplicaSet{}), + schedulertesting.FakeStatefulSetLister([]*apps.StatefulSet{})) + mataData := mataDataProducer(test.pod, nodeNameToInfo) + list, err := PrioritizeNodes( - test.pod, nodeNameToInfo, algorithm.EmptyMetadataProducer, priorityConfigs, + test.pod, nodeNameToInfo, mataData, priorityConfigs, schedulertesting.FakeNodeLister(test.nodes), []algorithm.SchedulerExtender{}) if err != nil { t.Errorf("unexpected error: %v", err)
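
For reference, here is a minimal sketch (not part of the patch) of how the pieces introduced above fit together, mirroring the updated `priorityFunction` test helper and the `TestZeroRequest` changes: the priority metadata producer is built from the same listers as the selector-spread priority, the map function is run once per node, and the reduce function then normalizes the per-node counts. The package name, function name, and the empty fake listers are illustrative assumptions; pod and node construction is elided.

```go
// Hypothetical illustration of the refactored API; not code from the patch.
package example

import (
	apps "k8s.io/api/apps/v1beta1"
	"k8s.io/api/core/v1"
	extensions "k8s.io/api/extensions/v1beta1"

	algorithmpriorities "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities"
	schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
	schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing"
)

// spreadPodOverNodes scores nodes for pod the way the updated tests do:
// build priority metadata once, run the map function per node, then let the
// reduce function normalize the scores and apply zone weighting.
func spreadPodOverNodes(pod *v1.Pod, pods []*v1.Pod, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) {
	// Empty fake listers, as used in TestZeroRequest and the metadata test.
	services := schedulertesting.FakeServiceLister([]*v1.Service{})
	controllers := schedulertesting.FakeControllerLister([]*v1.ReplicationController{})
	replicaSets := schedulertesting.FakeReplicaSetLister([]*extensions.ReplicaSet{})
	statefulSets := schedulertesting.FakeStatefulSetLister([]*apps.StatefulSet{})

	// Selector spreading is now a map/reduce pair rather than a monolithic
	// PriorityFunction.
	spreadMap, spreadReduce := algorithmpriorities.NewSelectorSpreadPriority(
		services, controllers, replicaSets, statefulSets)

	// The selectors matching the pod are computed once per scheduling cycle
	// by the metadata producer instead of inside the priority itself.
	metadataProducer := algorithmpriorities.NewPriorityMetadataFactory(
		services, controllers, replicaSets, statefulSets)

	nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(pods, nodes)
	meta := metadataProducer(pod, nodeNameToInfo)

	// Map phase: each node gets the raw count of matching pods as its score.
	result := make(schedulerapi.HostPriorityList, 0, len(nodes))
	for _, node := range nodes {
		hostPriority, err := spreadMap(pod, meta, nodeNameToInfo[node.Name])
		if err != nil {
			return nil, err
		}
		result = append(result, hostPriority)
	}

	// Reduce phase: invert and scale the counts to 0..MaxPriority and blend
	// in the per-zone term, mutating result in place.
	if err := spreadReduce(pod, meta, nodeNameToInfo, result); err != nil {
		return nil, err
	}
	return result, nil
}
```

The point of the split, as the diff shows, is that the per-node counting no longer needs its own workqueue.Parallelize call or lock (both are removed from selector_spreading.go): the map phase can be driven by the scheduler's existing per-node fan-out, the zone aggregation happens once in reduce, and getSelectors moves into the metadata producer so the lister lookups are done once per pod rather than once per node.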