Merge pull request #51192 from guangxuli/scheduler_priority_functions_map_reduce

Automatic merge from submit-queue (batch tested with PRs 51192, 55010). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Refactoring of the priority function (CalculateSpreadPriority) to use the map/reduce pattern

**What this PR does / why we need it**:
Ref #24246, and more specifically https://github.com/kubernetes/kubernetes/issues/51455. This PR aims to unify the (deprecated) priority functions by converting them to the map/reduce pattern.
This is the first step; my todo list (WIP), with a sketch of the target pattern shown after the list:
- refactor the inter-pod affinity priority function
- refactor the priority function registration pattern
- remove the deprecated priority function definition and all related logic, etc.
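
Roughly, the split this refactoring moves toward looks like the sketch below. This is only an illustration with simplified stand-in names (`hostPriority`, `mapFunction`, `reduceFunction`, `spreadMap`, `spreadReduce`, `prioritize`, `maxPriority`); the real scheduler uses `algorithm.PriorityMapFunction`, `algorithm.PriorityReduceFunction`, and `schedulerapi.HostPriority`, and the actual `CalculateSpreadPriorityReduce` also folds in zone-level spreading, which is omitted here.

```go
// Minimal sketch of the map/reduce priority pattern. All names are
// illustrative stand-ins, not the scheduler's actual API.
package main

import "fmt"

const maxPriority = 10

// hostPriority holds one score per node, like schedulerapi.HostPriority.
type hostPriority struct {
	Host  string
	Score int
}

// mapFunction scores a single node in isolation.
type mapFunction func(node string, matchingPods int) hostPriority

// reduceFunction rewrites the per-node scores once every node has been mapped,
// e.g. to normalize raw counts into the 0..maxPriority range.
type reduceFunction func(result []hostPriority)

// spreadMap plays the role of CalculateSpreadPriorityMap: the raw score is the
// number of pods on the node that match the incoming pod's selectors.
func spreadMap(node string, matchingPods int) hostPriority {
	return hostPriority{Host: node, Score: matchingPods}
}

// spreadReduce plays the role of CalculateSpreadPriorityReduce: nodes with
// fewer matching pods end up with a higher final score.
func spreadReduce(result []hostPriority) {
	maxCount := 0
	for _, hp := range result {
		if hp.Score > maxCount {
			maxCount = hp.Score
		}
	}
	for i := range result {
		score := maxPriority
		if maxCount > 0 {
			score = maxPriority * (maxCount - result[i].Score) / maxCount
		}
		result[i].Score = score
	}
}

// prioritize composes a map function with an optional reduce function, much
// like the priorityFunction test helper touched by this PR.
func prioritize(mapFn mapFunction, reduceFn reduceFunction, podsPerNode map[string]int) []hostPriority {
	result := make([]hostPriority, 0, len(podsPerNode))
	for node, count := range podsPerNode {
		result = append(result, mapFn(node, count))
	}
	if reduceFn != nil {
		reduceFn(result)
	}
	return result
}

func main() {
	scores := prioritize(spreadMap, spreadReduce, map[string]int{"node-1": 0, "node-2": 3})
	fmt.Println(scores) // node-1 gets the max score, node-2 a lower one
}
```

The point of the split is that the map step only needs a single node's info and can be computed per node (and in parallel), while any normalization that needs all scores happens once in the reduce step.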

**Which issue this PR fixes**:

No issue; this PR just unifies the priority function pattern.

**Special notes for your reviewer**:
none

**Release note**:
none
Kubernetes Submit Queue 2017-11-19 05:22:23 -08:00 committed by GitHub
commit ab203e54ed
15 changed files with 219 additions and 148 deletions


@@ -253,7 +253,7 @@ func TestBalancedResourceAllocation(t *testing.T) {
for _, test := range tests {
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes)
list, err := priorityFunction(BalancedResourceAllocationMap, nil)(test.pod, nodeNameToInfo, test.nodes)
list, err := priorityFunction(BalancedResourceAllocationMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}


@@ -161,7 +161,7 @@ func TestImageLocalityPriority(t *testing.T) {
for _, test := range tests {
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes)
list, err := priorityFunction(ImageLocalityPriorityMap, nil)(test.pod, nodeNameToInfo, test.nodes)
list, err := priorityFunction(ImageLocalityPriorityMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}


@@ -253,7 +253,7 @@ func TestLeastRequested(t *testing.T) {
for _, test := range tests {
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes)
list, err := priorityFunction(LeastRequestedPriorityMap, nil)(test.pod, nodeNameToInfo, test.nodes)
list, err := priorityFunction(LeastRequestedPriorityMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}


@@ -18,26 +18,79 @@ package priorities
import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
type PriorityMetadataFactory struct {
serviceLister algorithm.ServiceLister
controllerLister algorithm.ControllerLister
replicaSetLister algorithm.ReplicaSetLister
statefulSetLister algorithm.StatefulSetLister
}
func NewPriorityMetadataFactory(serviceLister algorithm.ServiceLister, controllerLister algorithm.ControllerLister, replicaSetLister algorithm.ReplicaSetLister, statefulSetLister algorithm.StatefulSetLister) algorithm.MetadataProducer {
factory := &PriorityMetadataFactory{
serviceLister: serviceLister,
controllerLister: controllerLister,
replicaSetLister: replicaSetLister,
statefulSetLister: statefulSetLister,
}
return factory.PriorityMetadata
}
// priorityMetadata is a type that is passed as metadata for priority functions
type priorityMetadata struct {
nonZeroRequest *schedulercache.Resource
podTolerations []v1.Toleration
affinity *v1.Affinity
podSelectors []labels.Selector
}
// PriorityMetadata is a MetadataProducer. Node info can be nil.
func PriorityMetadata(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) interface{} {
func (pmf *PriorityMetadataFactory) PriorityMetadata(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) interface{} {
// If we cannot compute metadata, just return nil
if pod == nil {
return nil
}
tolerationsPreferNoSchedule := getAllTolerationPreferNoSchedule(pod.Spec.Tolerations)
podSelectors := getSelectors(pod, pmf.serviceLister, pmf.controllerLister, pmf.replicaSetLister, pmf.statefulSetLister)
return &priorityMetadata{
nonZeroRequest: getNonZeroRequests(pod),
podTolerations: tolerationsPreferNoSchedule,
affinity: pod.Spec.Affinity,
podSelectors: podSelectors,
}
}
// getSelectors returns selectors of services, RCs and RSs matching the given pod.
func getSelectors(pod *v1.Pod, sl algorithm.ServiceLister, cl algorithm.ControllerLister, rsl algorithm.ReplicaSetLister, ssl algorithm.StatefulSetLister) []labels.Selector {
var selectors []labels.Selector
if services, err := sl.GetPodServices(pod); err == nil {
for _, service := range services {
selectors = append(selectors, labels.SelectorFromSet(service.Spec.Selector))
}
}
if rcs, err := cl.GetPodControllers(pod); err == nil {
for _, rc := range rcs {
selectors = append(selectors, labels.SelectorFromSet(rc.Spec.Selector))
}
}
if rss, err := rsl.GetPodReplicaSets(pod); err == nil {
for _, rs := range rss {
if selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector); err == nil {
selectors = append(selectors, selector)
}
}
}
if sss, err := ssl.GetPodStatefulSets(pod); err == nil {
for _, ss := range sss {
if selector, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector); err == nil {
selectors = append(selectors, selector)
}
}
}
return selectors
}


@@ -20,11 +20,14 @@ import (
"reflect"
"testing"
apps "k8s.io/api/apps/v1beta1"
"k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing"
)
func TestPriorityMetadata(t *testing.T) {
@@ -150,8 +153,13 @@ func TestPriorityMetadata(t *testing.T) {
test: "Produce a priorityMetadata with specified requests",
},
}
mataDataProducer := NewPriorityMetadataFactory(
schedulertesting.FakeServiceLister([]*v1.Service{}),
schedulertesting.FakeControllerLister([]*v1.ReplicationController{}),
schedulertesting.FakeReplicaSetLister([]*extensions.ReplicaSet{}),
schedulertesting.FakeStatefulSetLister([]*apps.StatefulSet{}))
for _, test := range tests {
ptData := PriorityMetadata(test.pod, nil)
ptData := mataDataProducer(test.pod, nil)
if !reflect.DeepEqual(test.expected, ptData) {
t.Errorf("%s: expected %#v, got %#v", test.test, test.expected, ptData)
}


@@ -210,7 +210,7 @@ func TestMostRequested(t *testing.T) {
for _, test := range tests {
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes)
list, err := priorityFunction(MostRequestedPriorityMap, nil)(test.pod, nodeNameToInfo, test.nodes)
list, err := priorityFunction(MostRequestedPriorityMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}


@@ -167,7 +167,7 @@ func TestNodeAffinityPriority(t *testing.T) {
for _, test := range tests {
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(nil, test.nodes)
nap := priorityFunction(CalculateNodeAffinityPriorityMap, CalculateNodeAffinityPriorityReduce)
nap := priorityFunction(CalculateNodeAffinityPriorityMap, CalculateNodeAffinityPriorityReduce, nil)
list, err := nap(test.pod, nodeNameToInfo, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)


@@ -108,7 +108,11 @@ func TestNewNodeLabelPriority(t *testing.T) {
for _, test := range tests {
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(nil, test.nodes)
list, err := priorityFunction(NewNodeLabelPriority(test.label, test.presence))(nil, nodeNameToInfo, test.nodes)
labelPrioritizer := &NodeLabelPrioritizer{
label: test.label,
presence: test.presence,
}
list, err := priorityFunction(labelPrioritizer.CalculateNodeLabelPriorityMap, nil, nil)(nil, nodeNameToInfo, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}


@@ -142,7 +142,7 @@ func TestNodePreferAvoidPriority(t *testing.T) {
for _, test := range tests {
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(nil, test.nodes)
list, err := priorityFunction(CalculateNodePreferAvoidPodsPriorityMap, nil)(test.pod, nodeNameToInfo, test.nodes)
list, err := priorityFunction(CalculateNodePreferAvoidPodsPriorityMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}


@@ -17,12 +17,10 @@ limitations under the License.
package priorities
import (
"sync"
"fmt"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/util/workqueue"
utilnode "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
@@ -46,149 +44,133 @@ func NewSelectorSpreadPriority(
serviceLister algorithm.ServiceLister,
controllerLister algorithm.ControllerLister,
replicaSetLister algorithm.ReplicaSetLister,
statefulSetLister algorithm.StatefulSetLister) algorithm.PriorityFunction {
statefulSetLister algorithm.StatefulSetLister) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
selectorSpread := &SelectorSpread{
serviceLister: serviceLister,
controllerLister: controllerLister,
replicaSetLister: replicaSetLister,
statefulSetLister: statefulSetLister,
}
return selectorSpread.CalculateSpreadPriority
return selectorSpread.CalculateSpreadPriorityMap, selectorSpread.CalculateSpreadPriorityReduce
}
// Returns selectors of services, RCs and RSs matching the given pod.
func getSelectors(pod *v1.Pod, sl algorithm.ServiceLister, cl algorithm.ControllerLister, rsl algorithm.ReplicaSetLister, ssl algorithm.StatefulSetLister) []labels.Selector {
var selectors []labels.Selector
if services, err := sl.GetPodServices(pod); err == nil {
for _, service := range services {
selectors = append(selectors, labels.SelectorFromSet(service.Spec.Selector))
}
}
if rcs, err := cl.GetPodControllers(pod); err == nil {
for _, rc := range rcs {
selectors = append(selectors, labels.SelectorFromSet(rc.Spec.Selector))
}
}
if rss, err := rsl.GetPodReplicaSets(pod); err == nil {
for _, rs := range rss {
if selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector); err == nil {
selectors = append(selectors, selector)
}
}
}
if sss, err := ssl.GetPodStatefulSets(pod); err == nil {
for _, ss := range sss {
if selector, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector); err == nil {
selectors = append(selectors, selector)
}
}
}
return selectors
}
func (s *SelectorSpread) getSelectors(pod *v1.Pod) []labels.Selector {
return getSelectors(pod, s.serviceLister, s.controllerLister, s.replicaSetLister, s.statefulSetLister)
}
// CalculateSpreadPriority spreads pods across hosts and zones, considering pods belonging to the same service or replication controller.
// When a pod is scheduled, it looks for services, RCs or RSs that match the pod, then finds existing pods that match those selectors.
// CalculateSpreadPriorityMap spreads pods across hosts, considering pods
// belonging to the same service,RC,RS or StatefulSet.
// When a pod is scheduled, it looks for services, RCs,RSs and StatefulSets that match the pod,
// then finds existing pods that match those selectors.
// It favors nodes that have fewer existing matching pods.
// i.e. it pushes the scheduler towards a node where there's the smallest number of
// pods which match the same service, RC or RS selectors as the pod being scheduled.
// Where zone information is included on the nodes, it favors nodes in zones with fewer existing matching pods.
func (s *SelectorSpread) CalculateSpreadPriority(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) {
selectors := s.getSelectors(pod)
// Count similar pods by node
countsByNodeName := make(map[string]float64, len(nodes))
countsByZone := make(map[string]float64, 10)
maxCountByNodeName := float64(0)
countsByNodeNameLock := sync.Mutex{}
if len(selectors) > 0 {
processNodeFunc := func(i int) {
nodeName := nodes[i].Name
count := float64(0)
for _, nodePod := range nodeNameToInfo[nodeName].Pods() {
if pod.Namespace != nodePod.Namespace {
continue
}
// When we are replacing a failed pod, we often see the previous
// deleted version while scheduling the replacement.
// Ignore the previous deleted version for spreading purposes
// (it can still be considered for resource restrictions etc.)
if nodePod.DeletionTimestamp != nil {
glog.V(4).Infof("skipping pending-deleted pod: %s/%s", nodePod.Namespace, nodePod.Name)
continue
}
matches := false
for _, selector := range selectors {
if selector.Matches(labels.Set(nodePod.ObjectMeta.Labels)) {
matches = true
break
}
}
if matches {
count++
}
}
zoneId := utilnode.GetZoneKey(nodes[i])
countsByNodeNameLock.Lock()
defer countsByNodeNameLock.Unlock()
countsByNodeName[nodeName] = count
if count > maxCountByNodeName {
maxCountByNodeName = count
}
if zoneId != "" {
countsByZone[zoneId] += count
}
}
workqueue.Parallelize(16, len(nodes), processNodeFunc)
// pods which match the same service, RC,RSs or StatefulSets selectors as the pod being scheduled.
func (s *SelectorSpread) CalculateSpreadPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
var selectors []labels.Selector
node := nodeInfo.Node()
if node == nil {
return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
}
priorityMeta, ok := meta.(*priorityMetadata)
if ok {
selectors = priorityMeta.podSelectors
} else {
selectors = getSelectors(pod, s.serviceLister, s.controllerLister, s.replicaSetLister, s.statefulSetLister)
}
if len(selectors) == 0 {
return schedulerapi.HostPriority{
Host: node.Name,
Score: int(0),
}, nil
}
count := int(0)
for _, nodePod := range nodeInfo.Pods() {
if pod.Namespace != nodePod.Namespace {
continue
}
// When we are replacing a failed pod, we often see the previous
// deleted version while scheduling the replacement.
// Ignore the previous deleted version for spreading purposes
// (it can still be considered for resource restrictions etc.)
if nodePod.DeletionTimestamp != nil {
glog.V(4).Infof("skipping pending-deleted pod: %s/%s", nodePod.Namespace, nodePod.Name)
continue
}
matches := false
for _, selector := range selectors {
if selector.Matches(labels.Set(nodePod.ObjectMeta.Labels)) {
matches = true
break
}
}
if matches {
count++
}
}
return schedulerapi.HostPriority{
Host: node.Name,
Score: int(count),
}, nil
}
// CalculateSpreadPriorityReduce calculates the score of each node
// based on the number of existing matching pods on the node
// where zone information is included on the nodes, it favors nodes
// in zones with fewer existing matching pods.
func (s *SelectorSpread) CalculateSpreadPriorityReduce(pod *v1.Pod, meta interface{}, nodeNameToInfo map[string]*schedulercache.NodeInfo, result schedulerapi.HostPriorityList) error {
countsByZone := make(map[string]int, 10)
maxCountByZone := int(0)
maxCountByNodeName := int(0)
for i := range result {
if result[i].Score > maxCountByNodeName {
maxCountByNodeName = result[i].Score
}
zoneId := utilnode.GetZoneKey(nodeNameToInfo[result[i].Host].Node())
if zoneId == "" {
continue
}
countsByZone[zoneId] += result[i].Score
}
for zoneId := range countsByZone {
if countsByZone[zoneId] > maxCountByZone {
maxCountByZone = countsByZone[zoneId]
}
}
// Aggregate by-zone information
// Compute the maximum number of pods hosted in any zone
haveZones := len(countsByZone) != 0
maxCountByZone := float64(0)
for _, count := range countsByZone {
if count > maxCountByZone {
maxCountByZone = count
}
}
result := make(schedulerapi.HostPriorityList, 0, len(nodes))
//score int - scale of 0-maxPriority
// 0 being the lowest priority and maxPriority being the highest
for _, node := range nodes {
maxCountByNodeNameFloat64 := float64(maxCountByNodeName)
maxCountByZoneFloat64 := float64(maxCountByZone)
MaxPriorityFloat64 := float64(schedulerapi.MaxPriority)
for i := range result {
// initializing to the default/max node score of maxPriority
fScore := float64(schedulerapi.MaxPriority)
fScore := MaxPriorityFloat64
if maxCountByNodeName > 0 {
fScore = float64(schedulerapi.MaxPriority) * ((maxCountByNodeName - countsByNodeName[node.Name]) / maxCountByNodeName)
fScore = MaxPriorityFloat64 * (float64(maxCountByNodeName-result[i].Score) / maxCountByNodeNameFloat64)
}
// If there is zone information present, incorporate it
if haveZones {
zoneId := utilnode.GetZoneKey(node)
zoneId := utilnode.GetZoneKey(nodeNameToInfo[result[i].Host].Node())
if zoneId != "" {
zoneScore := float64(schedulerapi.MaxPriority)
zoneScore := MaxPriorityFloat64
if maxCountByZone > 0 {
zoneScore = float64(schedulerapi.MaxPriority) * ((maxCountByZone - countsByZone[zoneId]) / maxCountByZone)
zoneScore = MaxPriorityFloat64 * (float64(maxCountByZone-countsByZone[zoneId]) / maxCountByZoneFloat64)
}
fScore = (fScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore)
}
}
result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)})
result[i].Score = int(fScore)
if glog.V(10) {
// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
// not logged. There is visible performance gain from it.
glog.V(10).Infof(
"%v -> %v: SelectorSpreadPriority, Score: (%d)", pod.Name, node.Name, int(fScore),
"%v -> %v: SelectorSpreadPriority, Score: (%d)", pod.Name, result[i].Host, int(fScore),
)
}
}
return result, nil
return nil
}
type ServiceAntiAffinity struct {


@@ -338,17 +338,26 @@ func TestSelectorSpreadPriority(t *testing.T) {
},
}
for _, test := range tests {
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nil)
for i, test := range tests {
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, makeNodeList(test.nodes))
selectorSpread := SelectorSpread{
serviceLister: schedulertesting.FakeServiceLister(test.services),
controllerLister: schedulertesting.FakeControllerLister(test.rcs),
replicaSetLister: schedulertesting.FakeReplicaSetLister(test.rss),
statefulSetLister: schedulertesting.FakeStatefulSetLister(test.sss),
}
list, err := selectorSpread.CalculateSpreadPriority(test.pod, nodeNameToInfo, makeNodeList(test.nodes))
mataDataProducer := NewPriorityMetadataFactory(
schedulertesting.FakeServiceLister(test.services),
schedulertesting.FakeControllerLister(test.rcs),
schedulertesting.FakeReplicaSetLister(test.rss),
schedulertesting.FakeStatefulSetLister(test.sss))
mataData := mataDataProducer(test.pod, nodeNameToInfo)
ttp := priorityFunction(selectorSpread.CalculateSpreadPriorityMap, selectorSpread.CalculateSpreadPriorityReduce, mataData)
list, err := ttp(test.pod, nodeNameToInfo, makeNodeList(test.nodes))
if err != nil {
t.Errorf("unexpected error: %v", err)
t.Errorf("unexpected error: %v index : %d\n", err, i)
}
if !reflect.DeepEqual(test.expectedList, list) {
t.Errorf("%s: expected %#v, got %#v", test.test, test.expectedList, list)
@@ -564,17 +573,25 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
},
}
for _, test := range tests {
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nil)
for i, test := range tests {
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, makeLabeledNodeList(labeledNodes))
selectorSpread := SelectorSpread{
serviceLister: schedulertesting.FakeServiceLister(test.services),
controllerLister: schedulertesting.FakeControllerLister(test.rcs),
replicaSetLister: schedulertesting.FakeReplicaSetLister(test.rss),
statefulSetLister: schedulertesting.FakeStatefulSetLister(test.sss),
}
list, err := selectorSpread.CalculateSpreadPriority(test.pod, nodeNameToInfo, makeLabeledNodeList(labeledNodes))
mataDataProducer := NewPriorityMetadataFactory(
schedulertesting.FakeServiceLister(test.services),
schedulertesting.FakeControllerLister(test.rcs),
schedulertesting.FakeReplicaSetLister(test.rss),
schedulertesting.FakeStatefulSetLister(test.sss))
mataData := mataDataProducer(test.pod, nodeNameToInfo)
ttp := priorityFunction(selectorSpread.CalculateSpreadPriorityMap, selectorSpread.CalculateSpreadPriorityReduce, mataData)
list, err := ttp(test.pod, nodeNameToInfo, makeLabeledNodeList(labeledNodes))
if err != nil {
t.Errorf("unexpected error: %v", err)
t.Errorf("unexpected error: %v index : %d", err, i)
}
// sort the two lists to avoid failures on account of different ordering
sort.Sort(test.expectedList)


@@ -227,7 +227,7 @@ func TestTaintAndToleration(t *testing.T) {
}
for _, test := range tests {
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(nil, test.nodes)
ttp := priorityFunction(ComputeTaintTolerationPriorityMap, ComputeTaintTolerationPriorityReduce)
ttp := priorityFunction(ComputeTaintTolerationPriorityMap, ComputeTaintTolerationPriorityReduce, nil)
list, err := ttp(test.pod, nodeNameToInfo, test.nodes)
if err != nil {
t.Errorf("%s, unexpected error: %v", test.test, err)


@@ -41,18 +41,18 @@ func makeNode(node string, milliCPU, memory int64) *v1.Node {
}
}
func priorityFunction(mapFn algorithm.PriorityMapFunction, reduceFn algorithm.PriorityReduceFunction) algorithm.PriorityFunction {
func priorityFunction(mapFn algorithm.PriorityMapFunction, reduceFn algorithm.PriorityReduceFunction, mataData interface{}) algorithm.PriorityFunction {
return func(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) {
result := make(schedulerapi.HostPriorityList, 0, len(nodes))
for i := range nodes {
hostResult, err := mapFn(pod, nil, nodeNameToInfo[nodes[i].Name])
hostResult, err := mapFn(pod, mataData, nodeNameToInfo[nodes[i].Name])
if err != nil {
return nil, err
}
result = append(result, hostResult)
}
if reduceFn != nil {
if err := reduceFn(pod, nil, nodeNameToInfo, result); err != nil {
if err := reduceFn(pod, mataData, nodeNameToInfo, result); err != nil {
return nil, err
}
}


@@ -46,7 +46,7 @@ func init() {
})
factory.RegisterPriorityMetadataProducerFactory(
func(args factory.PluginFactoryArgs) algorithm.MetadataProducer {
return priorities.PriorityMetadata
return priorities.NewPriorityMetadataFactory(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister)
})
registerAlgorithmProvider(defaultPredicates(), defaultPriorities())
@@ -90,13 +90,12 @@ func init() {
factory.RegisterPriorityConfigFactory(
"ServiceSpreadingPriority",
factory.PriorityConfigFactory{
Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction {
MapReduceFunction: func(args factory.PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
return priorities.NewSelectorSpreadPriority(args.ServiceLister, algorithm.EmptyControllerLister{}, algorithm.EmptyReplicaSetLister{}, algorithm.EmptyStatefulSetLister{})
},
Weight: 1,
},
)
// EqualPriority is a prioritizer function that gives an equal weight of one to all nodes
// Register the priority function so that its available
// but do not include it as part of the default priorities
@@ -213,7 +212,7 @@ func defaultPriorities() sets.String {
factory.RegisterPriorityConfigFactory(
"SelectorSpreadPriority",
factory.PriorityConfigFactory{
Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction {
MapReduceFunction: func(args factory.PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
return priorities.NewSelectorSpreadPriority(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister)
},
Weight: 1,


@@ -522,18 +522,26 @@ func TestZeroRequest(t *testing.T) {
priorityConfigs := []algorithm.PriorityConfig{
{Map: algorithmpriorities.LeastRequestedPriorityMap, Weight: 1},
{Map: algorithmpriorities.BalancedResourceAllocationMap, Weight: 1},
{
Function: algorithmpriorities.NewSelectorSpreadPriority(
schedulertesting.FakeServiceLister([]*v1.Service{}),
schedulertesting.FakeControllerLister([]*v1.ReplicationController{}),
schedulertesting.FakeReplicaSetLister([]*extensions.ReplicaSet{}),
schedulertesting.FakeStatefulSetLister([]*apps.StatefulSet{})),
Weight: 1,
},
}
selectorSpreadPriorityMap, selectorSpreadPriorityReduce := algorithmpriorities.NewSelectorSpreadPriority(
schedulertesting.FakeServiceLister([]*v1.Service{}),
schedulertesting.FakeControllerLister([]*v1.ReplicationController{}),
schedulertesting.FakeReplicaSetLister([]*extensions.ReplicaSet{}),
schedulertesting.FakeStatefulSetLister([]*apps.StatefulSet{}))
pc := algorithm.PriorityConfig{Map: selectorSpreadPriorityMap, Reduce: selectorSpreadPriorityReduce, Weight: 1}
priorityConfigs = append(priorityConfigs, pc)
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes)
mataDataProducer := algorithmpriorities.NewPriorityMetadataFactory(
schedulertesting.FakeServiceLister([]*v1.Service{}),
schedulertesting.FakeControllerLister([]*v1.ReplicationController{}),
schedulertesting.FakeReplicaSetLister([]*extensions.ReplicaSet{}),
schedulertesting.FakeStatefulSetLister([]*apps.StatefulSet{}))
mataData := mataDataProducer(test.pod, nodeNameToInfo)
list, err := PrioritizeNodes(
test.pod, nodeNameToInfo, algorithm.EmptyMetadataProducer, priorityConfigs,
test.pod, nodeNameToInfo, mataData, priorityConfigs,
schedulertesting.FakeNodeLister(test.nodes), []algorithm.SchedulerExtender{})
if err != nil {
t.Errorf("unexpected error: %v", err)