From f8632e22030a2d194c1b8084b519df2d1b6ac5a0 Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Fri, 30 Sep 2016 15:14:29 +0200 Subject: [PATCH] Migrate EqualPriority to MapReduce-like framework. --- .../priorities/selector_spreading.go | 5 ++-- .../algorithmprovider/defaults/defaults.go | 2 +- plugin/pkg/scheduler/extender_test.go | 24 +++++++++++------ plugin/pkg/scheduler/generic_scheduler.go | 26 ++++++++++++------- .../pkg/scheduler/generic_scheduler_test.go | 6 ++--- plugin/pkg/scheduler/scheduler_test.go | 2 ++ 6 files changed, 42 insertions(+), 23 deletions(-) diff --git a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go index aaab4e6c9ef..da31aa6a541 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go +++ b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go @@ -56,6 +56,7 @@ func NewSelectorSpreadPriority( return selectorSpread.CalculateSpreadPriority } +// Returns selectors of services, RCs and RSs matching the given pod. func getSelectors(pod *api.Pod, sl algorithm.ServiceLister, cl algorithm.ControllerLister, rsl algorithm.ReplicaSetLister) []labels.Selector { selectors := make([]labels.Selector, 0, 3) if services, err := sl.GetPodServices(pod); err == nil { @@ -83,10 +84,10 @@ func (s *SelectorSpread) getSelectors(pod *api.Pod) []labels.Selector { } // CalculateSpreadPriority spreads pods across hosts and zones, considering pods belonging to the same service or replication controller. -// When a pod is scheduled, it looks for services or RCs that match the pod, then finds existing pods that match those selectors. +// When a pod is scheduled, it looks for services, RCs or RSs that match the pod, then finds existing pods that match those selectors. // It favors nodes that have fewer existing matching pods. // i.e. it pushes the scheduler towards a node where there's the smallest number of -// pods which match the same service selectors or RC selectors as the pod being scheduled. +// pods which match the same service, RC or RS selectors as the pod being scheduled. // Where zone information is included on the nodes, it favors nodes in zones with fewer existing matching pods. func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) { selectors := s.getSelectors(pod) diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go index 7bafbcd3502..bb39af4e4a2 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go @@ -67,7 +67,7 @@ func init() { // EqualPriority is a prioritizer function that gives an equal weight of one to all nodes // Register the priority function so that its available // but do not include it as part of the default priorities - factory.RegisterPriorityFunction("EqualPriority", scheduler.EqualPriority, 1) + factory.RegisterPriorityFunction2("EqualPriority", scheduler.EqualPriorityMap, nil, 1) // ServiceSpreadingPriority is a priority config factory that spreads pods by minimizing // the number of pods (belonging to the same service) on the same node. diff --git a/plugin/pkg/scheduler/extender_test.go b/plugin/pkg/scheduler/extender_test.go index 0dc897a4ea1..88b10249674 100644 --- a/plugin/pkg/scheduler/extender_test.go +++ b/plugin/pkg/scheduler/extender_test.go @@ -19,12 +19,13 @@ package scheduler import ( "fmt" "testing" + "time" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" - schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing" ) type fitPredicate func(pod *api.Pod, node *api.Node) (bool, error) @@ -170,7 +171,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { }{ { predicates: map[string]algorithm.FitPredicate{"true": truePredicate}, - prioritizers: []algorithm.PriorityConfig{{Function: EqualPriority, Weight: 1}}, + prioritizers: []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}}, extenders: []FakeExtender{ { predicates: []fitPredicate{truePredicateExtender}, @@ -185,7 +186,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { }, { predicates: map[string]algorithm.FitPredicate{"true": truePredicate}, - prioritizers: []algorithm.PriorityConfig{{Function: EqualPriority, Weight: 1}}, + prioritizers: []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}}, extenders: []FakeExtender{ { predicates: []fitPredicate{truePredicateExtender}, @@ -200,7 +201,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { }, { predicates: map[string]algorithm.FitPredicate{"true": truePredicate}, - prioritizers: []algorithm.PriorityConfig{{Function: EqualPriority, Weight: 1}}, + prioritizers: []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}}, extenders: []FakeExtender{ { predicates: []fitPredicate{truePredicateExtender}, @@ -215,7 +216,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { }, { predicates: map[string]algorithm.FitPredicate{"true": truePredicate}, - prioritizers: []algorithm.PriorityConfig{{Function: EqualPriority, Weight: 1}}, + prioritizers: []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}}, extenders: []FakeExtender{ { predicates: []fitPredicate{machine2PredicateExtender}, @@ -230,7 +231,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { }, { predicates: map[string]algorithm.FitPredicate{"true": truePredicate}, - prioritizers: []algorithm.PriorityConfig{{Function: EqualPriority, Weight: 1}}, + prioritizers: []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}}, extenders: []FakeExtender{ { predicates: []fitPredicate{truePredicateExtender}, @@ -244,7 +245,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { }, { predicates: map[string]algorithm.FitPredicate{"true": truePredicate}, - prioritizers: []algorithm.PriorityConfig{{Function: EqualPriority, Weight: 1}}, + prioritizers: []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}}, extenders: []FakeExtender{ { predicates: []fitPredicate{truePredicateExtender}, @@ -282,8 +283,15 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { for ii := range test.extenders { extenders = append(extenders, &test.extenders[ii]) } + cache := schedulercache.New(time.Duration(0), wait.NeverStop) + for _, pod := range test.pods { + cache.AddPod(pod) + } + for _, name := range test.nodes { + cache.AddNode(&api.Node{ObjectMeta: api.ObjectMeta{Name: name}}) + } scheduler := NewGenericScheduler( - schedulertesting.PodsToCache(test.pods), test.predicates, algorithm.EmptyMetadataProducer, + cache, test.predicates, algorithm.EmptyMetadataProducer, test.prioritizers, extenders) machine, err := scheduler.Schedule(test.pod, algorithm.FakeNodeLister(makeNodeList(test.nodes))) if test.expectsErr { diff --git a/plugin/pkg/scheduler/generic_scheduler.go b/plugin/pkg/scheduler/generic_scheduler.go index 24b45eb5bfb..fe8172f50fe 100644 --- a/plugin/pkg/scheduler/generic_scheduler.go +++ b/plugin/pkg/scheduler/generic_scheduler.go @@ -243,7 +243,15 @@ func PrioritizeNodes( // If no priority configs are provided, then the EqualPriority function is applied // This is required to generate the priority list in the required format if len(priorityConfigs) == 0 && len(extenders) == 0 { - return EqualPriority(pod, nodeNameToInfo, nodes) + result := make(schedulerapi.HostPriorityList, 0, len(nodes)) + for i := range nodes { + hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name]) + if err != nil { + return nil, err + } + result = append(result, hostPriority) + } + return result, nil } var ( @@ -355,15 +363,15 @@ func PrioritizeNodes( } // EqualPriority is a prioritizer function that gives an equal weight of one to all nodes -func EqualPriority(_ *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) { - result := make(schedulerapi.HostPriorityList, len(nodes)) - for _, node := range nodes { - result = append(result, schedulerapi.HostPriority{ - Host: node.Name, - Score: 1, - }) +func EqualPriorityMap(_ *api.Pod, _ interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) { + node := nodeInfo.Node() + if node == nil { + return schedulerapi.HostPriority{}, fmt.Errorf("node not found") } - return result, nil + return schedulerapi.HostPriority{ + Host: node.Name, + Score: 1, + }, nil } func NewGenericScheduler( diff --git a/plugin/pkg/scheduler/generic_scheduler_test.go b/plugin/pkg/scheduler/generic_scheduler_test.go index c8bf64e57bc..258de553b02 100644 --- a/plugin/pkg/scheduler/generic_scheduler_test.go +++ b/plugin/pkg/scheduler/generic_scheduler_test.go @@ -189,7 +189,7 @@ func TestGenericScheduler(t *testing.T) { }{ { predicates: map[string]algorithm.FitPredicate{"false": falsePredicate}, - prioritizers: []algorithm.PriorityConfig{{Function: EqualPriority, Weight: 1}}, + prioritizers: []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}}, nodes: []string{"machine1", "machine2"}, expectsErr: true, pod: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "2"}}, @@ -203,7 +203,7 @@ func TestGenericScheduler(t *testing.T) { }, { predicates: map[string]algorithm.FitPredicate{"true": truePredicate}, - prioritizers: []algorithm.PriorityConfig{{Function: EqualPriority, Weight: 1}}, + prioritizers: []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}}, nodes: []string{"machine1", "machine2"}, expectedHosts: sets.NewString("machine1", "machine2"), name: "test 2", @@ -212,7 +212,7 @@ func TestGenericScheduler(t *testing.T) { { // Fits on a machine where the pod ID matches the machine name predicates: map[string]algorithm.FitPredicate{"matches": matchesPredicate}, - prioritizers: []algorithm.PriorityConfig{{Function: EqualPriority, Weight: 1}}, + prioritizers: []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}}, nodes: []string{"machine1", "machine2"}, pod: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "machine2"}}, expectedHosts: sets.NewString("machine2"), diff --git a/plugin/pkg/scheduler/scheduler_test.go b/plugin/pkg/scheduler/scheduler_test.go index 3aae5a211a4..86e5eb3a36e 100644 --- a/plugin/pkg/scheduler/scheduler_test.go +++ b/plugin/pkg/scheduler/scheduler_test.go @@ -185,6 +185,7 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) { scache := schedulercache.New(100*time.Millisecond, stop) pod := podWithPort("pod.Name", "", 8080) node := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine1"}} + scache.AddNode(&node) nodeLister := algorithm.FakeNodeLister([]*api.Node{&node}) predicateMap := map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts} scheduler, bindingChan, _ := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, nodeLister, predicateMap, pod, &node) @@ -242,6 +243,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) { scache := schedulercache.New(10*time.Minute, stop) firstPod := podWithPort("pod.Name", "", 8080) node := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine1"}} + scache.AddNode(&node) nodeLister := algorithm.FakeNodeLister([]*api.Node{&node}) predicateMap := map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts} scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, nodeLister, predicateMap, firstPod, &node)