Merge pull request #20857 from hongchaodeng/s1

Auto commit by PR queue bot
k8s-merge-robot 2016-02-10 05:18:14 -08:00
commit 8d4f5c3946
17 changed files with 343 additions and 255 deletions


@ -24,6 +24,7 @@ import (
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api/unversioned"
@ -122,9 +123,9 @@ func isVolumeConflict(volume api.Volume, pod *api.Pod) bool {
// - AWS EBS forbids any two pods mounting the same volume ID
// - Ceph RBD forbids if any two pods share at least same monitor, and match pool and image.
// TODO: migrate this into some per-volume specific code?
func NoDiskConflict(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) {
func NoDiskConflict(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
for _, v := range pod.Spec.Volumes {
for _, ev := range existingPods {
for _, ev := range nodeInfo.Pods() {
if isVolumeConflict(v, ev) {
return false, nil
}
@ -198,7 +199,7 @@ func (c *MaxPDVolumeCountChecker) filterVolumes(volumes []api.Volume, namespace
return nil
}
func (c *MaxPDVolumeCountChecker) predicate(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) {
func (c *MaxPDVolumeCountChecker) predicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
newVolumes := make(map[string]bool)
if err := c.filterVolumes(pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil {
return false, err
@ -211,7 +212,7 @@ func (c *MaxPDVolumeCountChecker) predicate(pod *api.Pod, existingPods []*api.Po
// count unique volumes
existingVolumes := make(map[string]bool)
for _, existingPod := range existingPods {
for _, existingPod := range nodeInfo.Pods() {
if err := c.filterVolumes(existingPod.Spec.Volumes, existingPod.Namespace, existingVolumes); err != nil {
return false, err
}
@ -297,13 +298,13 @@ func NewVolumeZonePredicate(nodeInfo NodeInfo, pvInfo PersistentVolumeInfo, pvcI
return c.predicate
}
func (c *VolumeZoneChecker) predicate(pod *api.Pod, existingPods []*api.Pod, nodeID string) (bool, error) {
node, err := c.nodeInfo.GetNodeInfo(nodeID)
func (c *VolumeZoneChecker) predicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
node, err := c.nodeInfo.GetNodeInfo(nodeName)
if err != nil {
return false, err
}
if node == nil {
return false, fmt.Errorf("node not found: %q", nodeID)
return false, fmt.Errorf("node not found: %q", nodeName)
}
nodeConstraints := make(map[string]string)
@ -360,7 +361,7 @@ func (c *VolumeZoneChecker) predicate(pod *api.Pod, existingPods []*api.Pod, nod
}
nodeV, _ := nodeConstraints[k]
if v != nodeV {
glog.V(2).Infof("Won't schedule pod %q onto node %q due to volume %q (mismatch on %q)", pod.Name, nodeID, pvName, k)
glog.V(2).Infof("Won't schedule pod %q onto node %q due to volume %q (mismatch on %q)", pod.Name, nodeName, pvName, k)
return false, nil
}
}
@ -389,18 +390,6 @@ func getResourceRequest(pod *api.Pod) resourceRequest {
return result
}
func getTotalResourceRequest(pods []*api.Pod) resourceRequest {
result := resourceRequest{}
for _, pod := range pods {
for _, container := range pod.Spec.Containers {
requests := container.Resources.Requests
result.memory += requests.Memory().Value()
result.milliCPU += requests.Cpu().MilliValue()
}
}
return result
}
func CheckPodsExceedingFreeResources(pods []*api.Pod, allocatable api.ResourceList) (fitting []*api.Pod, notFittingCPU, notFittingMemory []*api.Pod) {
totalMilliCPU := allocatable.Cpu().MilliValue()
totalMemory := allocatable.Memory().Value()
@ -433,16 +422,17 @@ func podName(pod *api.Pod) string {
}
// PodFitsResources calculates fit based on requested, rather than used resources
func (r *ResourceFit) PodFitsResources(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) {
info, err := r.info.GetNodeInfo(node)
func (r *ResourceFit) PodFitsResources(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
info, err := r.info.GetNodeInfo(nodeName)
if err != nil {
return false, err
}
allocatable := info.Status.Allocatable
if int64(len(existingPods))+1 > allocatable.Pods().Value() {
return false, newInsufficientResourceError(podCountResourceName, 1,
int64(len(existingPods)), allocatable.Pods().Value())
allowedPodNumber := allocatable.Pods().Value()
if int64(len(nodeInfo.Pods()))+1 > allowedPodNumber {
return false,
newInsufficientResourceError(podCountResourceName, 1, int64(len(nodeInfo.Pods())), allowedPodNumber)
}
podRequest := getResourceRequest(pod)
@ -450,17 +440,18 @@ func (r *ResourceFit) PodFitsResources(pod *api.Pod, existingPods []*api.Pod, no
return true, nil
}
pods := append(existingPods, pod)
_, exceedingCPU, exceedingMemory := CheckPodsExceedingFreeResources(pods, allocatable)
if len(exceedingCPU) > 0 {
return false, newInsufficientResourceError(cpuResourceName, podRequest.milliCPU,
getTotalResourceRequest(existingPods).milliCPU, allocatable.Cpu().MilliValue())
totalMilliCPU := allocatable.Cpu().MilliValue()
totalMemory := allocatable.Memory().Value()
if totalMilliCPU < podRequest.milliCPU+nodeInfo.RequestedResource().MilliCPU {
return false,
newInsufficientResourceError(cpuResourceName, podRequest.milliCPU, nodeInfo.RequestedResource().MilliCPU, totalMilliCPU)
}
if len(exceedingMemory) > 0 {
return false, newInsufficientResourceError(memoryResoureceName, podRequest.memory,
getTotalResourceRequest(existingPods).memory, allocatable.Memory().Value())
if totalMemory < podRequest.memory+nodeInfo.RequestedResource().Memory {
return false,
newInsufficientResourceError(memoryResoureceName, podRequest.memory, nodeInfo.RequestedResource().Memory, totalMemory)
}
glog.V(10).Infof("Schedule Pod %+v on Node %+v is allowed, Node is running only %v out of %v Pods.", podName(pod), node, len(pods)-1, allocatable.Pods().Value())
glog.V(10).Infof("Schedule Pod %+v on Node %+v is allowed, Node is running only %v out of %v Pods.",
podName(pod), nodeName, len(nodeInfo.Pods()), allowedPodNumber)
return true, nil
}
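
To make the change above concrete, here is a minimal, self-contained sketch (not part of this diff; types and numbers are hypothetical) of the fit arithmetic PodFitsResources now performs: instead of re-summing requests over an existingPods slice, it adds the pod's request to the node's cached aggregate (as returned by NodeInfo.RequestedResource()) and compares against the allocatable totals.

package main

import "fmt"

// resource mirrors the idea of schedulercache.Resource: aggregated
// CPU (milli-units) and memory (bytes) requested on a node.
type resource struct {
	milliCPU int64
	memory   int64
}

// podFits reports whether a pod requesting req fits on a node whose
// already-scheduled pods request used, given the node's allocatable capacity.
func podFits(req, used, allocatable resource) bool {
	if allocatable.milliCPU < req.milliCPU+used.milliCPU {
		return false // insufficient CPU
	}
	if allocatable.memory < req.memory+used.memory {
		return false // insufficient memory
	}
	return true
}

func main() {
	allocatable := resource{milliCPU: 10, memory: 20} // hypothetical node
	used := resource{milliCPU: 5, memory: 19}         // akin to NodeInfo.RequestedResource()

	fmt.Println(podFits(resource{milliCPU: 5, memory: 1}, used, allocatable)) // true: the "equal edge case" from the tests
	fmt.Println(podFits(resource{milliCPU: 1, memory: 2}, used, allocatable)) // false: memory would exceed allocatable
}
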
@ -548,19 +539,19 @@ type NodeSelector struct {
info NodeInfo
}
func (n *NodeSelector) PodSelectorMatches(pod *api.Pod, existingPods []*api.Pod, nodeID string) (bool, error) {
node, err := n.info.GetNodeInfo(nodeID)
func (n *NodeSelector) PodSelectorMatches(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
node, err := n.info.GetNodeInfo(nodeName)
if err != nil {
return false, err
}
return PodMatchesNodeLabels(pod, node), nil
}
func PodFitsHost(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) {
func PodFitsHost(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
if len(pod.Spec.NodeName) == 0 {
return true, nil
}
return pod.Spec.NodeName == node, nil
return pod.Spec.NodeName == nodeName, nil
}
type NodeLabelChecker struct {
@ -590,9 +581,9 @@ func NewNodeLabelPredicate(info NodeInfo, labels []string, presence bool) algori
// Alternately, eliminating nodes that have a certain label, regardless of value, is also useful
// A node may have a label with "retiring" as key and the date as the value
// and it may be desirable to avoid scheduling new pods on this node
func (n *NodeLabelChecker) CheckNodeLabelPresence(pod *api.Pod, existingPods []*api.Pod, nodeID string) (bool, error) {
func (n *NodeLabelChecker) CheckNodeLabelPresence(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
var exists bool
node, err := n.info.GetNodeInfo(nodeID)
node, err := n.info.GetNodeInfo(nodeName)
if err != nil {
return false, err
}
@ -632,7 +623,7 @@ func NewServiceAffinityPredicate(podLister algorithm.PodLister, serviceLister al
// - L is listed in the ServiceAffinity object that is passed into the function
// - the pod does not have any NodeSelector for L
// - some other pod from the same service is already scheduled onto a node that has value V for label L
func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, existingPods []*api.Pod, nodeID string) (bool, error) {
func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
var affinitySelector labels.Selector
// check if the pod being scheduled has the affinity labels specified in its NodeSelector
@ -692,7 +683,7 @@ func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, existingPods []*api
affinitySelector = labels.Set(affinityLabels).AsSelector()
}
node, err := s.nodeInfo.GetNodeInfo(nodeID)
node, err := s.nodeInfo.GetNodeInfo(nodeName)
if err != nil {
return false, err
}
@ -701,12 +692,12 @@ func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, existingPods []*api
return affinitySelector.Matches(labels.Set(node.Labels)), nil
}
func PodFitsHostPorts(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) {
func PodFitsHostPorts(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
wantPorts := getUsedPorts(pod)
if len(wantPorts) == 0 {
return true, nil
}
existingPorts := getUsedPorts(existingPods...)
existingPorts := getUsedPorts(nodeInfo.Pods()...)
for wport := range wantPorts {
if wport == 0 {
continue
@ -730,22 +721,6 @@ func getUsedPorts(pods ...*api.Pod) map[int]bool {
return ports
}
// MapPodsToMachines obtains a list of pods and pivots that list into a map where the keys are host names
// and the values are the list of pods running on that host.
func MapPodsToMachines(lister algorithm.PodLister) (map[string][]*api.Pod, error) {
machineToPods := map[string][]*api.Pod{}
// TODO: perform more targeted query...
pods, err := lister.List(labels.Everything())
if err != nil {
return map[string][]*api.Pod{}, err
}
for _, scheduledPod := range pods {
host := scheduledPod.Spec.NodeName
machineToPods[host] = append(machineToPods[host], scheduledPod)
}
return machineToPods, nil
}
// search two arrays and return true if they have at least one common element; return false otherwise
func haveSame(a1, a2 []string) bool {
for _, val1 := range a1 {


@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
type FakeNodeInfo api.Node
@ -105,53 +106,48 @@ func newResourcePod(usage ...resourceRequest) *api.Pod {
func TestPodFitsResources(t *testing.T) {
enoughPodsTests := []struct {
pod *api.Pod
existingPods []*api.Pod
fits bool
test string
wErr error
pod *api.Pod
nodeInfo *schedulercache.NodeInfo
fits bool
test string
wErr error
}{
{
pod: &api.Pod{},
existingPods: []*api.Pod{
newResourcePod(resourceRequest{milliCPU: 10, memory: 20}),
},
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(resourceRequest{milliCPU: 10, memory: 20})),
fits: true,
test: "no resources requested always fits",
wErr: nil,
},
{
pod: newResourcePod(resourceRequest{milliCPU: 1, memory: 1}),
existingPods: []*api.Pod{
newResourcePod(resourceRequest{milliCPU: 10, memory: 20}),
},
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(resourceRequest{milliCPU: 10, memory: 20})),
fits: false,
test: "too many resources fails",
wErr: newInsufficientResourceError(cpuResourceName, 1, 10, 10),
},
{
pod: newResourcePod(resourceRequest{milliCPU: 1, memory: 1}),
existingPods: []*api.Pod{
newResourcePod(resourceRequest{milliCPU: 5, memory: 5}),
},
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(resourceRequest{milliCPU: 5, memory: 5})),
fits: true,
test: "both resources fit",
wErr: nil,
},
{
pod: newResourcePod(resourceRequest{milliCPU: 1, memory: 2}),
existingPods: []*api.Pod{
newResourcePod(resourceRequest{milliCPU: 5, memory: 19}),
},
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(resourceRequest{milliCPU: 5, memory: 19})),
fits: false,
test: "one resources fits",
wErr: newInsufficientResourceError(memoryResoureceName, 2, 19, 20),
},
{
pod: newResourcePod(resourceRequest{milliCPU: 5, memory: 1}),
existingPods: []*api.Pod{
newResourcePod(resourceRequest{milliCPU: 5, memory: 19}),
},
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(resourceRequest{milliCPU: 5, memory: 19})),
fits: true,
test: "equal edge case",
wErr: nil,
@ -162,7 +158,7 @@ func TestPodFitsResources(t *testing.T) {
node := api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 32)}}
fit := ResourceFit{FakeNodeInfo(node)}
fits, err := fit.PodFitsResources(test.pod, test.existingPods, "machine")
fits, err := fit.PodFitsResources(test.pod, "machine", test.nodeInfo)
if !reflect.DeepEqual(err, test.wErr) {
t.Errorf("%s: unexpected error: %v, want: %v", test.test, err, test.wErr)
}
@ -172,35 +168,32 @@ func TestPodFitsResources(t *testing.T) {
}
notEnoughPodsTests := []struct {
pod *api.Pod
existingPods []*api.Pod
fits bool
test string
wErr error
pod *api.Pod
nodeInfo *schedulercache.NodeInfo
fits bool
test string
wErr error
}{
{
pod: &api.Pod{},
existingPods: []*api.Pod{
newResourcePod(resourceRequest{milliCPU: 10, memory: 20}),
},
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(resourceRequest{milliCPU: 10, memory: 20})),
fits: false,
test: "even without specified resources predicate fails when there's no space for additional pod",
wErr: newInsufficientResourceError(podCountResourceName, 1, 1, 1),
},
{
pod: newResourcePod(resourceRequest{milliCPU: 1, memory: 1}),
existingPods: []*api.Pod{
newResourcePod(resourceRequest{milliCPU: 5, memory: 5}),
},
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(resourceRequest{milliCPU: 5, memory: 5})),
fits: false,
test: "even if both resources fit predicate fails when there's no space for additional pod",
wErr: newInsufficientResourceError(podCountResourceName, 1, 1, 1),
},
{
pod: newResourcePod(resourceRequest{milliCPU: 5, memory: 1}),
existingPods: []*api.Pod{
newResourcePod(resourceRequest{milliCPU: 5, memory: 19}),
},
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(resourceRequest{milliCPU: 5, memory: 19})),
fits: false,
test: "even for equal edge case predicate fails when there's no space for additional pod",
wErr: newInsufficientResourceError(podCountResourceName, 1, 1, 1),
@ -210,7 +203,7 @@ func TestPodFitsResources(t *testing.T) {
node := api.Node{Status: api.NodeStatus{Capacity: api.ResourceList{}, Allocatable: makeAllocatableResources(10, 20, 1)}}
fit := ResourceFit{FakeNodeInfo(node)}
fits, err := fit.PodFitsResources(test.pod, test.existingPods, "machine")
fits, err := fit.PodFitsResources(test.pod, "machine", test.nodeInfo)
if !reflect.DeepEqual(err, test.wErr) {
t.Errorf("%s: unexpected error: %v, want: %v", test.test, err, test.wErr)
}
@ -256,7 +249,7 @@ func TestPodFitsHost(t *testing.T) {
}
for _, test := range tests {
result, err := PodFitsHost(test.pod, []*api.Pod{}, test.node)
result, err := PodFitsHost(test.pod, test.node, schedulercache.NewNodeInfo())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -285,52 +278,48 @@ func newPod(host string, hostPorts ...int) *api.Pod {
func TestPodFitsHostPorts(t *testing.T) {
tests := []struct {
pod *api.Pod
existingPods []*api.Pod
fits bool
test string
pod *api.Pod
nodeInfo *schedulercache.NodeInfo
fits bool
test string
}{
{
pod: &api.Pod{},
existingPods: []*api.Pod{},
fits: true,
test: "nothing running",
pod: &api.Pod{},
nodeInfo: schedulercache.NewNodeInfo(),
fits: true,
test: "nothing running",
},
{
pod: newPod("m1", 8080),
existingPods: []*api.Pod{
newPod("m1", 9090),
},
nodeInfo: schedulercache.NewNodeInfo(
newPod("m1", 9090)),
fits: true,
test: "other port",
},
{
pod: newPod("m1", 8080),
existingPods: []*api.Pod{
newPod("m1", 8080),
},
nodeInfo: schedulercache.NewNodeInfo(
newPod("m1", 8080)),
fits: false,
test: "same port",
},
{
pod: newPod("m1", 8000, 8080),
existingPods: []*api.Pod{
newPod("m1", 8080),
},
nodeInfo: schedulercache.NewNodeInfo(
newPod("m1", 8080)),
fits: false,
test: "second port",
},
{
pod: newPod("m1", 8000, 8080),
existingPods: []*api.Pod{
newPod("m1", 8001, 8080),
},
nodeInfo: schedulercache.NewNodeInfo(
newPod("m1", 8001, 8080)),
fits: false,
test: "second port",
},
}
for _, test := range tests {
fits, err := PodFitsHostPorts(test.pod, test.existingPods, "machine")
fits, err := PodFitsHostPorts(test.pod, "machine", test.nodeInfo)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -400,27 +389,27 @@ func TestDiskConflicts(t *testing.T) {
},
}
tests := []struct {
pod *api.Pod
existingPods []*api.Pod
isOk bool
test string
pod *api.Pod
nodeInfo *schedulercache.NodeInfo
isOk bool
test string
}{
{&api.Pod{}, []*api.Pod{}, true, "nothing"},
{&api.Pod{}, []*api.Pod{{Spec: volState}}, true, "one state"},
{&api.Pod{Spec: volState}, []*api.Pod{{Spec: volState}}, false, "same state"},
{&api.Pod{Spec: volState2}, []*api.Pod{{Spec: volState}}, true, "different state"},
{&api.Pod{}, schedulercache.NewNodeInfo(), true, "nothing"},
{&api.Pod{}, schedulercache.NewNodeInfo(&api.Pod{Spec: volState}), true, "one state"},
{&api.Pod{Spec: volState}, schedulercache.NewNodeInfo(&api.Pod{Spec: volState}), false, "same state"},
{&api.Pod{Spec: volState2}, schedulercache.NewNodeInfo(&api.Pod{Spec: volState}), true, "different state"},
}
for _, test := range tests {
ok, err := NoDiskConflict(test.pod, test.existingPods, "machine")
ok, err := NoDiskConflict(test.pod, "machine", test.nodeInfo)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if test.isOk && !ok {
t.Errorf("expected ok, got none. %v %v %s", test.pod, test.existingPods, test.test)
t.Errorf("expected ok, got none. %v %s %s", test.pod, test.nodeInfo, test.test)
}
if !test.isOk && ok {
t.Errorf("expected no ok, got one. %v %v %s", test.pod, test.existingPods, test.test)
t.Errorf("expected no ok, got one. %v %s %s", test.pod, test.nodeInfo, test.test)
}
}
}
@ -449,27 +438,27 @@ func TestAWSDiskConflicts(t *testing.T) {
},
}
tests := []struct {
pod *api.Pod
existingPods []*api.Pod
isOk bool
test string
pod *api.Pod
nodeInfo *schedulercache.NodeInfo
isOk bool
test string
}{
{&api.Pod{}, []*api.Pod{}, true, "nothing"},
{&api.Pod{}, []*api.Pod{{Spec: volState}}, true, "one state"},
{&api.Pod{Spec: volState}, []*api.Pod{{Spec: volState}}, false, "same state"},
{&api.Pod{Spec: volState2}, []*api.Pod{{Spec: volState}}, true, "different state"},
{&api.Pod{}, schedulercache.NewNodeInfo(), true, "nothing"},
{&api.Pod{}, schedulercache.NewNodeInfo(&api.Pod{Spec: volState}), true, "one state"},
{&api.Pod{Spec: volState}, schedulercache.NewNodeInfo(&api.Pod{Spec: volState}), false, "same state"},
{&api.Pod{Spec: volState2}, schedulercache.NewNodeInfo(&api.Pod{Spec: volState}), true, "different state"},
}
for _, test := range tests {
ok, err := NoDiskConflict(test.pod, test.existingPods, "machine")
ok, err := NoDiskConflict(test.pod, "machine", test.nodeInfo)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if test.isOk && !ok {
t.Errorf("expected ok, got none. %v %v %s", test.pod, test.existingPods, test.test)
t.Errorf("expected ok, got none. %v %s %s", test.pod, test.nodeInfo, test.test)
}
if !test.isOk && ok {
t.Errorf("expected no ok, got one. %v %v %s", test.pod, test.existingPods, test.test)
t.Errorf("expected no ok, got one. %v %s %s", test.pod, test.nodeInfo, test.test)
}
}
}
@ -504,27 +493,27 @@ func TestRBDDiskConflicts(t *testing.T) {
},
}
tests := []struct {
pod *api.Pod
existingPods []*api.Pod
isOk bool
test string
pod *api.Pod
nodeInfo *schedulercache.NodeInfo
isOk bool
test string
}{
{&api.Pod{}, []*api.Pod{}, true, "nothing"},
{&api.Pod{}, []*api.Pod{{Spec: volState}}, true, "one state"},
{&api.Pod{Spec: volState}, []*api.Pod{{Spec: volState}}, false, "same state"},
{&api.Pod{Spec: volState2}, []*api.Pod{{Spec: volState}}, true, "different state"},
{&api.Pod{}, schedulercache.NewNodeInfo(), true, "nothing"},
{&api.Pod{}, schedulercache.NewNodeInfo(&api.Pod{Spec: volState}), true, "one state"},
{&api.Pod{Spec: volState}, schedulercache.NewNodeInfo(&api.Pod{Spec: volState}), false, "same state"},
{&api.Pod{Spec: volState2}, schedulercache.NewNodeInfo(&api.Pod{Spec: volState}), true, "different state"},
}
for _, test := range tests {
ok, err := NoDiskConflict(test.pod, test.existingPods, "machine")
ok, err := NoDiskConflict(test.pod, "machine", test.nodeInfo)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if test.isOk && !ok {
t.Errorf("expected ok, got none. %v %v %s", test.pod, test.existingPods, test.test)
t.Errorf("expected ok, got none. %v %s %s", test.pod, test.nodeInfo, test.test)
}
if !test.isOk && ok {
t.Errorf("expected no ok, got one. %v %v %s", test.pod, test.existingPods, test.test)
t.Errorf("expected no ok, got one. %v %s %s", test.pod, test.nodeInfo, test.test)
}
}
}
@ -989,7 +978,7 @@ func TestPodFitsSelector(t *testing.T) {
node := api.Node{ObjectMeta: api.ObjectMeta{Labels: test.labels}}
fit := NodeSelector{FakeNodeInfo(node)}
fits, err := fit.PodSelectorMatches(test.pod, []*api.Pod{}, "machine")
fits, err := fit.PodSelectorMatches(test.pod, "machine", schedulercache.NewNodeInfo())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -1002,12 +991,11 @@ func TestPodFitsSelector(t *testing.T) {
func TestNodeLabelPresence(t *testing.T) {
label := map[string]string{"foo": "bar", "bar": "foo"}
tests := []struct {
pod *api.Pod
existingPods []*api.Pod
labels []string
presence bool
fits bool
test string
pod *api.Pod
labels []string
presence bool
fits bool
test string
}{
{
labels: []string{"baz"},
@ -1049,7 +1037,7 @@ func TestNodeLabelPresence(t *testing.T) {
for _, test := range tests {
node := api.Node{ObjectMeta: api.ObjectMeta{Labels: label}}
labelChecker := NodeLabelChecker{FakeNodeInfo(node), test.labels, test.presence}
fits, err := labelChecker.CheckNodeLabelPresence(test.pod, test.existingPods, "machine")
fits, err := labelChecker.CheckNodeLabelPresence(test.pod, "machine", schedulercache.NewNodeInfo())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -1189,7 +1177,7 @@ func TestServiceAffinity(t *testing.T) {
for _, test := range tests {
nodes := []api.Node{node1, node2, node3, node4, node5}
serviceAffinity := ServiceAffinity{algorithm.FakePodLister(test.pods), algorithm.FakeServiceLister(test.services), FakeNodeListInfo(nodes), test.labels}
fits, err := serviceAffinity.CheckServiceAffinity(test.pod, []*api.Pod{}, test.node)
fits, err := serviceAffinity.CheckServiceAffinity(test.pod, test.node, schedulercache.NewNodeInfo())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -1409,7 +1397,7 @@ func TestEBSVolumeCountConflicts(t *testing.T) {
for _, test := range tests {
pred := NewMaxPDVolumeCountPredicate(filter, test.maxVols, pvInfo, pvcInfo)
fits, err := pred(test.newPod, test.existingPods, "some-node")
fits, err := pred(test.newPod, "some-node", schedulercache.NewNodeInfo(test.existingPods...))
if err != nil {
t.Errorf("unexpected error: %v", err)
}


@ -22,6 +22,7 @@ import (
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
type NodeAffinity struct {
@ -40,7 +41,7 @@ func NewNodeAffinityPriority(nodeLister algorithm.NodeLister) algorithm.Priority
// it will get an add of preferredSchedulingTerm.Weight. Thus, the more preferredSchedulingTerms
// the node satisfies and the more the preferredSchedulingTerm that is satisfied weights, the higher
// score the node gets.
func (s *NodeAffinity) CalculateNodeAffinityPriority(pod *api.Pod, machinesToPods map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
func (s *NodeAffinity) CalculateNodeAffinityPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
var maxCount int
counts := map[string]int{}


@ -23,6 +23,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
func TestNodeAffinityPriority(t *testing.T) {
@ -156,7 +157,7 @@ func TestNodeAffinityPriority(t *testing.T) {
for _, test := range tests {
nodeAffinity := NodeAffinity{nodeLister: algorithm.FakeNodeLister(api.NodeList{Items: test.nodes})}
list, err := nodeAffinity.CalculateNodeAffinityPriority(test.pod, nil, nil, algorithm.FakeNodeLister(api.NodeList{Items: test.nodes}))
list, err := nodeAffinity.CalculateNodeAffinityPriority(test.pod, schedulercache.CreateNodeNameToInfoMap(nil), algorithm.FakeNodeLister(api.NodeList{Items: test.nodes}))
if err != nil {
t.Errorf("unexpected error: %v", err)
}


@ -25,6 +25,7 @@ import (
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
// the unused capacity is calculated on a scale of 0-10
@ -114,7 +115,7 @@ func calculateResourceOccupancy(pod *api.Pod, node api.Node, pods []*api.Pod) sc
// It calculates the percentage of memory and CPU requested by pods scheduled on the node, and prioritizes
// based on the minimum of the average of the fraction of requested to capacity.
// Details: cpu((capacity - sum(requested)) * 10 / capacity) + memory((capacity - sum(requested)) * 10 / capacity) / 2
func LeastRequestedPriority(pod *api.Pod, machinesToPods map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
func LeastRequestedPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
nodes, err := nodeLister.List()
if err != nil {
return schedulerapi.HostPriorityList{}, err
@ -122,7 +123,7 @@ func LeastRequestedPriority(pod *api.Pod, machinesToPods map[string][]*api.Pod,
list := schedulerapi.HostPriorityList{}
for _, node := range nodes.Items {
list = append(list, calculateResourceOccupancy(pod, node, machinesToPods[node.Name]))
list = append(list, calculateResourceOccupancy(pod, node, nodeNameToInfo[node.Name].Pods()))
}
return list, nil
}
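
As a worked illustration of the scoring formula quoted in the comment above, here is a small self-contained sketch with hypothetical numbers (not code from this diff): each resource scores (capacity - requested) * 10 / capacity, and the node's score is the average of the CPU and memory scores.

package main

import "fmt"

// leastRequestedScore computes the per-resource part of the formula:
// (capacity - requested) * 10 / capacity, falling back to 0 when capacity
// is zero or the request already exceeds it.
func leastRequestedScore(requested, capacity int64) int64 {
	if capacity == 0 || requested > capacity {
		return 0
	}
	return (capacity - requested) * 10 / capacity
}

func main() {
	// Hypothetical node: 4000 milli-CPU and 8 GiB allocatable,
	// with scheduled pods already requesting 1000 milli-CPU and 2 GiB.
	cpuScore := leastRequestedScore(1000, 4000)                         // 7
	memScore := leastRequestedScore(2*1024*1024*1024, 8*1024*1024*1024) // 7

	// Per the comment's trailing "/ 2", the node score averages the two.
	fmt.Println((cpuScore + memScore) / 2) // 7
}
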
@ -143,7 +144,7 @@ func NewNodeLabelPriority(label string, presence bool) algorithm.PriorityFunctio
// CalculateNodeLabelPriority checks whether a particular label exists on a node or not, regardless of its value.
// If presence is true, prioritizes nodes that have the specified label, regardless of value.
// If presence is false, prioritizes nodes that do not have the specified label.
func (n *NodeLabelPrioritizer) CalculateNodeLabelPriority(pod *api.Pod, machinesToPods map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
func (n *NodeLabelPrioritizer) CalculateNodeLabelPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
var score int
nodes, err := nodeLister.List()
if err != nil {
@ -182,7 +183,7 @@ const (
// based on the total size of those images.
// - If none of the images are present, this node will be given the lowest priority.
// - If some of the images are present on a node, the larger their sizes' sum, the higher the node's priority.
func ImageLocalityPriority(pod *api.Pod, machinesToPods map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
func ImageLocalityPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
sumSizeMap := make(map[string]int64)
nodes, err := nodeLister.List()
@ -248,7 +249,7 @@ func calculateScoreFromSize(sumSize int64) int {
// close the two metrics are to each other.
// Detail: score = 10 - abs(cpuFraction-memoryFraction)*10. The algorithm is partly inspired by:
// "Wei Huang et al. An Energy Efficient Virtual Machine Placement Algorithm with Balanced Resource Utilization"
func BalancedResourceAllocation(pod *api.Pod, machinesToPods map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
func BalancedResourceAllocation(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
nodes, err := nodeLister.List()
if err != nil {
return schedulerapi.HostPriorityList{}, err
@ -256,7 +257,7 @@ func BalancedResourceAllocation(pod *api.Pod, machinesToPods map[string][]*api.P
list := schedulerapi.HostPriorityList{}
for _, node := range nodes.Items {
list = append(list, calculateBalancedResourceAllocation(pod, node, machinesToPods[node.Name]))
list = append(list, calculateBalancedResourceAllocation(pod, node, nodeNameToInfo[node.Name].Pods()))
}
return list, nil
}
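
In the same spirit, a minimal sketch of the balanced-allocation formula from the comment above (score = 10 - abs(cpuFraction - memoryFraction) * 10, where each fraction is requested / capacity); numbers are hypothetical and the real function also handles over-committed nodes, which this sketch omits.

package main

import (
	"fmt"
	"math"
)

// balancedScore rewards nodes whose CPU and memory utilisation fractions
// are closest to each other after placing the pod.
func balancedScore(cpuRequested, cpuCapacity, memRequested, memCapacity int64) int {
	cpuFraction := float64(cpuRequested) / float64(cpuCapacity)
	memFraction := float64(memRequested) / float64(memCapacity)
	diff := math.Abs(cpuFraction - memFraction)
	return int(10 - diff*10)
}

func main() {
	// Hypothetical nodes with 4000 milli-CPU and 8 GiB of memory.
	const gi = 1024 * 1024 * 1024
	fmt.Println(balancedScore(2000, 4000, 4*gi, 8*gi)) // 10: perfectly balanced (0.5 vs 0.5)
	fmt.Println(balancedScore(3000, 4000, 2*gi, 8*gi)) // 5: 0.75 vs 0.25
}
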


@ -26,8 +26,8 @@ import (
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/plugin/pkg/scheduler"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
func makeNode(node string, milliCPU, memory int64) api.Node {
@ -132,18 +132,15 @@ func TestZeroRequest(t *testing.T) {
const expectedPriority int = 25
for _, test := range tests {
m2p, err := predicates.MapPodsToMachines(algorithm.FakePodLister(test.pods))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods)
list, err := scheduler.PrioritizeNodes(
test.pod,
m2p,
nodeNameToInfo,
algorithm.FakePodLister(test.pods),
// This should match the configuration in defaultPriorities() in
// plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go if you want
// to test what's actually in production.
[]algorithm.PriorityConfig{{Function: LeastRequestedPriority, Weight: 1}, {Function: BalancedResourceAllocation, Weight: 1}, {Function: NewSelectorSpreadPriority(algorithm.FakeServiceLister([]api.Service{}), algorithm.FakeControllerLister([]api.ReplicationController{})), Weight: 1}},
[]algorithm.PriorityConfig{{Function: LeastRequestedPriority, Weight: 1}, {Function: BalancedResourceAllocation, Weight: 1}, {Function: NewSelectorSpreadPriority(algorithm.FakePodLister(test.pods), algorithm.FakeServiceLister([]api.Service{}), algorithm.FakeControllerLister([]api.ReplicationController{})), Weight: 1}},
algorithm.FakeNodeLister(api.NodeList{Items: test.nodes}), []algorithm.SchedulerExtender{})
if err != nil {
t.Errorf("unexpected error: %v", err)
@ -387,11 +384,8 @@ func TestLeastRequested(t *testing.T) {
}
for _, test := range tests {
m2p, err := predicates.MapPodsToMachines(algorithm.FakePodLister(test.pods))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
list, err := LeastRequestedPriority(test.pod, m2p, algorithm.FakePodLister(test.pods), algorithm.FakeNodeLister(api.NodeList{Items: test.nodes}))
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods)
list, err := LeastRequestedPriority(test.pod, nodeNameToInfo, algorithm.FakeNodeLister(api.NodeList{Items: test.nodes}))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -485,7 +479,7 @@ func TestNewNodeLabelPriority(t *testing.T) {
label: test.label,
presence: test.presence,
}
list, err := prioritizer.CalculateNodeLabelPriority(nil, map[string][]*api.Pod{}, nil, algorithm.FakeNodeLister(api.NodeList{Items: test.nodes}))
list, err := prioritizer.CalculateNodeLabelPriority(nil, map[string]*schedulercache.NodeInfo{}, algorithm.FakeNodeLister(api.NodeList{Items: test.nodes}))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -723,11 +717,8 @@ func TestBalancedResourceAllocation(t *testing.T) {
}
for _, test := range tests {
m2p, err := predicates.MapPodsToMachines(algorithm.FakePodLister(test.pods))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
list, err := BalancedResourceAllocation(test.pod, m2p, algorithm.FakePodLister(test.pods), algorithm.FakeNodeLister(api.NodeList{Items: test.nodes}))
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods)
list, err := BalancedResourceAllocation(test.pod, nodeNameToInfo, algorithm.FakeNodeLister(api.NodeList{Items: test.nodes}))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -870,11 +861,8 @@ func TestImageLocalityPriority(t *testing.T) {
}
for _, test := range tests {
m2p, err := predicates.MapPodsToMachines(algorithm.FakePodLister(test.pods))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
list, err := ImageLocalityPriority(test.pod, m2p, algorithm.FakePodLister(test.pods), algorithm.FakeNodeLister(api.NodeList{Items: test.nodes}))
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods)
list, err := ImageLocalityPriority(test.pod, nodeNameToInfo, algorithm.FakeNodeLister(api.NodeList{Items: test.nodes}))
if err != nil {
t.Errorf("unexpected error: %v", err)
}


@ -23,6 +23,7 @@ import (
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
// The maximum priority value to give to a node
@ -34,12 +35,14 @@ const maxPriority = 10
const zoneWeighting = 2.0 / 3.0
type SelectorSpread struct {
podLister algorithm.PodLister
serviceLister algorithm.ServiceLister
controllerLister algorithm.ControllerLister
}
func NewSelectorSpreadPriority(serviceLister algorithm.ServiceLister, controllerLister algorithm.ControllerLister) algorithm.PriorityFunction {
func NewSelectorSpreadPriority(podLister algorithm.PodLister, serviceLister algorithm.ServiceLister, controllerLister algorithm.ControllerLister) algorithm.PriorityFunction {
selectorSpread := &SelectorSpread{
podLister: podLister,
serviceLister: serviceLister,
controllerLister: controllerLister,
}
@ -73,7 +76,7 @@ func getZoneKey(node *api.Node) string {
// i.e. it pushes the scheduler towards a node where there's the smallest number of
// pods which match the same service selectors or RC selectors as the pod being scheduled.
// Where zone information is included on the nodes, it favors nodes in zones with fewer existing matching pods.
func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, machinesToPods map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
var nsPods []*api.Pod
selectors := make([]labels.Selector, 0)
@ -91,7 +94,7 @@ func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, machinesToPods ma
}
if len(selectors) > 0 {
pods, err := podLister.List(labels.Everything())
pods, err := s.podLister.List(labels.Everything())
if err != nil {
return nil, err
}
@ -198,12 +201,14 @@ func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, machinesToPods ma
}
type ServiceAntiAffinity struct {
podLister algorithm.PodLister
serviceLister algorithm.ServiceLister
label string
}
func NewServiceAntiAffinityPriority(serviceLister algorithm.ServiceLister, label string) algorithm.PriorityFunction {
func NewServiceAntiAffinityPriority(podLister algorithm.PodLister, serviceLister algorithm.ServiceLister, label string) algorithm.PriorityFunction {
antiAffinity := &ServiceAntiAffinity{
podLister: podLister,
serviceLister: serviceLister,
label: label,
}
@ -213,7 +218,7 @@ func NewServiceAntiAffinityPriority(serviceLister algorithm.ServiceLister, label
// CalculateAntiAffinityPriority spreads pods by minimizing the number of pods belonging to the same service
// on machines with the same value for a particular label.
// The label to be considered is provided to the struct (ServiceAntiAffinity).
func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod *api.Pod, machinesToPods map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
var nsServicePods []*api.Pod
services, err := s.serviceLister.GetPodServices(pod)
@ -221,7 +226,7 @@ func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod *api.Pod, machin
// just use the first service and get the other pods within the service
// TODO: a separate predicate can be created that tries to handle all services for the pod
selector := labels.SelectorFromSet(services[0].Spec.Selector)
pods, err := podLister.List(selector)
pods, err := s.podLister.List(selector)
if err != nil {
return nil, err
}


@ -24,8 +24,8 @@ import (
"k8s.io/kubernetes/pkg/api"
wellknownlabels "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
func TestSelectorSpreadPriority(t *testing.T) {
@ -219,12 +219,9 @@ func TestSelectorSpreadPriority(t *testing.T) {
}
for _, test := range tests {
m2p, err := predicates.MapPodsToMachines(algorithm.FakePodLister(test.pods))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
selectorSpread := SelectorSpread{serviceLister: algorithm.FakeServiceLister(test.services), controllerLister: algorithm.FakeControllerLister(test.rcs)}
list, err := selectorSpread.CalculateSpreadPriority(test.pod, m2p, algorithm.FakePodLister(test.pods), algorithm.FakeNodeLister(makeNodeList(test.nodes)))
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods)
selectorSpread := SelectorSpread{podLister: algorithm.FakePodLister(test.pods), serviceLister: algorithm.FakeServiceLister(test.services), controllerLister: algorithm.FakeControllerLister(test.rcs)}
list, err := selectorSpread.CalculateSpreadPriority(test.pod, nodeNameToInfo, algorithm.FakeNodeLister(makeNodeList(test.nodes)))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -422,12 +419,9 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
}
for _, test := range tests {
selectorSpread := SelectorSpread{serviceLister: algorithm.FakeServiceLister(test.services), controllerLister: algorithm.FakeControllerLister(test.rcs)}
m2p, err := predicates.MapPodsToMachines(algorithm.FakePodLister(test.pods))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
list, err := selectorSpread.CalculateSpreadPriority(test.pod, m2p, algorithm.FakePodLister(test.pods), algorithm.FakeNodeLister(makeLabeledNodeList(labeledNodes)))
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods)
selectorSpread := SelectorSpread{podLister: algorithm.FakePodLister(test.pods), serviceLister: algorithm.FakeServiceLister(test.services), controllerLister: algorithm.FakeControllerLister(test.rcs)}
list, err := selectorSpread.CalculateSpreadPriority(test.pod, nodeNameToInfo, algorithm.FakeNodeLister(makeLabeledNodeList(labeledNodes)))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -597,12 +591,9 @@ func TestZoneSpreadPriority(t *testing.T) {
}
for _, test := range tests {
m2p, err := predicates.MapPodsToMachines(algorithm.FakePodLister(test.pods))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
zoneSpread := ServiceAntiAffinity{serviceLister: algorithm.FakeServiceLister(test.services), label: "zone"}
list, err := zoneSpread.CalculateAntiAffinityPriority(test.pod, m2p, algorithm.FakePodLister(test.pods), algorithm.FakeNodeLister(makeLabeledNodeList(test.nodes)))
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods)
zoneSpread := ServiceAntiAffinity{podLister: algorithm.FakePodLister(test.pods), serviceLister: algorithm.FakeServiceLister(test.services), label: "zone"}
list, err := zoneSpread.CalculateAntiAffinityPriority(test.pod, nodeNameToInfo, algorithm.FakeNodeLister(makeLabeledNodeList(test.nodes)))
if err != nil {
t.Errorf("unexpected error: %v", err)
}


@ -19,12 +19,13 @@ package algorithm
import (
"k8s.io/kubernetes/pkg/api"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
// FitPredicate is a function that indicates if a pod fits into an existing node.
type FitPredicate func(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error)
type FitPredicate func(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error)
type PriorityFunction func(pod *api.Pod, machineToPods map[string][]*api.Pod, podLister PodLister, nodeLister NodeLister) (schedulerapi.HostPriorityList, error)
type PriorityFunction func(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister NodeLister) (schedulerapi.HostPriorityList, error)
type PriorityConfig struct {
Function PriorityFunction


@ -68,7 +68,7 @@ func init() {
"ServiceSpreadingPriority",
factory.PriorityConfigFactory{
Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction {
return priorities.NewSelectorSpreadPriority(args.ServiceLister, algorithm.EmptyControllerLister{})
return priorities.NewSelectorSpreadPriority(args.PodLister, args.ServiceLister, algorithm.EmptyControllerLister{})
},
Weight: 1,
},
@ -141,7 +141,7 @@ func defaultPriorities() sets.String {
"SelectorSpreadPriority",
factory.PriorityConfigFactory{
Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction {
return priorities.NewSelectorSpreadPriority(args.ServiceLister, args.ControllerLister)
return priorities.NewSelectorSpreadPriority(args.PodLister, args.ServiceLister, args.ControllerLister)
},
Weight: 1,
},


@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
type fitPredicate func(pod *api.Pod, node *api.Node) (bool, error)
@ -88,7 +89,7 @@ func machine2PrioritizerExtender(pod *api.Pod, nodes *api.NodeList) (*schedulera
return &result, nil
}
func machine2Prioritizer(_ *api.Pod, machineToPods map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
func machine2Prioritizer(_ *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
nodes, err := nodeLister.List()
if err != nil {
return []schedulerapi.HostPriority{}, err


@ -34,6 +34,7 @@ import (
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
func TestCreate(t *testing.T) {
@ -117,19 +118,19 @@ func TestCreateFromEmptyConfig(t *testing.T) {
factory.CreateFromConfig(policy)
}
func PredicateOne(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) {
func PredicateOne(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
return true, nil
}
func PredicateTwo(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) {
func PredicateTwo(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
return true, nil
}
func PriorityOne(pod *api.Pod, m2p map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
func PriorityOne(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
return []schedulerapi.HostPriority{}, nil
}
func PriorityTwo(pod *api.Pod, m2p map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
func PriorityTwo(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
return []schedulerapi.HostPriority{}, nil
}


@ -168,6 +168,7 @@ func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy) string {
pcf = &PriorityConfigFactory{
Function: func(args PluginFactoryArgs) algorithm.PriorityFunction {
return priorities.NewServiceAntiAffinityPriority(
args.PodLister,
args.ServiceLister,
policy.Argument.ServiceAntiAffinity.Label,
)


@ -25,11 +25,13 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
type FailedPredicateMap map[string]sets.String
@ -75,12 +77,12 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe
// TODO: we should compute this once and dynamically update it using Watch, not constantly re-compute.
// But at least we're now only doing it in one place
machinesToPods, err := predicates.MapPodsToMachines(g.pods)
pods, err := g.pods.List(labels.Everything())
if err != nil {
return "", err
}
filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, machinesToPods, g.predicates, nodes, g.extenders)
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(pods)
filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, nodeNameToInfo, g.predicates, nodes, g.extenders)
if err != nil {
return "", err
}
@ -92,7 +94,7 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe
}
}
priorityList, err := PrioritizeNodes(pod, machinesToPods, g.pods, g.prioritizers, algorithm.FakeNodeLister(filteredNodes), g.extenders)
priorityList, err := PrioritizeNodes(pod, nodeNameToInfo, g.pods, g.prioritizers, algorithm.FakeNodeLister(filteredNodes), g.extenders)
if err != nil {
return "", err
}
@ -129,14 +131,14 @@ func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList
// Filters the nodes to find the ones that fit based on the given predicate functions
// Each node is passed through the predicate functions to determine if it is a fit
func findNodesThatFit(pod *api.Pod, machineToPods map[string][]*api.Pod, predicateFuncs map[string]algorithm.FitPredicate, nodes api.NodeList, extenders []algorithm.SchedulerExtender) (api.NodeList, FailedPredicateMap, error) {
func findNodesThatFit(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate, nodes api.NodeList, extenders []algorithm.SchedulerExtender) (api.NodeList, FailedPredicateMap, error) {
filtered := []api.Node{}
failedPredicateMap := FailedPredicateMap{}
for _, node := range nodes.Items {
fits := true
for name, predicate := range predicateFuncs {
fit, err := predicate(pod, machineToPods[node.Name], node.Name)
fit, err := predicate(pod, node.Name, nodeNameToInfo[node.Name])
if err != nil {
switch e := err.(type) {
case *predicates.InsufficientResourceError:
@ -186,13 +188,13 @@ func findNodesThatFit(pod *api.Pod, machineToPods map[string][]*api.Pod, predica
// Each priority function can also have its own weight
// The node scores returned by the priority function are multiplied by the weights to get weighted scores
// All scores are finally combined (added) to get the total weighted scores of all nodes
func PrioritizeNodes(pod *api.Pod, machinesToPods map[string][]*api.Pod, podLister algorithm.PodLister, priorityConfigs []algorithm.PriorityConfig, nodeLister algorithm.NodeLister, extenders []algorithm.SchedulerExtender) (schedulerapi.HostPriorityList, error) {
func PrioritizeNodes(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, podLister algorithm.PodLister, priorityConfigs []algorithm.PriorityConfig, nodeLister algorithm.NodeLister, extenders []algorithm.SchedulerExtender) (schedulerapi.HostPriorityList, error) {
result := schedulerapi.HostPriorityList{}
// If no priority configs are provided, then the EqualPriority function is applied
// This is required to generate the priority list in the required format
if len(priorityConfigs) == 0 && len(extenders) == 0 {
return EqualPriority(pod, machinesToPods, podLister, nodeLister)
return EqualPriority(pod, nodeNameToInfo, nodeLister)
}
var (
@ -213,7 +215,7 @@ func PrioritizeNodes(pod *api.Pod, machinesToPods map[string][]*api.Pod, podList
defer wg.Done()
weight := config.Weight
priorityFunc := config.Function
prioritizedList, err := priorityFunc(pod, machinesToPods, podLister, nodeLister)
prioritizedList, err := priorityFunc(pod, nodeNameToInfo, nodeLister)
if err != nil {
mu.Lock()
errs = append(errs, err)
@ -269,7 +271,7 @@ func PrioritizeNodes(pod *api.Pod, machinesToPods map[string][]*api.Pod, podList
}
// EqualPriority is a prioritizer function that gives an equal weight of one to all nodes
func EqualPriority(_ *api.Pod, machinesToPods map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
func EqualPriority(_ *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
nodes, err := nodeLister.List()
if err != nil {
glog.Errorf("Failed to list nodes: %v", err)
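
Stepping back to PrioritizeNodes above: the comment notes that each priority function's per-node scores are multiplied by the function's weight and then summed per node. A minimal sketch of that aggregation, with hypothetical names and scores (not code from this diff), follows.

package main

import "fmt"

// hostScore is a simplified stand-in for schedulerapi.HostPriority.
type hostScore struct {
	host  string
	score int
}

// weightedList pairs one priority function's results with its weight.
type weightedList struct {
	weight int
	list   []hostScore
}

// combine sums weight-multiplied scores per host, the way PrioritizeNodes
// aggregates the lists returned by the configured priority functions.
func combine(results []weightedList) map[string]int {
	totals := map[string]int{}
	for _, r := range results {
		for _, hs := range r.list {
			totals[hs.host] += hs.score * r.weight
		}
	}
	return totals
}

func main() {
	results := []weightedList{
		{weight: 1, list: []hostScore{{"node-a", 7}, {"node-b", 3}}},
		{weight: 2, list: []hostScore{{"node-a", 2}, {"node-b", 5}}},
	}
	fmt.Println(combine(results)) // node-a: 7*1+2*2=11, node-b: 3*1+5*2=13
}
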


@ -27,25 +27,26 @@ import (
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
func falsePredicate(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) {
func falsePredicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
return false, nil
}
func truePredicate(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) {
func truePredicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
return true, nil
}
func matchesPredicate(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) {
return pod.Name == node, nil
func matchesPredicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
return pod.Name == nodeName, nil
}
func hasNoPodsPredicate(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) {
return len(existingPods) == 0, nil
func hasNoPodsPredicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
return len(nodeInfo.Pods()) == 0, nil
}
func numericPriority(pod *api.Pod, machineToPods map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
func numericPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
nodes, err := nodeLister.List()
result := []schedulerapi.HostPriority{}
@ -65,11 +66,11 @@ func numericPriority(pod *api.Pod, machineToPods map[string][]*api.Pod, podListe
return result, nil
}
func reverseNumericPriority(pod *api.Pod, machineToPods map[string][]*api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
func reverseNumericPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
var maxScore float64
minScore := math.MaxFloat64
reverseResult := []schedulerapi.HostPriority{}
result, err := numericPriority(pod, machineToPods, podLister, nodeLister)
result, err := numericPriority(pod, nodeNameToInfo, nodeLister)
if err != nil {
return nil, err
}
@ -275,12 +276,12 @@ func TestGenericScheduler(t *testing.T) {
func TestFindFitAllError(t *testing.T) {
nodes := []string{"3", "2", "1"}
predicates := map[string]algorithm.FitPredicate{"true": truePredicate, "false": falsePredicate}
machineToPods := map[string][]*api.Pod{
"3": {},
"2": {},
"1": {},
nodeNameToInfo := map[string]*schedulercache.NodeInfo{
"3": schedulercache.NewNodeInfo(),
"2": schedulercache.NewNodeInfo(),
"1": schedulercache.NewNodeInfo(),
}
_, predicateMap, err := findNodesThatFit(&api.Pod{}, machineToPods, predicates, makeNodeList(nodes), nil)
_, predicateMap, err := findNodesThatFit(&api.Pod{}, nodeNameToInfo, predicates, makeNodeList(nodes), nil)
if err != nil {
t.Errorf("unexpected error: %v", err)
@ -305,12 +306,12 @@ func TestFindFitSomeError(t *testing.T) {
nodes := []string{"3", "2", "1"}
predicates := map[string]algorithm.FitPredicate{"true": truePredicate, "match": matchesPredicate}
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "1"}}
machineToPods := map[string][]*api.Pod{
"3": {},
"2": {},
"1": {pod},
nodeNameToInfo := map[string]*schedulercache.NodeInfo{
"3": schedulercache.NewNodeInfo(),
"2": schedulercache.NewNodeInfo(),
"1": schedulercache.NewNodeInfo(pod),
}
_, predicateMap, err := findNodesThatFit(pod, machineToPods, predicates, makeNodeList(nodes), nil)
_, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, predicates, makeNodeList(nodes), nil)
if err != nil {
t.Errorf("unexpected error: %v", err)


@ -0,0 +1,96 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package schedulercache
import (
"fmt"
"k8s.io/kubernetes/pkg/api"
)
var emptyResource = Resource{}
// NodeInfo is node level aggregated information.
type NodeInfo struct {
// Total requested resource of all pods on this node.
// It includes assumed pods which scheduler sends binding to apiserver but
// didn't get it as scheduled yet.
requestedResource *Resource
pods []*api.Pod
}
// Resource is a collection of compute resource.
type Resource struct {
MilliCPU int64
Memory int64
}
// NewNodeInfo returns a ready to use empty NodeInfo object.
// If any pods are given in arguments, their information will be aggregated in
// the returned object.
func NewNodeInfo(pods ...*api.Pod) *NodeInfo {
ni := &NodeInfo{
requestedResource: &Resource{},
}
for _, pod := range pods {
ni.addPod(pod)
}
return ni
}
// Pods returns all pods scheduled (including assumed to be) on this node.
func (n *NodeInfo) Pods() []*api.Pod {
if n == nil {
return nil
}
return n.pods
}
// RequestedResource returns aggregated resource request of pods on this node.
func (n *NodeInfo) RequestedResource() Resource {
if n == nil {
return emptyResource
}
return *n.requestedResource
}
// String returns representation of human readable format of this NodeInfo.
func (n *NodeInfo) String() string {
podKeys := make([]string, len(n.pods))
for i, pod := range n.pods {
podKeys[i] = pod.Name
}
return fmt.Sprintf("&NodeInfo{Pods:%v, RequestedResource:%#v}", podKeys, n.requestedResource)
}
// addPod adds pod information to this NodeInfo.
func (n *NodeInfo) addPod(pod *api.Pod) {
cpu, mem := calculateResource(pod)
n.requestedResource.MilliCPU += cpu
n.requestedResource.Memory += mem
n.pods = append(n.pods, pod)
}
func calculateResource(pod *api.Pod) (int64, int64) {
var cpu, mem int64
for _, c := range pod.Spec.Containers {
req := c.Resources.Requests
cpu += req.Cpu().MilliValue()
mem += req.Memory().Value()
}
return cpu, mem
}
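
A short usage sketch of the new NodeInfo type (hypothetical values; podRequesting is an illustrative helper in the spirit of newResourcePod from the predicates tests, and the sketch assumes the in-tree api and resource packages of this release): NewNodeInfo aggregates the pods' requests once at construction, so predicates can read the totals instead of re-walking the pod list.

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/resource"
	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

// podRequesting builds a pod with one container requesting the given
// milli-CPU and memory (bytes). Hypothetical helper, not part of the PR.
func podRequesting(milliCPU, memory int64) *api.Pod {
	return &api.Pod{
		Spec: api.PodSpec{
			Containers: []api.Container{{
				Resources: api.ResourceRequirements{
					Requests: api.ResourceList{
						api.ResourceCPU:    *resource.NewMilliQuantity(milliCPU, resource.DecimalSI),
						api.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI),
					},
				},
			}},
		},
	}
}

func main() {
	info := schedulercache.NewNodeInfo(
		podRequesting(100, 512*1024*1024),
		podRequesting(200, 256*1024*1024),
	)
	// The aggregate is computed once, when the NodeInfo is built.
	fmt.Println(len(info.Pods()))                  // 2
	fmt.Println(info.RequestedResource().MilliCPU) // 300
	fmt.Println(info.RequestedResource().Memory)   // 805306368 (768 MiB)
}
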


@ -0,0 +1,35 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package schedulercache
import "k8s.io/kubernetes/pkg/api"
// CreateNodeNameToInfoMap obtains a list of pods and pivots that list into a map where the keys are node names
// and the values are the aggregated information for that node.
func CreateNodeNameToInfoMap(pods []*api.Pod) map[string]*NodeInfo {
nodeNameToInfo := make(map[string]*NodeInfo)
for _, pod := range pods {
nodeName := pod.Spec.NodeName
nodeInfo, ok := nodeNameToInfo[nodeName]
if !ok {
nodeInfo = NewNodeInfo()
nodeNameToInfo[nodeName] = nodeInfo
}
nodeInfo.addPod(pod)
}
return nodeNameToInfo
}
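
Finally, a small sketch of how the map builder above is consumed (hypothetical pod and node names; not code from this diff): pods are grouped by Spec.NodeName, and looking up a node that has no pods yields a nil *NodeInfo, which the nil-tolerant Pods() accessor handles safely.

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

func main() {
	pods := []*api.Pod{
		{ObjectMeta: api.ObjectMeta{Name: "p1"}, Spec: api.PodSpec{NodeName: "node-a"}},
		{ObjectMeta: api.ObjectMeta{Name: "p2"}, Spec: api.PodSpec{NodeName: "node-a"}},
		{ObjectMeta: api.ObjectMeta{Name: "p3"}, Spec: api.PodSpec{NodeName: "node-b"}},
	}

	nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(pods)

	fmt.Println(len(nodeNameToInfo["node-a"].Pods())) // 2
	fmt.Println(len(nodeNameToInfo["node-b"].Pods())) // 1

	// A node with no pods is simply absent from the map; the lookup
	// returns a nil *NodeInfo, whose Pods() method returns nil.
	fmt.Println(len(nodeNameToInfo["node-c"].Pods())) // 0
}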