Merge pull request #15376 from mesosphere/sttts-fix-default-limits-in-predicate

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2015-10-10 12:20:27 -07:00
commit 40b5d0583f
4 changed files with 63 additions and 51 deletions

View File

@ -450,7 +450,10 @@ func newLifecycleTest(t *testing.T) lifecycleTest {
// create scheduler
strategy := NewAllocationStrategy(
podtask.DefaultPredicate,
podtask.NewDefaultPredicate(
mresource.DefaultDefaultContainerCPULimit,
mresource.DefaultDefaultContainerMemLimit,
),
podtask.NewDefaultProcurement(
mresource.DefaultDefaultContainerCPULimit,
mresource.DefaultDefaultContainerMemLimit,

View File

@ -143,13 +143,11 @@ func TestEmptyOffer(t *testing.T) {
}},
}
mresource.LimitPodCPU(&task.Pod, mresource.DefaultDefaultContainerCPULimit)
mresource.LimitPodMem(&task.Pod, mresource.DefaultDefaultContainerMemLimit)
if ok := DefaultPredicate(task, nil, nil); ok {
defaultPredicate := NewDefaultPredicate(mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerMemLimit)
if ok := defaultPredicate(task, nil, nil); ok {
t.Fatalf("accepted nil offer")
}
if ok := DefaultPredicate(task, &mesos.Offer{}, nil); ok {
if ok := defaultPredicate(task, &mesos.Offer{}, nil); ok {
t.Fatalf("accepted empty offer")
}
}
@ -167,8 +165,7 @@ func TestNoPortsInPodOrOffer(t *testing.T) {
}},
}
mresource.LimitPodCPU(&task.Pod, mresource.DefaultDefaultContainerCPULimit)
mresource.LimitPodMem(&task.Pod, mresource.DefaultDefaultContainerMemLimit)
defaultPredicate := NewDefaultPredicate(mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerMemLimit)
offer := &mesos.Offer{
Resources: []*mesos.Resource{
@ -176,7 +173,7 @@ func TestNoPortsInPodOrOffer(t *testing.T) {
mutil.NewScalarResource("mem", 0.001),
},
}
if ok := DefaultPredicate(task, offer, nil); ok {
if ok := defaultPredicate(task, offer, nil); ok {
t.Fatalf("accepted offer %v:", offer)
}
@ -186,7 +183,7 @@ func TestNoPortsInPodOrOffer(t *testing.T) {
mutil.NewScalarResource("mem", t_min_mem),
},
}
if ok := DefaultPredicate(task, offer, nil); !ok {
if ok := defaultPredicate(task, offer, nil); !ok {
t.Fatalf("did not accepted offer %v:", offer)
}
}
@ -196,6 +193,8 @@ func TestAcceptOfferPorts(t *testing.T) {
task, _ := fakePodTask("foo")
pod := &task.Pod
defaultPredicate := NewDefaultPredicate(mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerMemLimit)
offer := &mesos.Offer{
Resources: []*mesos.Resource{
mutil.NewScalarResource("cpus", t_min_cpu),
@ -203,7 +202,7 @@ func TestAcceptOfferPorts(t *testing.T) {
rangeResource("ports", []uint64{1, 1}),
},
}
if ok := DefaultPredicate(task, offer, nil); !ok {
if ok := defaultPredicate(task, offer, nil); !ok {
t.Fatalf("did not accepted offer %v:", offer)
}
@ -215,20 +214,17 @@ func TestAcceptOfferPorts(t *testing.T) {
}},
}
mresource.LimitPodCPU(&task.Pod, mresource.DefaultDefaultContainerCPULimit)
mresource.LimitPodMem(&task.Pod, mresource.DefaultDefaultContainerMemLimit)
if ok := DefaultPredicate(task, offer, nil); ok {
if ok := defaultPredicate(task, offer, nil); ok {
t.Fatalf("accepted offer %v:", offer)
}
pod.Spec.Containers[0].Ports[0].HostPort = 1
if ok := DefaultPredicate(task, offer, nil); !ok {
if ok := defaultPredicate(task, offer, nil); !ok {
t.Fatalf("did not accepted offer %v:", offer)
}
pod.Spec.Containers[0].Ports[0].HostPort = 0
if ok := DefaultPredicate(task, offer, nil); !ok {
if ok := defaultPredicate(task, offer, nil); !ok {
t.Fatalf("did not accepted offer %v:", offer)
}
@ -236,12 +232,12 @@ func TestAcceptOfferPorts(t *testing.T) {
mutil.NewScalarResource("cpus", t_min_cpu),
mutil.NewScalarResource("mem", t_min_mem),
}
if ok := DefaultPredicate(task, offer, nil); ok {
if ok := defaultPredicate(task, offer, nil); ok {
t.Fatalf("accepted offer %v:", offer)
}
pod.Spec.Containers[0].Ports[0].HostPort = 1
if ok := DefaultPredicate(task, offer, nil); ok {
if ok := defaultPredicate(task, offer, nil); ok {
t.Fatalf("accepted offer %v:", offer)
}
}
@ -326,6 +322,8 @@ func TestNodeSelector(t *testing.T) {
{map[string]string{"some.other/label": "43"}, node3, true, "non-slave attribute matches"},
}
defaultPredicate := NewDefaultPredicate(mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerMemLimit)
for _, ts := range tests {
task, _ := fakePodTask("foo")
task.Pod.Spec.NodeSelector = ts.selector
@ -336,7 +334,7 @@ func TestNodeSelector(t *testing.T) {
},
Hostname: &ts.node.Name,
}
if got, want := DefaultPredicate(task, offer, ts.node), ts.ok; got != want {
if got, want := defaultPredicate(task, offer, ts.node), ts.ok; got != want {
t.Fatalf("expected acceptance of offer for selector %v to be %v, got %v: %q", ts.selector, want, got, ts.desc)
}
}

View File

@ -24,12 +24,14 @@ import (
"k8s.io/kubernetes/pkg/labels"
)
var DefaultPredicate = RequireAllPredicate([]FitPredicate{
ValidationPredicate,
NodeSelectorPredicate,
PodFitsResourcesPredicate,
PortsPredicate,
}).Fit
func NewDefaultPredicate(c mresource.CPUShares, m mresource.MegaBytes) FitPredicate {
return RequireAllPredicate([]FitPredicate{
ValidationPredicate,
NodeSelectorPredicate,
NewPodFitsResourcesPredicate(c, m),
PortsPredicate,
}).Fit
}
// FitPredicate implementations determine if the given task "fits" into offered Mesos resources.
// Neither the task or offer should be modified. Note that the node can be nil.
@ -78,31 +80,33 @@ func PortsPredicate(t *T, offer *mesos.Offer, _ *api.Node) bool {
return true
}
func PodFitsResourcesPredicate(t *T, offer *mesos.Offer, _ *api.Node) bool {
// find offered cpu and mem
var (
offeredCpus mresource.CPUShares
offeredMem mresource.MegaBytes
)
for _, resource := range offer.Resources {
if resource.GetName() == "cpus" {
offeredCpus = mresource.CPUShares(*resource.GetScalar().Value)
func NewPodFitsResourcesPredicate(c mresource.CPUShares, m mresource.MegaBytes) func(t *T, offer *mesos.Offer, _ *api.Node) bool {
return func(t *T, offer *mesos.Offer, _ *api.Node) bool {
// find offered cpu and mem
var (
offeredCpus mresource.CPUShares
offeredMem mresource.MegaBytes
)
for _, resource := range offer.Resources {
if resource.GetName() == "cpus" {
offeredCpus = mresource.CPUShares(*resource.GetScalar().Value)
}
if resource.GetName() == "mem" {
offeredMem = mresource.MegaBytes(*resource.GetScalar().Value)
}
}
if resource.GetName() == "mem" {
offeredMem = mresource.MegaBytes(*resource.GetScalar().Value)
// calculate cpu and mem sum over all containers of the pod
// TODO (@sttts): also support pod.spec.resources.limit.request
// TODO (@sttts): take into account the executor resources
cpu := mresource.CPUForPod(&t.Pod, c)
mem := mresource.MemForPod(&t.Pod, m)
log.V(4).Infof("trying to match offer with pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem)
if (cpu > offeredCpus) || (mem > offeredMem) {
log.V(3).Infof("not enough resources for pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem)
return false
}
return true
}
// calculate cpu and mem sum over all containers of the pod
// TODO (@sttts): also support pod.spec.resources.limit.request
// TODO (@sttts): take into account the executor resources
cpu := mresource.PodCPULimit(&t.Pod)
mem := mresource.PodMemLimit(&t.Pod)
log.V(4).Infof("trying to match offer with pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem)
if (cpu > offeredCpus) || (mem > offeredMem) {
log.V(3).Infof("not enough resources for pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem)
return false
}
return true
}

View File

@ -678,8 +678,15 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
}
as := scheduler.NewAllocationStrategy(
podtask.DefaultPredicate,
podtask.NewDefaultProcurement(s.DefaultContainerCPULimit, s.DefaultContainerMemLimit))
podtask.NewDefaultPredicate(
s.DefaultContainerCPULimit,
s.DefaultContainerMemLimit,
),
podtask.NewDefaultProcurement(
s.DefaultContainerCPULimit,
s.DefaultContainerMemLimit,
),
)
// downgrade allocation strategy if user disables "account-for-pod-resources"
if !s.AccountForPodResources {