diff --git a/plugin/pkg/scheduler/algorithm/priorities/priorities.go b/plugin/pkg/scheduler/algorithm/priorities/priorities.go index 59335a6af40..112387e6d3c 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/priorities.go +++ b/plugin/pkg/scheduler/algorithm/priorities/priorities.go @@ -24,7 +24,6 @@ import ( "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" - "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" ) @@ -115,16 +114,15 @@ func calculateResourceOccupancy(pod *api.Pod, node api.Node, pods []*api.Pod) sc // It calculates the percentage of memory and CPU requested by pods scheduled on the node, and prioritizes // based on the minimum of the average of the fraction of requested to capacity. // Details: cpu((capacity - sum(requested)) * 10 / capacity) + memory((capacity - sum(requested)) * 10 / capacity) / 2 -func LeastRequestedPriority(pod *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { +func LeastRequestedPriority(pod *api.Pod, machinesToPods map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { nodes, err := nodeLister.List() if err != nil { return schedulerapi.HostPriorityList{}, err } - podsToMachines, err := predicates.MapPodsToMachines(podLister) list := schedulerapi.HostPriorityList{} for _, node := range nodes.Items { - list = append(list, calculateResourceOccupancy(pod, node, podsToMachines[node.Name])) + list = append(list, calculateResourceOccupancy(pod, node, machinesToPods[node.Name])) } return list, nil } @@ -145,7 +143,7 @@ func NewNodeLabelPriority(label string, presence bool) algorithm.PriorityFunctio // CalculateNodeLabelPriority checks whether a particular label exists on a node or not, regardless of its value. // If presence is true, prioritizes nodes that have the specified label, regardless of value. // If presence is false, prioritizes nodes that do not have the specified label. -func (n *NodeLabelPrioritizer) CalculateNodeLabelPriority(pod *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { +func (n *NodeLabelPrioritizer) CalculateNodeLabelPriority(pod *api.Pod, machinesToPods map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { var score int nodes, err := nodeLister.List() if err != nil { @@ -178,16 +176,15 @@ func (n *NodeLabelPrioritizer) CalculateNodeLabelPriority(pod *api.Pod, podListe // close the two metrics are to each other. // Detail: score = 10 - abs(cpuFraction-memoryFraction)*10. The algorithm is partly inspired by: // "Wei Huang et al. An Energy Efficient Virtual Machine Placement Algorithm with Balanced Resource Utilization" -func BalancedResourceAllocation(pod *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { +func BalancedResourceAllocation(pod *api.Pod, machinesToPods map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { nodes, err := nodeLister.List() if err != nil { return schedulerapi.HostPriorityList{}, err } - podsToMachines, err := predicates.MapPodsToMachines(podLister) list := schedulerapi.HostPriorityList{} for _, node := range nodes.Items { - list = append(list, calculateBalancedResourceAllocation(pod, node, podsToMachines[node.Name])) + list = append(list, calculateBalancedResourceAllocation(pod, node, machinesToPods[node.Name])) } return list, nil } diff --git a/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go b/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go index 823e516dcf0..3d6207e5a5b 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go @@ -26,6 +26,7 @@ import ( "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/plugin/pkg/scheduler" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" + "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" ) @@ -127,8 +128,13 @@ func TestZeroRequest(t *testing.T) { const expectedPriority int = 25 for _, test := range tests { + m2p, err := predicates.MapPodsToMachines(algorithm.FakePodLister(test.pods)) + if err != nil { + t.Errorf("unexpected error: %v", err) + } list, err := scheduler.PrioritizeNodes( test.pod, + m2p, algorithm.FakePodLister(test.pods), // This should match the configuration in defaultPriorities() in // plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go if you want @@ -377,7 +383,11 @@ func TestLeastRequested(t *testing.T) { } for _, test := range tests { - list, err := LeastRequestedPriority(test.pod, algorithm.FakePodLister(test.pods), algorithm.FakeNodeLister(api.NodeList{Items: test.nodes})) + m2p, err := predicates.MapPodsToMachines(algorithm.FakePodLister(test.pods)) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + list, err := LeastRequestedPriority(test.pod, m2p, algorithm.FakePodLister(test.pods), algorithm.FakeNodeLister(api.NodeList{Items: test.nodes})) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -471,7 +481,7 @@ func TestNewNodeLabelPriority(t *testing.T) { label: test.label, presence: test.presence, } - list, err := prioritizer.CalculateNodeLabelPriority(nil, nil, algorithm.FakeNodeLister(api.NodeList{Items: test.nodes})) + list, err := prioritizer.CalculateNodeLabelPriority(nil, map[string][]*api.Pod{}, nil, algorithm.FakeNodeLister(api.NodeList{Items: test.nodes})) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -709,7 +719,11 @@ func TestBalancedResourceAllocation(t *testing.T) { } for _, test := range tests { - list, err := BalancedResourceAllocation(test.pod, algorithm.FakePodLister(test.pods), algorithm.FakeNodeLister(api.NodeList{Items: test.nodes})) + m2p, err := predicates.MapPodsToMachines(algorithm.FakePodLister(test.pods)) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + list, err := BalancedResourceAllocation(test.pod, m2p, algorithm.FakePodLister(test.pods), algorithm.FakeNodeLister(api.NodeList{Items: test.nodes})) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go index f1202e50055..f348acdb95d 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go +++ b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go @@ -73,7 +73,7 @@ func getZoneKey(node *api.Node) string { // i.e. it pushes the scheduler towards a node where there's the smallest number of // pods which match the same service selectors or RC selectors as the pod being scheduled. // Where zone information is included on the nodes, it favors nodes in zones with fewer existing matching pods. -func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { +func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, machinesToPods map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { var nsPods []*api.Pod selectors := make([]labels.Selector, 0) @@ -213,7 +213,7 @@ func NewServiceAntiAffinityPriority(serviceLister algorithm.ServiceLister, label // CalculateAntiAffinityPriority spreads pods by minimizing the number of pods belonging to the same service // on machines with the same value for a particular label. // The label to be considered is provided to the struct (ServiceAntiAffinity). -func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { +func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod *api.Pod, machinesToPods map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { var nsServicePods []*api.Pod services, err := s.serviceLister.GetPodServices(pod) diff --git a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go index a9ee18c87a2..f50c3e79783 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/api" wellknownlabels "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" + "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" ) @@ -218,8 +219,12 @@ func TestSelectorSpreadPriority(t *testing.T) { } for _, test := range tests { + m2p, err := predicates.MapPodsToMachines(algorithm.FakePodLister(test.pods)) + if err != nil { + t.Errorf("unexpected error: %v", err) + } selectorSpread := SelectorSpread{serviceLister: algorithm.FakeServiceLister(test.services), controllerLister: algorithm.FakeControllerLister(test.rcs)} - list, err := selectorSpread.CalculateSpreadPriority(test.pod, algorithm.FakePodLister(test.pods), algorithm.FakeNodeLister(makeNodeList(test.nodes))) + list, err := selectorSpread.CalculateSpreadPriority(test.pod, m2p, algorithm.FakePodLister(test.pods), algorithm.FakeNodeLister(makeNodeList(test.nodes))) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -418,7 +423,11 @@ func TestZoneSelectorSpreadPriority(t *testing.T) { for _, test := range tests { selectorSpread := SelectorSpread{serviceLister: algorithm.FakeServiceLister(test.services), controllerLister: algorithm.FakeControllerLister(test.rcs)} - list, err := selectorSpread.CalculateSpreadPriority(test.pod, algorithm.FakePodLister(test.pods), algorithm.FakeNodeLister(makeLabeledNodeList(labeledNodes))) + m2p, err := predicates.MapPodsToMachines(algorithm.FakePodLister(test.pods)) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + list, err := selectorSpread.CalculateSpreadPriority(test.pod, m2p, algorithm.FakePodLister(test.pods), algorithm.FakeNodeLister(makeLabeledNodeList(labeledNodes))) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -588,8 +597,12 @@ func TestZoneSpreadPriority(t *testing.T) { } for _, test := range tests { + m2p, err := predicates.MapPodsToMachines(algorithm.FakePodLister(test.pods)) + if err != nil { + t.Errorf("unexpected error: %v", err) + } zoneSpread := ServiceAntiAffinity{serviceLister: algorithm.FakeServiceLister(test.services), label: "zone"} - list, err := zoneSpread.CalculateAntiAffinityPriority(test.pod, algorithm.FakePodLister(test.pods), algorithm.FakeNodeLister(makeLabeledNodeList(test.nodes))) + list, err := zoneSpread.CalculateAntiAffinityPriority(test.pod, m2p, algorithm.FakePodLister(test.pods), algorithm.FakeNodeLister(makeLabeledNodeList(test.nodes))) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/plugin/pkg/scheduler/algorithm/types.go b/plugin/pkg/scheduler/algorithm/types.go index bca143c79d7..a0df5b58c86 100644 --- a/plugin/pkg/scheduler/algorithm/types.go +++ b/plugin/pkg/scheduler/algorithm/types.go @@ -24,7 +24,7 @@ import ( // FitPredicate is a function that indicates if a pod fits into an existing node. type FitPredicate func(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) -type PriorityFunction func(pod *api.Pod, podLister PodLister, nodeLister NodeLister) (schedulerapi.HostPriorityList, error) +type PriorityFunction func(pod *api.Pod, machineToPods map[string][]*api.Pod, podLister PodLister, nodeLister NodeLister) (schedulerapi.HostPriorityList, error) type PriorityConfig struct { Function PriorityFunction diff --git a/plugin/pkg/scheduler/extender_test.go b/plugin/pkg/scheduler/extender_test.go index dd182395a84..2d052ae96c8 100644 --- a/plugin/pkg/scheduler/extender_test.go +++ b/plugin/pkg/scheduler/extender_test.go @@ -88,7 +88,7 @@ func machine2PrioritizerExtender(pod *api.Pod, nodes *api.NodeList) (*schedulera return &result, nil } -func machine2Prioritizer(_ *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { +func machine2Prioritizer(_ *api.Pod, machineToPods map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { nodes, err := nodeLister.List() if err != nil { return []schedulerapi.HostPriority{}, err diff --git a/plugin/pkg/scheduler/factory/factory_test.go b/plugin/pkg/scheduler/factory/factory_test.go index fa89390c692..057d16b9627 100644 --- a/plugin/pkg/scheduler/factory/factory_test.go +++ b/plugin/pkg/scheduler/factory/factory_test.go @@ -124,11 +124,11 @@ func PredicateTwo(pod *api.Pod, existingPods []*api.Pod, node string) (bool, err return true, nil } -func PriorityOne(pod *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { +func PriorityOne(pod *api.Pod, m2p map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { return []schedulerapi.HostPriority{}, nil } -func PriorityTwo(pod *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { +func PriorityTwo(pod *api.Pod, m2p map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { return []schedulerapi.HostPriority{}, nil } diff --git a/plugin/pkg/scheduler/generic_scheduler.go b/plugin/pkg/scheduler/generic_scheduler.go index f348d4e5ff2..ba5654afe82 100644 --- a/plugin/pkg/scheduler/generic_scheduler.go +++ b/plugin/pkg/scheduler/generic_scheduler.go @@ -71,12 +71,19 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe return "", ErrNoNodesAvailable } - filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.pods, g.predicates, nodes, g.extenders) + // TODO: we should compute this once and dynamically update it using Watch, not constantly re-compute. + // But at least we're now only doing it in one place + machinesToPods, err := predicates.MapPodsToMachines(g.pods) if err != nil { return "", err } - priorityList, err := PrioritizeNodes(pod, g.pods, g.prioritizers, algorithm.FakeNodeLister(filteredNodes), g.extenders) + filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, machinesToPods, g.predicates, nodes, g.extenders) + if err != nil { + return "", err + } + + priorityList, err := PrioritizeNodes(pod, machinesToPods, g.pods, g.prioritizers, algorithm.FakeNodeLister(filteredNodes), g.extenders) if err != nil { return "", err } @@ -108,13 +115,10 @@ func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList // Filters the nodes to find the ones that fit based on the given predicate functions // Each node is passed through the predicate functions to determine if it is a fit -func findNodesThatFit(pod *api.Pod, podLister algorithm.PodLister, predicateFuncs map[string]algorithm.FitPredicate, nodes api.NodeList, extenders []algorithm.SchedulerExtender) (api.NodeList, FailedPredicateMap, error) { +func findNodesThatFit(pod *api.Pod, machineToPods map[string][]*api.Pod, predicateFuncs map[string]algorithm.FitPredicate, nodes api.NodeList, extenders []algorithm.SchedulerExtender) (api.NodeList, FailedPredicateMap, error) { filtered := []api.Node{} - machineToPods, err := predicates.MapPodsToMachines(podLister) failedPredicateMap := FailedPredicateMap{} - if err != nil { - return api.NodeList{}, FailedPredicateMap{}, err - } + for _, node := range nodes.Items { fits := true for name, predicate := range predicateFuncs { @@ -161,13 +165,13 @@ func findNodesThatFit(pod *api.Pod, podLister algorithm.PodLister, predicateFunc // Each priority function can also have its own weight // The node scores returned by the priority function are multiplied by the weights to get weighted scores // All scores are finally combined (added) to get the total weighted scores of all nodes -func PrioritizeNodes(pod *api.Pod, podLister algorithm.PodLister, priorityConfigs []algorithm.PriorityConfig, nodeLister algorithm.NodeLister, extenders []algorithm.SchedulerExtender) (schedulerapi.HostPriorityList, error) { +func PrioritizeNodes(pod *api.Pod, machinesToPods map[string][]*api.Pod, podLister algorithm.PodLister, priorityConfigs []algorithm.PriorityConfig, nodeLister algorithm.NodeLister, extenders []algorithm.SchedulerExtender) (schedulerapi.HostPriorityList, error) { result := schedulerapi.HostPriorityList{} // If no priority configs are provided, then the EqualPriority function is applied // This is required to generate the priority list in the required format if len(priorityConfigs) == 0 && len(extenders) == 0 { - return EqualPriority(pod, podLister, nodeLister) + return EqualPriority(pod, machinesToPods, podLister, nodeLister) } combinedScores := map[string]int{} @@ -178,7 +182,7 @@ func PrioritizeNodes(pod *api.Pod, podLister algorithm.PodLister, priorityConfig continue } priorityFunc := priorityConfig.Function - prioritizedList, err := priorityFunc(pod, podLister, nodeLister) + prioritizedList, err := priorityFunc(pod, machinesToPods, podLister, nodeLister) if err != nil { return schedulerapi.HostPriorityList{}, err } @@ -224,7 +228,7 @@ func getBestHosts(list schedulerapi.HostPriorityList) []string { } // EqualPriority is a prioritizer function that gives an equal weight of one to all nodes -func EqualPriority(_ *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { +func EqualPriority(_ *api.Pod, machinesToPods map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { nodes, err := nodeLister.List() if err != nil { glog.Errorf("Failed to list nodes: %v", err) diff --git a/plugin/pkg/scheduler/generic_scheduler_test.go b/plugin/pkg/scheduler/generic_scheduler_test.go index 68e1683d9fb..7b651f0d26b 100644 --- a/plugin/pkg/scheduler/generic_scheduler_test.go +++ b/plugin/pkg/scheduler/generic_scheduler_test.go @@ -45,7 +45,7 @@ func hasNoPodsPredicate(pod *api.Pod, existingPods []*api.Pod, node string) (boo return len(existingPods) == 0, nil } -func numericPriority(pod *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { +func numericPriority(pod *api.Pod, machineToPods map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { nodes, err := nodeLister.List() result := []schedulerapi.HostPriority{} @@ -65,11 +65,11 @@ func numericPriority(pod *api.Pod, podLister algorithm.PodLister, nodeLister alg return result, nil } -func reverseNumericPriority(pod *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { +func reverseNumericPriority(pod *api.Pod, machineToPods map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { var maxScore float64 minScore := math.MaxFloat64 reverseResult := []schedulerapi.HostPriority{} - result, err := numericPriority(pod, podLister, nodeLister) + result, err := numericPriority(pod, machineToPods, podLister, nodeLister) if err != nil { return nil, err } @@ -308,7 +308,12 @@ func TestGenericScheduler(t *testing.T) { func TestFindFitAllError(t *testing.T) { nodes := []string{"3", "2", "1"} predicates := map[string]algorithm.FitPredicate{"true": truePredicate, "false": falsePredicate} - _, predicateMap, err := findNodesThatFit(&api.Pod{}, algorithm.FakePodLister([]*api.Pod{}), predicates, makeNodeList(nodes), nil) + machineToPods := map[string][]*api.Pod{ + "3": {}, + "2": {}, + "1": {}, + } + _, predicateMap, err := findNodesThatFit(&api.Pod{}, machineToPods, predicates, makeNodeList(nodes), nil) if err != nil { t.Errorf("unexpected error: %v", err) @@ -333,7 +338,12 @@ func TestFindFitSomeError(t *testing.T) { nodes := []string{"3", "2", "1"} predicates := map[string]algorithm.FitPredicate{"true": truePredicate, "match": matchesPredicate} pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "1"}} - _, predicateMap, err := findNodesThatFit(pod, algorithm.FakePodLister([]*api.Pod{}), predicates, makeNodeList(nodes), nil) + machineToPods := map[string][]*api.Pod{ + "3": {}, + "2": {}, + "1": {pod}, + } + _, predicateMap, err := findNodesThatFit(pod, machineToPods, predicates, makeNodeList(nodes), nil) if err != nil { t.Errorf("unexpected error: %v", err)