diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates.go b/plugin/pkg/scheduler/algorithm/predicates/predicates.go index ab5c3a91ae7..c0d52815208 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates.go @@ -24,6 +24,7 @@ import ( client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" "github.com/golang/glog" "k8s.io/kubernetes/pkg/api/unversioned" @@ -122,9 +123,9 @@ func isVolumeConflict(volume api.Volume, pod *api.Pod) bool { // - AWS EBS forbids any two pods mounting the same volume ID // - Ceph RBD forbids if any two pods share at least same monitor, and match pool and image. // TODO: migrate this into some per-volume specific code? -func NoDiskConflict(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) { +func NoDiskConflict(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { for _, v := range pod.Spec.Volumes { - for _, ev := range existingPods { + for _, ev := range nodeInfo.Pods() { if isVolumeConflict(v, ev) { return false, nil } @@ -198,7 +199,7 @@ func (c *MaxPDVolumeCountChecker) filterVolumes(volumes []api.Volume, namespace return nil } -func (c *MaxPDVolumeCountChecker) predicate(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) { +func (c *MaxPDVolumeCountChecker) predicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { newVolumes := make(map[string]bool) if err := c.filterVolumes(pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil { return false, err @@ -211,7 +212,7 @@ func (c *MaxPDVolumeCountChecker) predicate(pod *api.Pod, existingPods []*api.Po // count unique volumes existingVolumes := make(map[string]bool) - for _, existingPod := range existingPods { + for _, existingPod := range nodeInfo.Pods() { if err := c.filterVolumes(existingPod.Spec.Volumes, existingPod.Namespace, existingVolumes); err != nil { return false, err } @@ -297,13 +298,13 @@ func NewVolumeZonePredicate(nodeInfo NodeInfo, pvInfo PersistentVolumeInfo, pvcI return c.predicate } -func (c *VolumeZoneChecker) predicate(pod *api.Pod, existingPods []*api.Pod, nodeID string) (bool, error) { - node, err := c.nodeInfo.GetNodeInfo(nodeID) +func (c *VolumeZoneChecker) predicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { + node, err := c.nodeInfo.GetNodeInfo(nodeName) if err != nil { return false, err } if node == nil { - return false, fmt.Errorf("node not found: %q", nodeID) + return false, fmt.Errorf("node not found: %q", nodeName) } nodeConstraints := make(map[string]string) @@ -360,7 +361,7 @@ func (c *VolumeZoneChecker) predicate(pod *api.Pod, existingPods []*api.Pod, nod } nodeV, _ := nodeConstraints[k] if v != nodeV { - glog.V(2).Infof("Won't schedule pod %q onto node %q due to volume %q (mismatch on %q)", pod.Name, nodeID, pvName, k) + glog.V(2).Infof("Won't schedule pod %q onto node %q due to volume %q (mismatch on %q)", pod.Name, nodeName, pvName, k) return false, nil } } @@ -389,18 +390,6 @@ func getResourceRequest(pod *api.Pod) resourceRequest { return result } -func getTotalResourceRequest(pods []*api.Pod) resourceRequest { - result := resourceRequest{} - for _, pod := range pods { - for _, container := range pod.Spec.Containers { - requests := container.Resources.Requests - result.memory += requests.Memory().Value() - result.milliCPU += 
requests.Cpu().MilliValue() - } - } - return result -} - func CheckPodsExceedingFreeResources(pods []*api.Pod, allocatable api.ResourceList) (fitting []*api.Pod, notFittingCPU, notFittingMemory []*api.Pod) { totalMilliCPU := allocatable.Cpu().MilliValue() totalMemory := allocatable.Memory().Value() @@ -433,16 +422,17 @@ func podName(pod *api.Pod) string { } // PodFitsResources calculates fit based on requested, rather than used resources -func (r *ResourceFit) PodFitsResources(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) { - info, err := r.info.GetNodeInfo(node) +func (r *ResourceFit) PodFitsResources(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { + info, err := r.info.GetNodeInfo(nodeName) if err != nil { return false, err } allocatable := info.Status.Allocatable - if int64(len(existingPods))+1 > allocatable.Pods().Value() { - return false, newInsufficientResourceError(podCountResourceName, 1, - int64(len(existingPods)), allocatable.Pods().Value()) + allowedPodNumber := allocatable.Pods().Value() + if int64(len(nodeInfo.Pods()))+1 > allowedPodNumber { + return false, + newInsufficientResourceError(podCountResourceName, 1, int64(len(nodeInfo.Pods())), allowedPodNumber) } podRequest := getResourceRequest(pod) @@ -450,17 +440,18 @@ func (r *ResourceFit) PodFitsResources(pod *api.Pod, existingPods []*api.Pod, no return true, nil } - pods := append(existingPods, pod) - _, exceedingCPU, exceedingMemory := CheckPodsExceedingFreeResources(pods, allocatable) - if len(exceedingCPU) > 0 { - return false, newInsufficientResourceError(cpuResourceName, podRequest.milliCPU, - getTotalResourceRequest(existingPods).milliCPU, allocatable.Cpu().MilliValue()) + totalMilliCPU := allocatable.Cpu().MilliValue() + totalMemory := allocatable.Memory().Value() + if totalMilliCPU < podRequest.milliCPU+nodeInfo.RequestedResource().MilliCPU { + return false, + newInsufficientResourceError(cpuResourceName, podRequest.milliCPU, nodeInfo.RequestedResource().MilliCPU, totalMilliCPU) } - if len(exceedingMemory) > 0 { - return false, newInsufficientResourceError(memoryResoureceName, podRequest.memory, - getTotalResourceRequest(existingPods).memory, allocatable.Memory().Value()) + if totalMemory < podRequest.memory+nodeInfo.RequestedResource().Memory { + return false, + newInsufficientResourceError(memoryResoureceName, podRequest.memory, nodeInfo.RequestedResource().Memory, totalMemory) } - glog.V(10).Infof("Schedule Pod %+v on Node %+v is allowed, Node is running only %v out of %v Pods.", podName(pod), node, len(pods)-1, allocatable.Pods().Value()) + glog.V(10).Infof("Schedule Pod %+v on Node %+v is allowed, Node is running only %v out of %v Pods.", + podName(pod), nodeName, len(nodeInfo.Pods()), allowedPodNumber) return true, nil } @@ -548,19 +539,19 @@ type NodeSelector struct { info NodeInfo } -func (n *NodeSelector) PodSelectorMatches(pod *api.Pod, existingPods []*api.Pod, nodeID string) (bool, error) { - node, err := n.info.GetNodeInfo(nodeID) +func (n *NodeSelector) PodSelectorMatches(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { + node, err := n.info.GetNodeInfo(nodeName) if err != nil { return false, err } return PodMatchesNodeLabels(pod, node), nil } -func PodFitsHost(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) { +func PodFitsHost(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { if len(pod.Spec.NodeName) == 0 { return true, nil } - return pod.Spec.NodeName == node, nil + 
return pod.Spec.NodeName == nodeName, nil } type NodeLabelChecker struct { @@ -590,9 +581,9 @@ func NewNodeLabelPredicate(info NodeInfo, labels []string, presence bool) algori // Alternately, eliminating nodes that have a certain label, regardless of value, is also useful // A node may have a label with "retiring" as key and the date as the value // and it may be desirable to avoid scheduling new pods on this node -func (n *NodeLabelChecker) CheckNodeLabelPresence(pod *api.Pod, existingPods []*api.Pod, nodeID string) (bool, error) { +func (n *NodeLabelChecker) CheckNodeLabelPresence(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { var exists bool - node, err := n.info.GetNodeInfo(nodeID) + node, err := n.info.GetNodeInfo(nodeName) if err != nil { return false, err } @@ -632,7 +623,7 @@ func NewServiceAffinityPredicate(podLister algorithm.PodLister, serviceLister al // - L is listed in the ServiceAffinity object that is passed into the function // - the pod does not have any NodeSelector for L // - some other pod from the same service is already scheduled onto a node that has value V for label L -func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, existingPods []*api.Pod, nodeID string) (bool, error) { +func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { var affinitySelector labels.Selector // check if the pod being scheduled has the affinity labels specified in its NodeSelector @@ -692,7 +683,7 @@ func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, existingPods []*api affinitySelector = labels.Set(affinityLabels).AsSelector() } - node, err := s.nodeInfo.GetNodeInfo(nodeID) + node, err := s.nodeInfo.GetNodeInfo(nodeName) if err != nil { return false, err } @@ -701,12 +692,12 @@ func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, existingPods []*api return affinitySelector.Matches(labels.Set(node.Labels)), nil } -func PodFitsHostPorts(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) { +func PodFitsHostPorts(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { wantPorts := getUsedPorts(pod) if len(wantPorts) == 0 { return true, nil } - existingPorts := getUsedPorts(existingPods...) + existingPorts := getUsedPorts(nodeInfo.Pods()...) for wport := range wantPorts { if wport == 0 { continue @@ -730,22 +721,6 @@ func getUsedPorts(pods ...*api.Pod) map[int]bool { return ports } -// MapPodsToMachines obtains a list of pods and pivots that list into a map where the keys are host names -// and the values are the list of pods running on that host. -func MapPodsToMachines(lister algorithm.PodLister) (map[string][]*api.Pod, error) { - machineToPods := map[string][]*api.Pod{} - // TODO: perform more targeted query... 
- pods, err := lister.List(labels.Everything()) - if err != nil { - return map[string][]*api.Pod{}, err - } - for _, scheduledPod := range pods { - host := scheduledPod.Spec.NodeName - machineToPods[host] = append(machineToPods[host], scheduledPod) - } - return machineToPods, nil -} - // search two arrays and return true if they have at least one common element; return false otherwise func haveSame(a1, a2 []string) bool { for _, val1 := range a1 { diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go b/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go index 34d3fdeadca..4ac7191f873 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) type FakeNodeInfo api.Node @@ -105,53 +106,48 @@ func newResourcePod(usage ...resourceRequest) *api.Pod { func TestPodFitsResources(t *testing.T) { enoughPodsTests := []struct { - pod *api.Pod - existingPods []*api.Pod - fits bool - test string - wErr error + pod *api.Pod + nodeInfo *schedulercache.NodeInfo + fits bool + test string + wErr error }{ { pod: &api.Pod{}, - existingPods: []*api.Pod{ - newResourcePod(resourceRequest{milliCPU: 10, memory: 20}), - }, + nodeInfo: schedulercache.NewNodeInfo( + newResourcePod(resourceRequest{milliCPU: 10, memory: 20})), fits: true, test: "no resources requested always fits", wErr: nil, }, { pod: newResourcePod(resourceRequest{milliCPU: 1, memory: 1}), - existingPods: []*api.Pod{ - newResourcePod(resourceRequest{milliCPU: 10, memory: 20}), - }, + nodeInfo: schedulercache.NewNodeInfo( + newResourcePod(resourceRequest{milliCPU: 10, memory: 20})), fits: false, test: "too many resources fails", wErr: newInsufficientResourceError(cpuResourceName, 1, 10, 10), }, { pod: newResourcePod(resourceRequest{milliCPU: 1, memory: 1}), - existingPods: []*api.Pod{ - newResourcePod(resourceRequest{milliCPU: 5, memory: 5}), - }, + nodeInfo: schedulercache.NewNodeInfo( + newResourcePod(resourceRequest{milliCPU: 5, memory: 5})), fits: true, test: "both resources fit", wErr: nil, }, { pod: newResourcePod(resourceRequest{milliCPU: 1, memory: 2}), - existingPods: []*api.Pod{ - newResourcePod(resourceRequest{milliCPU: 5, memory: 19}), - }, + nodeInfo: schedulercache.NewNodeInfo( + newResourcePod(resourceRequest{milliCPU: 5, memory: 19})), fits: false, test: "one resources fits", wErr: newInsufficientResourceError(memoryResoureceName, 2, 19, 20), }, { pod: newResourcePod(resourceRequest{milliCPU: 5, memory: 1}), - existingPods: []*api.Pod{ - newResourcePod(resourceRequest{milliCPU: 5, memory: 19}), - }, + nodeInfo: schedulercache.NewNodeInfo( + newResourcePod(resourceRequest{milliCPU: 5, memory: 19})), fits: true, test: "equal edge case", wErr: nil, @@ -162,7 +158,7 @@ func TestPodFitsResources(t *testing.T) { node := api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 32)}} fit := ResourceFit{FakeNodeInfo(node)} - fits, err := fit.PodFitsResources(test.pod, test.existingPods, "machine") + fits, err := fit.PodFitsResources(test.pod, "machine", test.nodeInfo) if !reflect.DeepEqual(err, test.wErr) { t.Errorf("%s: unexpected error: %v, want: %v", test.test, err, test.wErr) } @@ -172,35 +168,32 @@ func TestPodFitsResources(t *testing.T) { } notEnoughPodsTests := 
[]struct { - pod *api.Pod - existingPods []*api.Pod - fits bool - test string - wErr error + pod *api.Pod + nodeInfo *schedulercache.NodeInfo + fits bool + test string + wErr error }{ { pod: &api.Pod{}, - existingPods: []*api.Pod{ - newResourcePod(resourceRequest{milliCPU: 10, memory: 20}), - }, + nodeInfo: schedulercache.NewNodeInfo( + newResourcePod(resourceRequest{milliCPU: 10, memory: 20})), fits: false, test: "even without specified resources predicate fails when there's no space for additional pod", wErr: newInsufficientResourceError(podCountResourceName, 1, 1, 1), }, { pod: newResourcePod(resourceRequest{milliCPU: 1, memory: 1}), - existingPods: []*api.Pod{ - newResourcePod(resourceRequest{milliCPU: 5, memory: 5}), - }, + nodeInfo: schedulercache.NewNodeInfo( + newResourcePod(resourceRequest{milliCPU: 5, memory: 5})), fits: false, test: "even if both resources fit predicate fails when there's no space for additional pod", wErr: newInsufficientResourceError(podCountResourceName, 1, 1, 1), }, { pod: newResourcePod(resourceRequest{milliCPU: 5, memory: 1}), - existingPods: []*api.Pod{ - newResourcePod(resourceRequest{milliCPU: 5, memory: 19}), - }, + nodeInfo: schedulercache.NewNodeInfo( + newResourcePod(resourceRequest{milliCPU: 5, memory: 19})), fits: false, test: "even for equal edge case predicate fails when there's no space for additional pod", wErr: newInsufficientResourceError(podCountResourceName, 1, 1, 1), @@ -210,7 +203,7 @@ func TestPodFitsResources(t *testing.T) { node := api.Node{Status: api.NodeStatus{Capacity: api.ResourceList{}, Allocatable: makeAllocatableResources(10, 20, 1)}} fit := ResourceFit{FakeNodeInfo(node)} - fits, err := fit.PodFitsResources(test.pod, test.existingPods, "machine") + fits, err := fit.PodFitsResources(test.pod, "machine", test.nodeInfo) if !reflect.DeepEqual(err, test.wErr) { t.Errorf("%s: unexpected error: %v, want: %v", test.test, err, test.wErr) } @@ -256,7 +249,7 @@ func TestPodFitsHost(t *testing.T) { } for _, test := range tests { - result, err := PodFitsHost(test.pod, []*api.Pod{}, test.node) + result, err := PodFitsHost(test.pod, test.node, schedulercache.NewNodeInfo()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -285,52 +278,48 @@ func newPod(host string, hostPorts ...int) *api.Pod { func TestPodFitsHostPorts(t *testing.T) { tests := []struct { - pod *api.Pod - existingPods []*api.Pod - fits bool - test string + pod *api.Pod + nodeInfo *schedulercache.NodeInfo + fits bool + test string }{ { - pod: &api.Pod{}, - existingPods: []*api.Pod{}, - fits: true, - test: "nothing running", + pod: &api.Pod{}, + nodeInfo: schedulercache.NewNodeInfo(), + fits: true, + test: "nothing running", }, { pod: newPod("m1", 8080), - existingPods: []*api.Pod{ - newPod("m1", 9090), - }, + nodeInfo: schedulercache.NewNodeInfo( + newPod("m1", 9090)), fits: true, test: "other port", }, { pod: newPod("m1", 8080), - existingPods: []*api.Pod{ - newPod("m1", 8080), - }, + nodeInfo: schedulercache.NewNodeInfo( + newPod("m1", 8080)), fits: false, test: "same port", }, { pod: newPod("m1", 8000, 8080), - existingPods: []*api.Pod{ - newPod("m1", 8080), - }, + nodeInfo: schedulercache.NewNodeInfo( + newPod("m1", 8080)), fits: false, test: "second port", }, { pod: newPod("m1", 8000, 8080), - existingPods: []*api.Pod{ - newPod("m1", 8001, 8080), - }, + nodeInfo: schedulercache.NewNodeInfo( + newPod("m1", 8001, 8080)), fits: false, test: "second port", }, } for _, test := range tests { - fits, err := PodFitsHostPorts(test.pod, test.existingPods, "machine") + 
fits, err := PodFitsHostPorts(test.pod, "machine", test.nodeInfo) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -400,27 +389,27 @@ func TestDiskConflicts(t *testing.T) { }, } tests := []struct { - pod *api.Pod - existingPods []*api.Pod - isOk bool - test string + pod *api.Pod + nodeInfo *schedulercache.NodeInfo + isOk bool + test string }{ - {&api.Pod{}, []*api.Pod{}, true, "nothing"}, - {&api.Pod{}, []*api.Pod{{Spec: volState}}, true, "one state"}, - {&api.Pod{Spec: volState}, []*api.Pod{{Spec: volState}}, false, "same state"}, - {&api.Pod{Spec: volState2}, []*api.Pod{{Spec: volState}}, true, "different state"}, + {&api.Pod{}, schedulercache.NewNodeInfo(), true, "nothing"}, + {&api.Pod{}, schedulercache.NewNodeInfo(&api.Pod{Spec: volState}), true, "one state"}, + {&api.Pod{Spec: volState}, schedulercache.NewNodeInfo(&api.Pod{Spec: volState}), false, "same state"}, + {&api.Pod{Spec: volState2}, schedulercache.NewNodeInfo(&api.Pod{Spec: volState}), true, "different state"}, } for _, test := range tests { - ok, err := NoDiskConflict(test.pod, test.existingPods, "machine") + ok, err := NoDiskConflict(test.pod, "machine", test.nodeInfo) if err != nil { t.Fatalf("unexpected error: %v", err) } if test.isOk && !ok { - t.Errorf("expected ok, got none. %v %v %s", test.pod, test.existingPods, test.test) + t.Errorf("expected ok, got none. %v %s %s", test.pod, test.nodeInfo, test.test) } if !test.isOk && ok { - t.Errorf("expected no ok, got one. %v %v %s", test.pod, test.existingPods, test.test) + t.Errorf("expected no ok, got one. %v %s %s", test.pod, test.nodeInfo, test.test) } } } @@ -449,27 +438,27 @@ func TestAWSDiskConflicts(t *testing.T) { }, } tests := []struct { - pod *api.Pod - existingPods []*api.Pod - isOk bool - test string + pod *api.Pod + nodeInfo *schedulercache.NodeInfo + isOk bool + test string }{ - {&api.Pod{}, []*api.Pod{}, true, "nothing"}, - {&api.Pod{}, []*api.Pod{{Spec: volState}}, true, "one state"}, - {&api.Pod{Spec: volState}, []*api.Pod{{Spec: volState}}, false, "same state"}, - {&api.Pod{Spec: volState2}, []*api.Pod{{Spec: volState}}, true, "different state"}, + {&api.Pod{}, schedulercache.NewNodeInfo(), true, "nothing"}, + {&api.Pod{}, schedulercache.NewNodeInfo(&api.Pod{Spec: volState}), true, "one state"}, + {&api.Pod{Spec: volState}, schedulercache.NewNodeInfo(&api.Pod{Spec: volState}), false, "same state"}, + {&api.Pod{Spec: volState2}, schedulercache.NewNodeInfo(&api.Pod{Spec: volState}), true, "different state"}, } for _, test := range tests { - ok, err := NoDiskConflict(test.pod, test.existingPods, "machine") + ok, err := NoDiskConflict(test.pod, "machine", test.nodeInfo) if err != nil { t.Fatalf("unexpected error: %v", err) } if test.isOk && !ok { - t.Errorf("expected ok, got none. %v %v %s", test.pod, test.existingPods, test.test) + t.Errorf("expected ok, got none. %v %s %s", test.pod, test.nodeInfo, test.test) } if !test.isOk && ok { - t.Errorf("expected no ok, got one. %v %v %s", test.pod, test.existingPods, test.test) + t.Errorf("expected no ok, got one. 
%v %s %s", test.pod, test.nodeInfo, test.test) } } } @@ -504,27 +493,27 @@ func TestRBDDiskConflicts(t *testing.T) { }, } tests := []struct { - pod *api.Pod - existingPods []*api.Pod - isOk bool - test string + pod *api.Pod + nodeInfo *schedulercache.NodeInfo + isOk bool + test string }{ - {&api.Pod{}, []*api.Pod{}, true, "nothing"}, - {&api.Pod{}, []*api.Pod{{Spec: volState}}, true, "one state"}, - {&api.Pod{Spec: volState}, []*api.Pod{{Spec: volState}}, false, "same state"}, - {&api.Pod{Spec: volState2}, []*api.Pod{{Spec: volState}}, true, "different state"}, + {&api.Pod{}, schedulercache.NewNodeInfo(), true, "nothing"}, + {&api.Pod{}, schedulercache.NewNodeInfo(&api.Pod{Spec: volState}), true, "one state"}, + {&api.Pod{Spec: volState}, schedulercache.NewNodeInfo(&api.Pod{Spec: volState}), false, "same state"}, + {&api.Pod{Spec: volState2}, schedulercache.NewNodeInfo(&api.Pod{Spec: volState}), true, "different state"}, } for _, test := range tests { - ok, err := NoDiskConflict(test.pod, test.existingPods, "machine") + ok, err := NoDiskConflict(test.pod, "machine", test.nodeInfo) if err != nil { t.Fatalf("unexpected error: %v", err) } if test.isOk && !ok { - t.Errorf("expected ok, got none. %v %v %s", test.pod, test.existingPods, test.test) + t.Errorf("expected ok, got none. %v %s %s", test.pod, test.nodeInfo, test.test) } if !test.isOk && ok { - t.Errorf("expected no ok, got one. %v %v %s", test.pod, test.existingPods, test.test) + t.Errorf("expected no ok, got one. %v %s %s", test.pod, test.nodeInfo, test.test) } } } @@ -989,7 +978,7 @@ func TestPodFitsSelector(t *testing.T) { node := api.Node{ObjectMeta: api.ObjectMeta{Labels: test.labels}} fit := NodeSelector{FakeNodeInfo(node)} - fits, err := fit.PodSelectorMatches(test.pod, []*api.Pod{}, "machine") + fits, err := fit.PodSelectorMatches(test.pod, "machine", schedulercache.NewNodeInfo()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -1002,12 +991,11 @@ func TestPodFitsSelector(t *testing.T) { func TestNodeLabelPresence(t *testing.T) { label := map[string]string{"foo": "bar", "bar": "foo"} tests := []struct { - pod *api.Pod - existingPods []*api.Pod - labels []string - presence bool - fits bool - test string + pod *api.Pod + labels []string + presence bool + fits bool + test string }{ { labels: []string{"baz"}, @@ -1049,7 +1037,7 @@ func TestNodeLabelPresence(t *testing.T) { for _, test := range tests { node := api.Node{ObjectMeta: api.ObjectMeta{Labels: label}} labelChecker := NodeLabelChecker{FakeNodeInfo(node), test.labels, test.presence} - fits, err := labelChecker.CheckNodeLabelPresence(test.pod, test.existingPods, "machine") + fits, err := labelChecker.CheckNodeLabelPresence(test.pod, "machine", schedulercache.NewNodeInfo()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -1189,7 +1177,7 @@ func TestServiceAffinity(t *testing.T) { for _, test := range tests { nodes := []api.Node{node1, node2, node3, node4, node5} serviceAffinity := ServiceAffinity{algorithm.FakePodLister(test.pods), algorithm.FakeServiceLister(test.services), FakeNodeListInfo(nodes), test.labels} - fits, err := serviceAffinity.CheckServiceAffinity(test.pod, []*api.Pod{}, test.node) + fits, err := serviceAffinity.CheckServiceAffinity(test.pod, test.node, schedulercache.NewNodeInfo()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -1409,7 +1397,7 @@ func TestEBSVolumeCountConflicts(t *testing.T) { for _, test := range tests { pred := NewMaxPDVolumeCountPredicate(filter, test.maxVols, pvInfo, pvcInfo) - fits, err := 
pred(test.newPod, test.existingPods, "some-node") + fits, err := pred(test.newPod, "some-node", schedulercache.NewNodeInfo(test.existingPods...)) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/plugin/pkg/scheduler/algorithm/priorities/node_affinity.go b/plugin/pkg/scheduler/algorithm/priorities/node_affinity.go index ced9d5a16c6..671cc98788c 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/node_affinity.go +++ b/plugin/pkg/scheduler/algorithm/priorities/node_affinity.go @@ -22,6 +22,7 @@ import ( "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) type NodeAffinity struct { @@ -40,7 +41,7 @@ func NewNodeAffinityPriority(nodeLister algorithm.NodeLister) algorithm.Priority // it will a get an add of preferredSchedulingTerm.Weight. Thus, the more preferredSchedulingTerms // the node satisfies and the more the preferredSchedulingTerm that is satisfied weights, the higher // score the node gets. -func (s *NodeAffinity) CalculateNodeAffinityPriority(pod *api.Pod, machinesToPods map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { +func (s *NodeAffinity) CalculateNodeAffinityPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { var maxCount int counts := map[string]int{} diff --git a/plugin/pkg/scheduler/algorithm/priorities/node_affinity_test.go b/plugin/pkg/scheduler/algorithm/priorities/node_affinity_test.go index 02343a8e8e7..12a7fb521a3 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/node_affinity_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/node_affinity_test.go @@ -23,6 +23,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) func TestNodeAffinityPriority(t *testing.T) { @@ -156,7 +157,7 @@ func TestNodeAffinityPriority(t *testing.T) { for _, test := range tests { nodeAffinity := NodeAffinity{nodeLister: algorithm.FakeNodeLister(api.NodeList{Items: test.nodes})} - list, err := nodeAffinity.CalculateNodeAffinityPriority(test.pod, nil, nil, algorithm.FakeNodeLister(api.NodeList{Items: test.nodes})) + list, err := nodeAffinity.CalculateNodeAffinityPriority(test.pod, schedulercache.CreateNodeNameToInfoMap(nil), algorithm.FakeNodeLister(api.NodeList{Items: test.nodes})) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/plugin/pkg/scheduler/algorithm/priorities/priorities.go b/plugin/pkg/scheduler/algorithm/priorities/priorities.go index 73bcfd3dfb9..2995c00b559 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/priorities.go +++ b/plugin/pkg/scheduler/algorithm/priorities/priorities.go @@ -25,6 +25,7 @@ import ( "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) // the unused capacity is calculated on a scale of 0-10 @@ -114,7 +115,7 @@ func calculateResourceOccupancy(pod *api.Pod, node api.Node, pods []*api.Pod) sc // It calculates the percentage of memory and CPU requested by pods scheduled on the node, and prioritizes // based on the minimum of the average of the fraction of requested to capacity. 
// Details: cpu((capacity - sum(requested)) * 10 / capacity) + memory((capacity - sum(requested)) * 10 / capacity) / 2 -func LeastRequestedPriority(pod *api.Pod, machinesToPods map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { +func LeastRequestedPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { nodes, err := nodeLister.List() if err != nil { return schedulerapi.HostPriorityList{}, err @@ -122,7 +123,7 @@ func LeastRequestedPriority(pod *api.Pod, machinesToPods map[string][]*api.Pod, list := schedulerapi.HostPriorityList{} for _, node := range nodes.Items { - list = append(list, calculateResourceOccupancy(pod, node, machinesToPods[node.Name])) + list = append(list, calculateResourceOccupancy(pod, node, nodeNameToInfo[node.Name].Pods())) } return list, nil } @@ -143,7 +144,7 @@ func NewNodeLabelPriority(label string, presence bool) algorithm.PriorityFunctio // CalculateNodeLabelPriority checks whether a particular label exists on a node or not, regardless of its value. // If presence is true, prioritizes nodes that have the specified label, regardless of value. // If presence is false, prioritizes nodes that do not have the specified label. -func (n *NodeLabelPrioritizer) CalculateNodeLabelPriority(pod *api.Pod, machinesToPods map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { +func (n *NodeLabelPrioritizer) CalculateNodeLabelPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { var score int nodes, err := nodeLister.List() if err != nil { @@ -182,7 +183,7 @@ const ( // based on the total size of those images. // - If none of the images are present, this node will be given the lowest priority. // - If some of the images are present on a node, the larger their sizes' sum, the higher the node's priority. -func ImageLocalityPriority(pod *api.Pod, machinesToPods map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { +func ImageLocalityPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { sumSizeMap := make(map[string]int64) nodes, err := nodeLister.List() @@ -248,7 +249,7 @@ func calculateScoreFromSize(sumSize int64) int { // close the two metrics are to each other. // Detail: score = 10 - abs(cpuFraction-memoryFraction)*10. The algorithm is partly inspired by: // "Wei Huang et al. 
An Energy Efficient Virtual Machine Placement Algorithm with Balanced Resource Utilization" -func BalancedResourceAllocation(pod *api.Pod, machinesToPods map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { +func BalancedResourceAllocation(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { nodes, err := nodeLister.List() if err != nil { return schedulerapi.HostPriorityList{}, err @@ -256,7 +257,7 @@ func BalancedResourceAllocation(pod *api.Pod, machinesToPods map[string][]*api.P list := schedulerapi.HostPriorityList{} for _, node := range nodes.Items { - list = append(list, calculateBalancedResourceAllocation(pod, node, machinesToPods[node.Name])) + list = append(list, calculateBalancedResourceAllocation(pod, node, nodeNameToInfo[node.Name].Pods())) } return list, nil } diff --git a/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go b/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go index de64a682f27..08526ee2dd0 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go @@ -26,8 +26,8 @@ import ( "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/plugin/pkg/scheduler" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" - "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) func makeNode(node string, milliCPU, memory int64) api.Node { @@ -132,18 +132,15 @@ func TestZeroRequest(t *testing.T) { const expectedPriority int = 25 for _, test := range tests { - m2p, err := predicates.MapPodsToMachines(algorithm.FakePodLister(test.pods)) - if err != nil { - t.Errorf("unexpected error: %v", err) - } + nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods) list, err := scheduler.PrioritizeNodes( test.pod, - m2p, + nodeNameToInfo, algorithm.FakePodLister(test.pods), // This should match the configuration in defaultPriorities() in // plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go if you want // to test what's actually in production. 
- []algorithm.PriorityConfig{{Function: LeastRequestedPriority, Weight: 1}, {Function: BalancedResourceAllocation, Weight: 1}, {Function: NewSelectorSpreadPriority(algorithm.FakeServiceLister([]api.Service{}), algorithm.FakeControllerLister([]api.ReplicationController{})), Weight: 1}}, + []algorithm.PriorityConfig{{Function: LeastRequestedPriority, Weight: 1}, {Function: BalancedResourceAllocation, Weight: 1}, {Function: NewSelectorSpreadPriority(algorithm.FakePodLister(test.pods), algorithm.FakeServiceLister([]api.Service{}), algorithm.FakeControllerLister([]api.ReplicationController{})), Weight: 1}}, algorithm.FakeNodeLister(api.NodeList{Items: test.nodes}), []algorithm.SchedulerExtender{}) if err != nil { t.Errorf("unexpected error: %v", err) @@ -387,11 +384,8 @@ func TestLeastRequested(t *testing.T) { } for _, test := range tests { - m2p, err := predicates.MapPodsToMachines(algorithm.FakePodLister(test.pods)) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - list, err := LeastRequestedPriority(test.pod, m2p, algorithm.FakePodLister(test.pods), algorithm.FakeNodeLister(api.NodeList{Items: test.nodes})) + nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods) + list, err := LeastRequestedPriority(test.pod, nodeNameToInfo, algorithm.FakeNodeLister(api.NodeList{Items: test.nodes})) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -485,7 +479,7 @@ func TestNewNodeLabelPriority(t *testing.T) { label: test.label, presence: test.presence, } - list, err := prioritizer.CalculateNodeLabelPriority(nil, map[string][]*api.Pod{}, nil, algorithm.FakeNodeLister(api.NodeList{Items: test.nodes})) + list, err := prioritizer.CalculateNodeLabelPriority(nil, map[string]*schedulercache.NodeInfo{}, algorithm.FakeNodeLister(api.NodeList{Items: test.nodes})) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -723,11 +717,8 @@ func TestBalancedResourceAllocation(t *testing.T) { } for _, test := range tests { - m2p, err := predicates.MapPodsToMachines(algorithm.FakePodLister(test.pods)) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - list, err := BalancedResourceAllocation(test.pod, m2p, algorithm.FakePodLister(test.pods), algorithm.FakeNodeLister(api.NodeList{Items: test.nodes})) + nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods) + list, err := BalancedResourceAllocation(test.pod, nodeNameToInfo, algorithm.FakeNodeLister(api.NodeList{Items: test.nodes})) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -870,11 +861,8 @@ func TestImageLocalityPriority(t *testing.T) { } for _, test := range tests { - m2p, err := predicates.MapPodsToMachines(algorithm.FakePodLister(test.pods)) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - list, err := ImageLocalityPriority(test.pod, m2p, algorithm.FakePodLister(test.pods), algorithm.FakeNodeLister(api.NodeList{Items: test.nodes})) + nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods) + list, err := ImageLocalityPriority(test.pod, nodeNameToInfo, algorithm.FakeNodeLister(api.NodeList{Items: test.nodes})) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go index f348acdb95d..0facd305585 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go +++ b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go @@ -23,6 +23,7 @@ import ( "k8s.io/kubernetes/pkg/labels" 
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) // The maximum priority value to give to a node @@ -34,12 +35,14 @@ const maxPriority = 10 const zoneWeighting = 2.0 / 3.0 type SelectorSpread struct { + podLister algorithm.PodLister serviceLister algorithm.ServiceLister controllerLister algorithm.ControllerLister } -func NewSelectorSpreadPriority(serviceLister algorithm.ServiceLister, controllerLister algorithm.ControllerLister) algorithm.PriorityFunction { +func NewSelectorSpreadPriority(podLister algorithm.PodLister, serviceLister algorithm.ServiceLister, controllerLister algorithm.ControllerLister) algorithm.PriorityFunction { selectorSpread := &SelectorSpread{ + podLister: podLister, serviceLister: serviceLister, controllerLister: controllerLister, } @@ -73,7 +76,7 @@ func getZoneKey(node *api.Node) string { // i.e. it pushes the scheduler towards a node where there's the smallest number of // pods which match the same service selectors or RC selectors as the pod being scheduled. // Where zone information is included on the nodes, it favors nodes in zones with fewer existing matching pods. -func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, machinesToPods map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { +func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { var nsPods []*api.Pod selectors := make([]labels.Selector, 0) @@ -91,7 +94,7 @@ func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, machinesToPods ma } if len(selectors) > 0 { - pods, err := podLister.List(labels.Everything()) + pods, err := s.podLister.List(labels.Everything()) if err != nil { return nil, err } @@ -198,12 +201,14 @@ func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, machinesToPods ma } type ServiceAntiAffinity struct { + podLister algorithm.PodLister serviceLister algorithm.ServiceLister label string } -func NewServiceAntiAffinityPriority(serviceLister algorithm.ServiceLister, label string) algorithm.PriorityFunction { +func NewServiceAntiAffinityPriority(podLister algorithm.PodLister, serviceLister algorithm.ServiceLister, label string) algorithm.PriorityFunction { antiAffinity := &ServiceAntiAffinity{ + podLister: podLister, serviceLister: serviceLister, label: label, } @@ -213,7 +218,7 @@ func NewServiceAntiAffinityPriority(serviceLister algorithm.ServiceLister, label // CalculateAntiAffinityPriority spreads pods by minimizing the number of pods belonging to the same service // on machines with the same value for a particular label. // The label to be considered is provided to the struct (ServiceAntiAffinity). 
-func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod *api.Pod, machinesToPods map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { +func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { var nsServicePods []*api.Pod services, err := s.serviceLister.GetPodServices(pod) @@ -221,7 +226,7 @@ func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod *api.Pod, machin // just use the first service and get the other pods within the service // TODO: a separate predicate can be created that tries to handle all services for the pod selector := labels.SelectorFromSet(services[0].Spec.Selector) - pods, err := podLister.List(selector) + pods, err := s.podLister.List(selector) if err != nil { return nil, err } diff --git a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go index f50c3e79783..9fad1eb9177 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go @@ -24,8 +24,8 @@ import ( "k8s.io/kubernetes/pkg/api" wellknownlabels "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" - "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) func TestSelectorSpreadPriority(t *testing.T) { @@ -219,12 +219,9 @@ func TestSelectorSpreadPriority(t *testing.T) { } for _, test := range tests { - m2p, err := predicates.MapPodsToMachines(algorithm.FakePodLister(test.pods)) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - selectorSpread := SelectorSpread{serviceLister: algorithm.FakeServiceLister(test.services), controllerLister: algorithm.FakeControllerLister(test.rcs)} - list, err := selectorSpread.CalculateSpreadPriority(test.pod, m2p, algorithm.FakePodLister(test.pods), algorithm.FakeNodeLister(makeNodeList(test.nodes))) + nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods) + selectorSpread := SelectorSpread{podLister: algorithm.FakePodLister(test.pods), serviceLister: algorithm.FakeServiceLister(test.services), controllerLister: algorithm.FakeControllerLister(test.rcs)} + list, err := selectorSpread.CalculateSpreadPriority(test.pod, nodeNameToInfo, algorithm.FakeNodeLister(makeNodeList(test.nodes))) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -422,12 +419,9 @@ func TestZoneSelectorSpreadPriority(t *testing.T) { } for _, test := range tests { - selectorSpread := SelectorSpread{serviceLister: algorithm.FakeServiceLister(test.services), controllerLister: algorithm.FakeControllerLister(test.rcs)} - m2p, err := predicates.MapPodsToMachines(algorithm.FakePodLister(test.pods)) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - list, err := selectorSpread.CalculateSpreadPriority(test.pod, m2p, algorithm.FakePodLister(test.pods), algorithm.FakeNodeLister(makeLabeledNodeList(labeledNodes))) + nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods) + selectorSpread := SelectorSpread{podLister: algorithm.FakePodLister(test.pods), serviceLister: algorithm.FakeServiceLister(test.services), controllerLister: algorithm.FakeControllerLister(test.rcs)} + list, err := 
selectorSpread.CalculateSpreadPriority(test.pod, nodeNameToInfo, algorithm.FakeNodeLister(makeLabeledNodeList(labeledNodes))) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -597,12 +591,9 @@ func TestZoneSpreadPriority(t *testing.T) { } for _, test := range tests { - m2p, err := predicates.MapPodsToMachines(algorithm.FakePodLister(test.pods)) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - zoneSpread := ServiceAntiAffinity{serviceLister: algorithm.FakeServiceLister(test.services), label: "zone"} - list, err := zoneSpread.CalculateAntiAffinityPriority(test.pod, m2p, algorithm.FakePodLister(test.pods), algorithm.FakeNodeLister(makeLabeledNodeList(test.nodes))) + nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods) + zoneSpread := ServiceAntiAffinity{podLister: algorithm.FakePodLister(test.pods), serviceLister: algorithm.FakeServiceLister(test.services), label: "zone"} + list, err := zoneSpread.CalculateAntiAffinityPriority(test.pod, nodeNameToInfo, algorithm.FakeNodeLister(makeLabeledNodeList(test.nodes))) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/plugin/pkg/scheduler/algorithm/types.go b/plugin/pkg/scheduler/algorithm/types.go index a0df5b58c86..ffce2c71583 100644 --- a/plugin/pkg/scheduler/algorithm/types.go +++ b/plugin/pkg/scheduler/algorithm/types.go @@ -19,12 +19,13 @@ package algorithm import ( "k8s.io/kubernetes/pkg/api" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) // FitPredicate is a function that indicates if a pod fits into an existing node. -type FitPredicate func(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) +type FitPredicate func(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) -type PriorityFunction func(pod *api.Pod, machineToPods map[string][]*api.Pod, podLister PodLister, nodeLister NodeLister) (schedulerapi.HostPriorityList, error) +type PriorityFunction func(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister NodeLister) (schedulerapi.HostPriorityList, error) type PriorityConfig struct { Function PriorityFunction diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go index 2e295e93c45..9f1b92cdcca 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go @@ -68,7 +68,7 @@ func init() { "ServiceSpreadingPriority", factory.PriorityConfigFactory{ Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction { - return priorities.NewSelectorSpreadPriority(args.ServiceLister, algorithm.EmptyControllerLister{}) + return priorities.NewSelectorSpreadPriority(args.PodLister, args.ServiceLister, algorithm.EmptyControllerLister{}) }, Weight: 1, }, @@ -141,7 +141,7 @@ func defaultPriorities() sets.String { "SelectorSpreadPriority", factory.PriorityConfigFactory{ Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction { - return priorities.NewSelectorSpreadPriority(args.ServiceLister, args.ControllerLister) + return priorities.NewSelectorSpreadPriority(args.PodLister, args.ServiceLister, args.ControllerLister) }, Weight: 1, }, diff --git a/plugin/pkg/scheduler/extender_test.go b/plugin/pkg/scheduler/extender_test.go index 2d052ae96c8..ad869c9c473 100644 --- a/plugin/pkg/scheduler/extender_test.go +++ b/plugin/pkg/scheduler/extender_test.go @@ -24,6 +24,7 @@ import ( 
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) type fitPredicate func(pod *api.Pod, node *api.Node) (bool, error) @@ -88,7 +89,7 @@ func machine2PrioritizerExtender(pod *api.Pod, nodes *api.NodeList) (*schedulera return &result, nil } -func machine2Prioritizer(_ *api.Pod, machineToPods map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { +func machine2Prioritizer(_ *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { nodes, err := nodeLister.List() if err != nil { return []schedulerapi.HostPriority{}, err diff --git a/plugin/pkg/scheduler/factory/factory_test.go b/plugin/pkg/scheduler/factory/factory_test.go index fb14a7929de..c97a79cc443 100644 --- a/plugin/pkg/scheduler/factory/factory_test.go +++ b/plugin/pkg/scheduler/factory/factory_test.go @@ -34,6 +34,7 @@ import ( "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) func TestCreate(t *testing.T) { @@ -117,19 +118,19 @@ func TestCreateFromEmptyConfig(t *testing.T) { factory.CreateFromConfig(policy) } -func PredicateOne(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) { +func PredicateOne(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { return true, nil } -func PredicateTwo(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) { +func PredicateTwo(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { return true, nil } -func PriorityOne(pod *api.Pod, m2p map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { +func PriorityOne(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { return []schedulerapi.HostPriority{}, nil } -func PriorityTwo(pod *api.Pod, m2p map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { +func PriorityTwo(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { return []schedulerapi.HostPriority{}, nil } diff --git a/plugin/pkg/scheduler/factory/plugins.go b/plugin/pkg/scheduler/factory/plugins.go index 1d0d80caa4c..06d3755ee44 100644 --- a/plugin/pkg/scheduler/factory/plugins.go +++ b/plugin/pkg/scheduler/factory/plugins.go @@ -168,6 +168,7 @@ func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy) string { pcf = &PriorityConfigFactory{ Function: func(args PluginFactoryArgs) algorithm.PriorityFunction { return priorities.NewServiceAntiAffinityPriority( + args.PodLister, args.ServiceLister, policy.Argument.ServiceAntiAffinity.Label, ) diff --git a/plugin/pkg/scheduler/generic_scheduler.go b/plugin/pkg/scheduler/generic_scheduler.go index fc42b7d99ff..e1c8a7df918 100644 --- a/plugin/pkg/scheduler/generic_scheduler.go +++ b/plugin/pkg/scheduler/generic_scheduler.go @@ -25,11 +25,13 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/util/errors" 
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) type FailedPredicateMap map[string]sets.String @@ -75,12 +77,12 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe // TODO: we should compute this once and dynamically update it using Watch, not constantly re-compute. // But at least we're now only doing it in one place - machinesToPods, err := predicates.MapPodsToMachines(g.pods) + pods, err := g.pods.List(labels.Everything()) if err != nil { return "", err } - - filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, machinesToPods, g.predicates, nodes, g.extenders) + nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(pods) + filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, nodeNameToInfo, g.predicates, nodes, g.extenders) if err != nil { return "", err } @@ -92,7 +94,7 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe } } - priorityList, err := PrioritizeNodes(pod, machinesToPods, g.pods, g.prioritizers, algorithm.FakeNodeLister(filteredNodes), g.extenders) + priorityList, err := PrioritizeNodes(pod, nodeNameToInfo, g.pods, g.prioritizers, algorithm.FakeNodeLister(filteredNodes), g.extenders) if err != nil { return "", err } @@ -129,14 +131,14 @@ func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList // Filters the nodes to find the ones that fit based on the given predicate functions // Each node is passed through the predicate functions to determine if it is a fit -func findNodesThatFit(pod *api.Pod, machineToPods map[string][]*api.Pod, predicateFuncs map[string]algorithm.FitPredicate, nodes api.NodeList, extenders []algorithm.SchedulerExtender) (api.NodeList, FailedPredicateMap, error) { +func findNodesThatFit(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate, nodes api.NodeList, extenders []algorithm.SchedulerExtender) (api.NodeList, FailedPredicateMap, error) { filtered := []api.Node{} failedPredicateMap := FailedPredicateMap{} for _, node := range nodes.Items { fits := true for name, predicate := range predicateFuncs { - fit, err := predicate(pod, machineToPods[node.Name], node.Name) + fit, err := predicate(pod, node.Name, nodeNameToInfo[node.Name]) if err != nil { switch e := err.(type) { case *predicates.InsufficientResourceError: @@ -186,13 +188,13 @@ func findNodesThatFit(pod *api.Pod, machineToPods map[string][]*api.Pod, predica // Each priority function can also have its own weight // The node scores returned by the priority function are multiplied by the weights to get weighted scores // All scores are finally combined (added) to get the total weighted scores of all nodes -func PrioritizeNodes(pod *api.Pod, machinesToPods map[string][]*api.Pod, podLister algorithm.PodLister, priorityConfigs []algorithm.PriorityConfig, nodeLister algorithm.NodeLister, extenders []algorithm.SchedulerExtender) (schedulerapi.HostPriorityList, error) { +func PrioritizeNodes(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, podLister algorithm.PodLister, priorityConfigs []algorithm.PriorityConfig, nodeLister algorithm.NodeLister, extenders []algorithm.SchedulerExtender) (schedulerapi.HostPriorityList, error) { result := schedulerapi.HostPriorityList{} // If no priority configs are 
provided, then the EqualPriority function is applied // This is required to generate the priority list in the required format if len(priorityConfigs) == 0 && len(extenders) == 0 { - return EqualPriority(pod, machinesToPods, podLister, nodeLister) + return EqualPriority(pod, nodeNameToInfo, nodeLister) } var ( @@ -213,7 +215,7 @@ func PrioritizeNodes(pod *api.Pod, machinesToPods map[string][]*api.Pod, podList defer wg.Done() weight := config.Weight priorityFunc := config.Function - prioritizedList, err := priorityFunc(pod, machinesToPods, podLister, nodeLister) + prioritizedList, err := priorityFunc(pod, nodeNameToInfo, nodeLister) if err != nil { mu.Lock() errs = append(errs, err) @@ -269,7 +271,7 @@ func PrioritizeNodes(pod *api.Pod, machinesToPods map[string][]*api.Pod, podList } // EqualPriority is a prioritizer function that gives an equal weight of one to all nodes -func EqualPriority(_ *api.Pod, machinesToPods map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { +func EqualPriority(_ *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { nodes, err := nodeLister.List() if err != nil { glog.Errorf("Failed to list nodes: %v", err) diff --git a/plugin/pkg/scheduler/generic_scheduler_test.go b/plugin/pkg/scheduler/generic_scheduler_test.go index bd8360b26e4..de63abf8647 100644 --- a/plugin/pkg/scheduler/generic_scheduler_test.go +++ b/plugin/pkg/scheduler/generic_scheduler_test.go @@ -27,25 +27,26 @@ import ( "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) -func falsePredicate(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) { +func falsePredicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { return false, nil } -func truePredicate(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) { +func truePredicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { return true, nil } -func matchesPredicate(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) { - return pod.Name == node, nil +func matchesPredicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { + return pod.Name == nodeName, nil } -func hasNoPodsPredicate(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) { - return len(existingPods) == 0, nil +func hasNoPodsPredicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { + return len(nodeInfo.Pods()) == 0, nil } -func numericPriority(pod *api.Pod, machineToPods map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { +func numericPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { nodes, err := nodeLister.List() result := []schedulerapi.HostPriority{} @@ -65,11 +66,11 @@ func numericPriority(pod *api.Pod, machineToPods map[string][]*api.Pod, podListe return result, nil } -func reverseNumericPriority(pod *api.Pod, machineToPods map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { +func reverseNumericPriority(pod *api.Pod, nodeNameToInfo 
map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { var maxScore float64 minScore := math.MaxFloat64 reverseResult := []schedulerapi.HostPriority{} - result, err := numericPriority(pod, machineToPods, podLister, nodeLister) + result, err := numericPriority(pod, nodeNameToInfo, nodeLister) if err != nil { return nil, err } @@ -275,12 +276,12 @@ func TestGenericScheduler(t *testing.T) { func TestFindFitAllError(t *testing.T) { nodes := []string{"3", "2", "1"} predicates := map[string]algorithm.FitPredicate{"true": truePredicate, "false": falsePredicate} - machineToPods := map[string][]*api.Pod{ - "3": {}, - "2": {}, - "1": {}, + nodeNameToInfo := map[string]*schedulercache.NodeInfo{ + "3": schedulercache.NewNodeInfo(), + "2": schedulercache.NewNodeInfo(), + "1": schedulercache.NewNodeInfo(), } - _, predicateMap, err := findNodesThatFit(&api.Pod{}, machineToPods, predicates, makeNodeList(nodes), nil) + _, predicateMap, err := findNodesThatFit(&api.Pod{}, nodeNameToInfo, predicates, makeNodeList(nodes), nil) if err != nil { t.Errorf("unexpected error: %v", err) @@ -305,12 +306,12 @@ func TestFindFitSomeError(t *testing.T) { nodes := []string{"3", "2", "1"} predicates := map[string]algorithm.FitPredicate{"true": truePredicate, "match": matchesPredicate} pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "1"}} - machineToPods := map[string][]*api.Pod{ - "3": {}, - "2": {}, - "1": {pod}, + nodeNameToInfo := map[string]*schedulercache.NodeInfo{ + "3": schedulercache.NewNodeInfo(), + "2": schedulercache.NewNodeInfo(), + "1": schedulercache.NewNodeInfo(pod), } - _, predicateMap, err := findNodesThatFit(pod, machineToPods, predicates, makeNodeList(nodes), nil) + _, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, predicates, makeNodeList(nodes), nil) if err != nil { t.Errorf("unexpected error: %v", err) diff --git a/plugin/pkg/scheduler/schedulercache/node_info.go b/plugin/pkg/scheduler/schedulercache/node_info.go new file mode 100644 index 00000000000..746386e0b2d --- /dev/null +++ b/plugin/pkg/scheduler/schedulercache/node_info.go @@ -0,0 +1,96 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package schedulercache + +import ( + "fmt" + + "k8s.io/kubernetes/pkg/api" +) + +var emptyResource = Resource{} + +// NodeInfo is node level aggregated information. +type NodeInfo struct { + // Total requested resource of all pods on this node. + // It includes assumed pods which scheduler sends binding to apiserver but + // didn't get it as scheduled yet. + requestedResource *Resource + pods []*api.Pod +} + +// Resource is a collection of compute resource. +type Resource struct { + MilliCPU int64 + Memory int64 +} + +// NewNodeInfo returns a ready to use empty NodeInfo object. +// If any pods are given in arguments, their information will be aggregated in +// the returned object. 
+func NewNodeInfo(pods ...*api.Pod) *NodeInfo {
+	ni := &NodeInfo{
+		requestedResource: &Resource{},
+	}
+	for _, pod := range pods {
+		ni.addPod(pod)
+	}
+	return ni
+}
+
+// Pods returns all pods scheduled (including assumed to be) on this node.
+func (n *NodeInfo) Pods() []*api.Pod {
+	if n == nil {
+		return nil
+	}
+	return n.pods
+}
+
+// RequestedResource returns the aggregated resource request of pods on this node.
+func (n *NodeInfo) RequestedResource() Resource {
+	if n == nil {
+		return emptyResource
+	}
+	return *n.requestedResource
+}
+
+// String returns a human-readable representation of this NodeInfo.
+func (n *NodeInfo) String() string {
+	podKeys := make([]string, len(n.pods))
+	for i, pod := range n.pods {
+		podKeys[i] = pod.Name
+	}
+	return fmt.Sprintf("&NodeInfo{Pods:%v, RequestedResource:%#v}", podKeys, n.requestedResource)
+}
+
+// addPod adds pod information to this NodeInfo.
+func (n *NodeInfo) addPod(pod *api.Pod) {
+	cpu, mem := calculateResource(pod)
+	n.requestedResource.MilliCPU += cpu
+	n.requestedResource.Memory += mem
+	n.pods = append(n.pods, pod)
+}
+
+func calculateResource(pod *api.Pod) (int64, int64) {
+	var cpu, mem int64
+	for _, c := range pod.Spec.Containers {
+		req := c.Resources.Requests
+		cpu += req.Cpu().MilliValue()
+		mem += req.Memory().Value()
+	}
+	return cpu, mem
+}
diff --git a/plugin/pkg/scheduler/schedulercache/util.go b/plugin/pkg/scheduler/schedulercache/util.go
new file mode 100644
index 00000000000..7d840b59cd1
--- /dev/null
+++ b/plugin/pkg/scheduler/schedulercache/util.go
@@ -0,0 +1,35 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package schedulercache
+
+import "k8s.io/kubernetes/pkg/api"
+
+// CreateNodeNameToInfoMap obtains a list of pods and pivots that list into a map where the keys are node names
+// and the values are the aggregated information for that node.
+func CreateNodeNameToInfoMap(pods []*api.Pod) map[string]*NodeInfo {
+	nodeNameToInfo := make(map[string]*NodeInfo)
+	for _, pod := range pods {
+		nodeName := pod.Spec.NodeName
+		nodeInfo, ok := nodeNameToInfo[nodeName]
+		if !ok {
+			nodeInfo = NewNodeInfo()
+			nodeNameToInfo[nodeName] = nodeInfo
+		}
+		nodeInfo.addPod(pod)
+	}
+	return nodeNameToInfo
+}
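For readers picking up the refactored interfaces, here is a minimal sketch of how the new predicate signature and the schedulercache helpers fit together. It relies only on what this patch adds (`schedulercache.NewNodeInfo`, `CreateNodeNameToInfoMap`, `Pods`, `RequestedResource`, and the new `algorithm.FitPredicate` signature); the pod-cap predicate and the pod fixtures are invented for illustration and are not part of the change.

```go
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

// newPodCapPredicate builds a FitPredicate against the new signature:
// predicates now receive the node name and a *schedulercache.NodeInfo
// aggregate instead of the raw []*api.Pod slice of existing pods.
func newPodCapPredicate(maxPods int) algorithm.FitPredicate {
	return func(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
		// nodeInfo.Pods() replaces the old existingPods argument.
		if len(nodeInfo.Pods())+1 > maxPods {
			return false, nil
		}
		return true, nil
	}
}

func main() {
	// Pods already bound to nodes; Spec.NodeName is the key that
	// CreateNodeNameToInfoMap pivots on, as in genericScheduler.Schedule.
	scheduled := []*api.Pod{
		{ObjectMeta: api.ObjectMeta{Name: "p1"}, Spec: api.PodSpec{NodeName: "node-a"}},
		{ObjectMeta: api.ObjectMeta{Name: "p2"}, Spec: api.PodSpec{NodeName: "node-a"}},
	}
	nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(scheduled)

	pred := newPodCapPredicate(2)
	fits, err := pred(&api.Pod{}, "node-a", nodeNameToInfo["node-a"])
	fmt.Printf("fits=%v err=%v requested=%+v pods=%d\n",
		fits, err, nodeNameToInfo["node-a"].RequestedResource(), len(nodeNameToInfo["node-a"].Pods()))
}
```

Priority functions change along the same lines: they now take the `nodeNameToInfo` map plus a `NodeLister` instead of a `machinesToPods` map and a `PodLister`, which is why `SelectorSpread` and `ServiceAntiAffinity` gain their own `podLister` field for the service and replication-controller lookups they still perform.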