Migrate EqualPriority to MapReduce-like framework.

This commit is contained in:
Wojciech Tyczynski 2016-09-30 15:14:29 +02:00
parent daac9a1869
commit f8632e2203
6 changed files with 42 additions and 23 deletions

View File

@ -56,6 +56,7 @@ func NewSelectorSpreadPriority(
return selectorSpread.CalculateSpreadPriority
}
// Returns selectors of services, RCs and RSs matching the given pod.
func getSelectors(pod *api.Pod, sl algorithm.ServiceLister, cl algorithm.ControllerLister, rsl algorithm.ReplicaSetLister) []labels.Selector {
selectors := make([]labels.Selector, 0, 3)
if services, err := sl.GetPodServices(pod); err == nil {
@ -83,10 +84,10 @@ func (s *SelectorSpread) getSelectors(pod *api.Pod) []labels.Selector {
}
// CalculateSpreadPriority spreads pods across hosts and zones, considering pods belonging to the same service or replication controller.
// When a pod is scheduled, it looks for services or RCs that match the pod, then finds existing pods that match those selectors.
// When a pod is scheduled, it looks for services, RCs or RSs that match the pod, then finds existing pods that match those selectors.
// It favors nodes that have fewer existing matching pods.
// i.e. it pushes the scheduler towards a node where there's the smallest number of
// pods which match the same service selectors or RC selectors as the pod being scheduled.
// pods which match the same service, RC or RS selectors as the pod being scheduled.
// Where zone information is included on the nodes, it favors nodes in zones with fewer existing matching pods.
func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) {
selectors := s.getSelectors(pod)

View File

@ -67,7 +67,7 @@ func init() {
// EqualPriority is a prioritizer function that gives an equal weight of one to all nodes
// Register the priority function so that its available
// but do not include it as part of the default priorities
factory.RegisterPriorityFunction("EqualPriority", scheduler.EqualPriority, 1)
factory.RegisterPriorityFunction2("EqualPriority", scheduler.EqualPriorityMap, nil, 1)
// ServiceSpreadingPriority is a priority config factory that spreads pods by minimizing
// the number of pods (belonging to the same service) on the same node.

View File

@ -19,12 +19,13 @@ package scheduler
import (
"fmt"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing"
)
type fitPredicate func(pod *api.Pod, node *api.Node) (bool, error)
@ -170,7 +171,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
}{
{
predicates: map[string]algorithm.FitPredicate{"true": truePredicate},
prioritizers: []algorithm.PriorityConfig{{Function: EqualPriority, Weight: 1}},
prioritizers: []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}},
extenders: []FakeExtender{
{
predicates: []fitPredicate{truePredicateExtender},
@ -185,7 +186,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
},
{
predicates: map[string]algorithm.FitPredicate{"true": truePredicate},
prioritizers: []algorithm.PriorityConfig{{Function: EqualPriority, Weight: 1}},
prioritizers: []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}},
extenders: []FakeExtender{
{
predicates: []fitPredicate{truePredicateExtender},
@ -200,7 +201,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
},
{
predicates: map[string]algorithm.FitPredicate{"true": truePredicate},
prioritizers: []algorithm.PriorityConfig{{Function: EqualPriority, Weight: 1}},
prioritizers: []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}},
extenders: []FakeExtender{
{
predicates: []fitPredicate{truePredicateExtender},
@ -215,7 +216,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
},
{
predicates: map[string]algorithm.FitPredicate{"true": truePredicate},
prioritizers: []algorithm.PriorityConfig{{Function: EqualPriority, Weight: 1}},
prioritizers: []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}},
extenders: []FakeExtender{
{
predicates: []fitPredicate{machine2PredicateExtender},
@ -230,7 +231,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
},
{
predicates: map[string]algorithm.FitPredicate{"true": truePredicate},
prioritizers: []algorithm.PriorityConfig{{Function: EqualPriority, Weight: 1}},
prioritizers: []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}},
extenders: []FakeExtender{
{
predicates: []fitPredicate{truePredicateExtender},
@ -244,7 +245,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
},
{
predicates: map[string]algorithm.FitPredicate{"true": truePredicate},
prioritizers: []algorithm.PriorityConfig{{Function: EqualPriority, Weight: 1}},
prioritizers: []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}},
extenders: []FakeExtender{
{
predicates: []fitPredicate{truePredicateExtender},
@ -282,8 +283,15 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
for ii := range test.extenders {
extenders = append(extenders, &test.extenders[ii])
}
cache := schedulercache.New(time.Duration(0), wait.NeverStop)
for _, pod := range test.pods {
cache.AddPod(pod)
}
for _, name := range test.nodes {
cache.AddNode(&api.Node{ObjectMeta: api.ObjectMeta{Name: name}})
}
scheduler := NewGenericScheduler(
schedulertesting.PodsToCache(test.pods), test.predicates, algorithm.EmptyMetadataProducer,
cache, test.predicates, algorithm.EmptyMetadataProducer,
test.prioritizers, extenders)
machine, err := scheduler.Schedule(test.pod, algorithm.FakeNodeLister(makeNodeList(test.nodes)))
if test.expectsErr {

View File

@ -243,7 +243,15 @@ func PrioritizeNodes(
// If no priority configs are provided, then the EqualPriority function is applied
// This is required to generate the priority list in the required format
if len(priorityConfigs) == 0 && len(extenders) == 0 {
return EqualPriority(pod, nodeNameToInfo, nodes)
result := make(schedulerapi.HostPriorityList, 0, len(nodes))
for i := range nodes {
hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
if err != nil {
return nil, err
}
result = append(result, hostPriority)
}
return result, nil
}
var (
@ -355,15 +363,15 @@ func PrioritizeNodes(
}
// EqualPriority is a prioritizer function that gives an equal weight of one to all nodes
func EqualPriority(_ *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) {
result := make(schedulerapi.HostPriorityList, len(nodes))
for _, node := range nodes {
result = append(result, schedulerapi.HostPriority{
Host: node.Name,
Score: 1,
})
func EqualPriorityMap(_ *api.Pod, _ interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
node := nodeInfo.Node()
if node == nil {
return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
}
return result, nil
return schedulerapi.HostPriority{
Host: node.Name,
Score: 1,
}, nil
}
func NewGenericScheduler(

View File

@ -189,7 +189,7 @@ func TestGenericScheduler(t *testing.T) {
}{
{
predicates: map[string]algorithm.FitPredicate{"false": falsePredicate},
prioritizers: []algorithm.PriorityConfig{{Function: EqualPriority, Weight: 1}},
prioritizers: []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}},
nodes: []string{"machine1", "machine2"},
expectsErr: true,
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "2"}},
@ -203,7 +203,7 @@ func TestGenericScheduler(t *testing.T) {
},
{
predicates: map[string]algorithm.FitPredicate{"true": truePredicate},
prioritizers: []algorithm.PriorityConfig{{Function: EqualPriority, Weight: 1}},
prioritizers: []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}},
nodes: []string{"machine1", "machine2"},
expectedHosts: sets.NewString("machine1", "machine2"),
name: "test 2",
@ -212,7 +212,7 @@ func TestGenericScheduler(t *testing.T) {
{
// Fits on a machine where the pod ID matches the machine name
predicates: map[string]algorithm.FitPredicate{"matches": matchesPredicate},
prioritizers: []algorithm.PriorityConfig{{Function: EqualPriority, Weight: 1}},
prioritizers: []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}},
nodes: []string{"machine1", "machine2"},
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "machine2"}},
expectedHosts: sets.NewString("machine2"),

View File

@ -185,6 +185,7 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
scache := schedulercache.New(100*time.Millisecond, stop)
pod := podWithPort("pod.Name", "", 8080)
node := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine1"}}
scache.AddNode(&node)
nodeLister := algorithm.FakeNodeLister([]*api.Node{&node})
predicateMap := map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts}
scheduler, bindingChan, _ := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, nodeLister, predicateMap, pod, &node)
@ -242,6 +243,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
scache := schedulercache.New(10*time.Minute, stop)
firstPod := podWithPort("pod.Name", "", 8080)
node := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine1"}}
scache.AddNode(&node)
nodeLister := algorithm.FakeNodeLister([]*api.Node{&node})
predicateMap := map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts}
scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, nodeLister, predicateMap, firstPod, &node)