Take default limits for cpu and mem into account in FitPredicate

This commit is contained in:
Dr. Stefan Schimanski 2015-10-09 16:32:45 +01:00
parent 49a5f89921
commit aea7985b85
4 changed files with 63 additions and 51 deletions

View File

@ -450,7 +450,10 @@ func newLifecycleTest(t *testing.T) lifecycleTest {
// create scheduler // create scheduler
strategy := NewAllocationStrategy( strategy := NewAllocationStrategy(
podtask.DefaultPredicate, podtask.NewDefaultPredicate(
mresource.DefaultDefaultContainerCPULimit,
mresource.DefaultDefaultContainerMemLimit,
),
podtask.NewDefaultProcurement( podtask.NewDefaultProcurement(
mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerCPULimit,
mresource.DefaultDefaultContainerMemLimit, mresource.DefaultDefaultContainerMemLimit,

View File

@ -143,13 +143,11 @@ func TestEmptyOffer(t *testing.T) {
}}, }},
} }
mresource.LimitPodCPU(&task.Pod, mresource.DefaultDefaultContainerCPULimit) defaultPredicate := NewDefaultPredicate(mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerMemLimit)
mresource.LimitPodMem(&task.Pod, mresource.DefaultDefaultContainerMemLimit) if ok := defaultPredicate(task, nil, nil); ok {
if ok := DefaultPredicate(task, nil, nil); ok {
t.Fatalf("accepted nil offer") t.Fatalf("accepted nil offer")
} }
if ok := DefaultPredicate(task, &mesos.Offer{}, nil); ok { if ok := defaultPredicate(task, &mesos.Offer{}, nil); ok {
t.Fatalf("accepted empty offer") t.Fatalf("accepted empty offer")
} }
} }
@ -167,8 +165,7 @@ func TestNoPortsInPodOrOffer(t *testing.T) {
}}, }},
} }
mresource.LimitPodCPU(&task.Pod, mresource.DefaultDefaultContainerCPULimit) defaultPredicate := NewDefaultPredicate(mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerMemLimit)
mresource.LimitPodMem(&task.Pod, mresource.DefaultDefaultContainerMemLimit)
offer := &mesos.Offer{ offer := &mesos.Offer{
Resources: []*mesos.Resource{ Resources: []*mesos.Resource{
@ -176,7 +173,7 @@ func TestNoPortsInPodOrOffer(t *testing.T) {
mutil.NewScalarResource("mem", 0.001), mutil.NewScalarResource("mem", 0.001),
}, },
} }
if ok := DefaultPredicate(task, offer, nil); ok { if ok := defaultPredicate(task, offer, nil); ok {
t.Fatalf("accepted offer %v:", offer) t.Fatalf("accepted offer %v:", offer)
} }
@ -186,7 +183,7 @@ func TestNoPortsInPodOrOffer(t *testing.T) {
mutil.NewScalarResource("mem", t_min_mem), mutil.NewScalarResource("mem", t_min_mem),
}, },
} }
if ok := DefaultPredicate(task, offer, nil); !ok { if ok := defaultPredicate(task, offer, nil); !ok {
t.Fatalf("did not accepted offer %v:", offer) t.Fatalf("did not accepted offer %v:", offer)
} }
} }
@ -196,6 +193,8 @@ func TestAcceptOfferPorts(t *testing.T) {
task, _ := fakePodTask("foo") task, _ := fakePodTask("foo")
pod := &task.Pod pod := &task.Pod
defaultPredicate := NewDefaultPredicate(mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerMemLimit)
offer := &mesos.Offer{ offer := &mesos.Offer{
Resources: []*mesos.Resource{ Resources: []*mesos.Resource{
mutil.NewScalarResource("cpus", t_min_cpu), mutil.NewScalarResource("cpus", t_min_cpu),
@ -203,7 +202,7 @@ func TestAcceptOfferPorts(t *testing.T) {
rangeResource("ports", []uint64{1, 1}), rangeResource("ports", []uint64{1, 1}),
}, },
} }
if ok := DefaultPredicate(task, offer, nil); !ok { if ok := defaultPredicate(task, offer, nil); !ok {
t.Fatalf("did not accepted offer %v:", offer) t.Fatalf("did not accepted offer %v:", offer)
} }
@ -215,20 +214,17 @@ func TestAcceptOfferPorts(t *testing.T) {
}}, }},
} }
mresource.LimitPodCPU(&task.Pod, mresource.DefaultDefaultContainerCPULimit) if ok := defaultPredicate(task, offer, nil); ok {
mresource.LimitPodMem(&task.Pod, mresource.DefaultDefaultContainerMemLimit)
if ok := DefaultPredicate(task, offer, nil); ok {
t.Fatalf("accepted offer %v:", offer) t.Fatalf("accepted offer %v:", offer)
} }
pod.Spec.Containers[0].Ports[0].HostPort = 1 pod.Spec.Containers[0].Ports[0].HostPort = 1
if ok := DefaultPredicate(task, offer, nil); !ok { if ok := defaultPredicate(task, offer, nil); !ok {
t.Fatalf("did not accepted offer %v:", offer) t.Fatalf("did not accepted offer %v:", offer)
} }
pod.Spec.Containers[0].Ports[0].HostPort = 0 pod.Spec.Containers[0].Ports[0].HostPort = 0
if ok := DefaultPredicate(task, offer, nil); !ok { if ok := defaultPredicate(task, offer, nil); !ok {
t.Fatalf("did not accepted offer %v:", offer) t.Fatalf("did not accepted offer %v:", offer)
} }
@ -236,12 +232,12 @@ func TestAcceptOfferPorts(t *testing.T) {
mutil.NewScalarResource("cpus", t_min_cpu), mutil.NewScalarResource("cpus", t_min_cpu),
mutil.NewScalarResource("mem", t_min_mem), mutil.NewScalarResource("mem", t_min_mem),
} }
if ok := DefaultPredicate(task, offer, nil); ok { if ok := defaultPredicate(task, offer, nil); ok {
t.Fatalf("accepted offer %v:", offer) t.Fatalf("accepted offer %v:", offer)
} }
pod.Spec.Containers[0].Ports[0].HostPort = 1 pod.Spec.Containers[0].Ports[0].HostPort = 1
if ok := DefaultPredicate(task, offer, nil); ok { if ok := defaultPredicate(task, offer, nil); ok {
t.Fatalf("accepted offer %v:", offer) t.Fatalf("accepted offer %v:", offer)
} }
} }
@ -326,6 +322,8 @@ func TestNodeSelector(t *testing.T) {
{map[string]string{"some.other/label": "43"}, node3, true, "non-slave attribute matches"}, {map[string]string{"some.other/label": "43"}, node3, true, "non-slave attribute matches"},
} }
defaultPredicate := NewDefaultPredicate(mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerMemLimit)
for _, ts := range tests { for _, ts := range tests {
task, _ := fakePodTask("foo") task, _ := fakePodTask("foo")
task.Pod.Spec.NodeSelector = ts.selector task.Pod.Spec.NodeSelector = ts.selector
@ -336,7 +334,7 @@ func TestNodeSelector(t *testing.T) {
}, },
Hostname: &ts.node.Name, Hostname: &ts.node.Name,
} }
if got, want := DefaultPredicate(task, offer, ts.node), ts.ok; got != want { if got, want := defaultPredicate(task, offer, ts.node), ts.ok; got != want {
t.Fatalf("expected acceptance of offer for selector %v to be %v, got %v: %q", ts.selector, want, got, ts.desc) t.Fatalf("expected acceptance of offer for selector %v to be %v, got %v: %q", ts.selector, want, got, ts.desc)
} }
} }

View File

@ -24,12 +24,14 @@ import (
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
) )
var DefaultPredicate = RequireAllPredicate([]FitPredicate{ func NewDefaultPredicate(c mresource.CPUShares, m mresource.MegaBytes) FitPredicate {
ValidationPredicate, return RequireAllPredicate([]FitPredicate{
NodeSelectorPredicate, ValidationPredicate,
PodFitsResourcesPredicate, NodeSelectorPredicate,
PortsPredicate, NewPodFitsResourcesPredicate(c, m),
}).Fit PortsPredicate,
}).Fit
}
// FitPredicate implementations determine if the given task "fits" into offered Mesos resources. // FitPredicate implementations determine if the given task "fits" into offered Mesos resources.
// Neither the task or offer should be modified. Note that the node can be nil. // Neither the task or offer should be modified. Note that the node can be nil.
@ -78,31 +80,33 @@ func PortsPredicate(t *T, offer *mesos.Offer, _ *api.Node) bool {
return true return true
} }
func PodFitsResourcesPredicate(t *T, offer *mesos.Offer, _ *api.Node) bool { func NewPodFitsResourcesPredicate(c mresource.CPUShares, m mresource.MegaBytes) func(t *T, offer *mesos.Offer, _ *api.Node) bool {
// find offered cpu and mem return func(t *T, offer *mesos.Offer, _ *api.Node) bool {
var ( // find offered cpu and mem
offeredCpus mresource.CPUShares var (
offeredMem mresource.MegaBytes offeredCpus mresource.CPUShares
) offeredMem mresource.MegaBytes
for _, resource := range offer.Resources { )
if resource.GetName() == "cpus" { for _, resource := range offer.Resources {
offeredCpus = mresource.CPUShares(*resource.GetScalar().Value) if resource.GetName() == "cpus" {
offeredCpus = mresource.CPUShares(*resource.GetScalar().Value)
}
if resource.GetName() == "mem" {
offeredMem = mresource.MegaBytes(*resource.GetScalar().Value)
}
} }
if resource.GetName() == "mem" { // calculate cpu and mem sum over all containers of the pod
offeredMem = mresource.MegaBytes(*resource.GetScalar().Value) // TODO (@sttts): also support pod.spec.resources.limit.request
// TODO (@sttts): take into account the executor resources
cpu := mresource.CPUForPod(&t.Pod, c)
mem := mresource.MemForPod(&t.Pod, m)
log.V(4).Infof("trying to match offer with pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem)
if (cpu > offeredCpus) || (mem > offeredMem) {
log.V(3).Infof("not enough resources for pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem)
return false
} }
return true
} }
// calculate cpu and mem sum over all containers of the pod
// TODO (@sttts): also support pod.spec.resources.limit.request
// TODO (@sttts): take into account the executor resources
cpu := mresource.PodCPULimit(&t.Pod)
mem := mresource.PodMemLimit(&t.Pod)
log.V(4).Infof("trying to match offer with pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem)
if (cpu > offeredCpus) || (mem > offeredMem) {
log.V(3).Infof("not enough resources for pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem)
return false
}
return true
} }

View File

@ -678,8 +678,15 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
} }
as := scheduler.NewAllocationStrategy( as := scheduler.NewAllocationStrategy(
podtask.DefaultPredicate, podtask.NewDefaultPredicate(
podtask.NewDefaultProcurement(s.DefaultContainerCPULimit, s.DefaultContainerMemLimit)) s.DefaultContainerCPULimit,
s.DefaultContainerMemLimit,
),
podtask.NewDefaultProcurement(
s.DefaultContainerCPULimit,
s.DefaultContainerMemLimit,
),
)
// downgrade allocation strategy if user disables "account-for-pod-resources" // downgrade allocation strategy if user disables "account-for-pod-resources"
if !s.AccountForPodResources { if !s.AccountForPodResources {