diff --git a/contrib/mesos/pkg/scheduler/plugin_test.go b/contrib/mesos/pkg/scheduler/plugin_test.go index 22fc5c87fc0..c8fea700210 100644 --- a/contrib/mesos/pkg/scheduler/plugin_test.go +++ b/contrib/mesos/pkg/scheduler/plugin_test.go @@ -450,7 +450,10 @@ func newLifecycleTest(t *testing.T) lifecycleTest { // create scheduler strategy := NewAllocationStrategy( - podtask.DefaultPredicate, + podtask.NewDefaultPredicate( + mresource.DefaultDefaultContainerCPULimit, + mresource.DefaultDefaultContainerMemLimit, + ), podtask.NewDefaultProcurement( mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerMemLimit, diff --git a/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go b/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go index 6f15565098c..710485b1eb1 100644 --- a/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go +++ b/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go @@ -143,13 +143,11 @@ func TestEmptyOffer(t *testing.T) { }}, } - mresource.LimitPodCPU(&task.Pod, mresource.DefaultDefaultContainerCPULimit) - mresource.LimitPodMem(&task.Pod, mresource.DefaultDefaultContainerMemLimit) - - if ok := DefaultPredicate(task, nil, nil); ok { + defaultPredicate := NewDefaultPredicate(mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerMemLimit) + if ok := defaultPredicate(task, nil, nil); ok { t.Fatalf("accepted nil offer") } - if ok := DefaultPredicate(task, &mesos.Offer{}, nil); ok { + if ok := defaultPredicate(task, &mesos.Offer{}, nil); ok { t.Fatalf("accepted empty offer") } } @@ -167,8 +165,7 @@ func TestNoPortsInPodOrOffer(t *testing.T) { }}, } - mresource.LimitPodCPU(&task.Pod, mresource.DefaultDefaultContainerCPULimit) - mresource.LimitPodMem(&task.Pod, mresource.DefaultDefaultContainerMemLimit) + defaultPredicate := NewDefaultPredicate(mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerMemLimit) offer := &mesos.Offer{ Resources: []*mesos.Resource{ @@ -176,7 +173,7 @@ func TestNoPortsInPodOrOffer(t *testing.T) { mutil.NewScalarResource("mem", 0.001), }, } - if ok := DefaultPredicate(task, offer, nil); ok { + if ok := defaultPredicate(task, offer, nil); ok { t.Fatalf("accepted offer %v:", offer) } @@ -186,7 +183,7 @@ func TestNoPortsInPodOrOffer(t *testing.T) { mutil.NewScalarResource("mem", t_min_mem), }, } - if ok := DefaultPredicate(task, offer, nil); !ok { + if ok := defaultPredicate(task, offer, nil); !ok { t.Fatalf("did not accepted offer %v:", offer) } } @@ -196,6 +193,8 @@ func TestAcceptOfferPorts(t *testing.T) { task, _ := fakePodTask("foo") pod := &task.Pod + defaultPredicate := NewDefaultPredicate(mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerMemLimit) + offer := &mesos.Offer{ Resources: []*mesos.Resource{ mutil.NewScalarResource("cpus", t_min_cpu), @@ -203,7 +202,7 @@ func TestAcceptOfferPorts(t *testing.T) { rangeResource("ports", []uint64{1, 1}), }, } - if ok := DefaultPredicate(task, offer, nil); !ok { + if ok := defaultPredicate(task, offer, nil); !ok { t.Fatalf("did not accepted offer %v:", offer) } @@ -215,20 +214,17 @@ func TestAcceptOfferPorts(t *testing.T) { }}, } - mresource.LimitPodCPU(&task.Pod, mresource.DefaultDefaultContainerCPULimit) - mresource.LimitPodMem(&task.Pod, mresource.DefaultDefaultContainerMemLimit) - - if ok := DefaultPredicate(task, offer, nil); ok { + if ok := defaultPredicate(task, offer, nil); ok { t.Fatalf("accepted offer %v:", offer) } pod.Spec.Containers[0].Ports[0].HostPort = 1 - if ok := DefaultPredicate(task, offer, nil); !ok { + if ok := defaultPredicate(task, offer, nil); !ok { t.Fatalf("did not accepted offer %v:", offer) } pod.Spec.Containers[0].Ports[0].HostPort = 0 - if ok := DefaultPredicate(task, offer, nil); !ok { + if ok := defaultPredicate(task, offer, nil); !ok { t.Fatalf("did not accepted offer %v:", offer) } @@ -236,12 +232,12 @@ func TestAcceptOfferPorts(t *testing.T) { mutil.NewScalarResource("cpus", t_min_cpu), mutil.NewScalarResource("mem", t_min_mem), } - if ok := DefaultPredicate(task, offer, nil); ok { + if ok := defaultPredicate(task, offer, nil); ok { t.Fatalf("accepted offer %v:", offer) } pod.Spec.Containers[0].Ports[0].HostPort = 1 - if ok := DefaultPredicate(task, offer, nil); ok { + if ok := defaultPredicate(task, offer, nil); ok { t.Fatalf("accepted offer %v:", offer) } } @@ -326,6 +322,8 @@ func TestNodeSelector(t *testing.T) { {map[string]string{"some.other/label": "43"}, node3, true, "non-slave attribute matches"}, } + defaultPredicate := NewDefaultPredicate(mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerMemLimit) + for _, ts := range tests { task, _ := fakePodTask("foo") task.Pod.Spec.NodeSelector = ts.selector @@ -336,7 +334,7 @@ func TestNodeSelector(t *testing.T) { }, Hostname: &ts.node.Name, } - if got, want := DefaultPredicate(task, offer, ts.node), ts.ok; got != want { + if got, want := defaultPredicate(task, offer, ts.node), ts.ok; got != want { t.Fatalf("expected acceptance of offer for selector %v to be %v, got %v: %q", ts.selector, want, got, ts.desc) } } diff --git a/contrib/mesos/pkg/scheduler/podtask/predicate.go b/contrib/mesos/pkg/scheduler/podtask/predicate.go index 25667e3d2c1..98da6ee7a89 100644 --- a/contrib/mesos/pkg/scheduler/podtask/predicate.go +++ b/contrib/mesos/pkg/scheduler/podtask/predicate.go @@ -24,12 +24,14 @@ import ( "k8s.io/kubernetes/pkg/labels" ) -var DefaultPredicate = RequireAllPredicate([]FitPredicate{ - ValidationPredicate, - NodeSelectorPredicate, - PodFitsResourcesPredicate, - PortsPredicate, -}).Fit +func NewDefaultPredicate(c mresource.CPUShares, m mresource.MegaBytes) FitPredicate { + return RequireAllPredicate([]FitPredicate{ + ValidationPredicate, + NodeSelectorPredicate, + NewPodFitsResourcesPredicate(c, m), + PortsPredicate, + }).Fit +} // FitPredicate implementations determine if the given task "fits" into offered Mesos resources. // Neither the task or offer should be modified. Note that the node can be nil. @@ -78,31 +80,33 @@ func PortsPredicate(t *T, offer *mesos.Offer, _ *api.Node) bool { return true } -func PodFitsResourcesPredicate(t *T, offer *mesos.Offer, _ *api.Node) bool { - // find offered cpu and mem - var ( - offeredCpus mresource.CPUShares - offeredMem mresource.MegaBytes - ) - for _, resource := range offer.Resources { - if resource.GetName() == "cpus" { - offeredCpus = mresource.CPUShares(*resource.GetScalar().Value) +func NewPodFitsResourcesPredicate(c mresource.CPUShares, m mresource.MegaBytes) func(t *T, offer *mesos.Offer, _ *api.Node) bool { + return func(t *T, offer *mesos.Offer, _ *api.Node) bool { + // find offered cpu and mem + var ( + offeredCpus mresource.CPUShares + offeredMem mresource.MegaBytes + ) + for _, resource := range offer.Resources { + if resource.GetName() == "cpus" { + offeredCpus = mresource.CPUShares(*resource.GetScalar().Value) + } + + if resource.GetName() == "mem" { + offeredMem = mresource.MegaBytes(*resource.GetScalar().Value) + } } - if resource.GetName() == "mem" { - offeredMem = mresource.MegaBytes(*resource.GetScalar().Value) + // calculate cpu and mem sum over all containers of the pod + // TODO (@sttts): also support pod.spec.resources.limit.request + // TODO (@sttts): take into account the executor resources + cpu := mresource.CPUForPod(&t.Pod, c) + mem := mresource.MemForPod(&t.Pod, m) + log.V(4).Infof("trying to match offer with pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem) + if (cpu > offeredCpus) || (mem > offeredMem) { + log.V(3).Infof("not enough resources for pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem) + return false } + return true } - - // calculate cpu and mem sum over all containers of the pod - // TODO (@sttts): also support pod.spec.resources.limit.request - // TODO (@sttts): take into account the executor resources - cpu := mresource.PodCPULimit(&t.Pod) - mem := mresource.PodMemLimit(&t.Pod) - log.V(4).Infof("trying to match offer with pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem) - if (cpu > offeredCpus) || (mem > offeredMem) { - log.V(3).Infof("not enough resources for pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem) - return false - } - return true } diff --git a/contrib/mesos/pkg/scheduler/service/service.go b/contrib/mesos/pkg/scheduler/service/service.go index 119a46ddbf6..a097d4ed77a 100644 --- a/contrib/mesos/pkg/scheduler/service/service.go +++ b/contrib/mesos/pkg/scheduler/service/service.go @@ -678,8 +678,15 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config } as := scheduler.NewAllocationStrategy( - podtask.DefaultPredicate, - podtask.NewDefaultProcurement(s.DefaultContainerCPULimit, s.DefaultContainerMemLimit)) + podtask.NewDefaultPredicate( + s.DefaultContainerCPULimit, + s.DefaultContainerMemLimit, + ), + podtask.NewDefaultProcurement( + s.DefaultContainerCPULimit, + s.DefaultContainerMemLimit, + ), + ) // downgrade allocation strategy if user disables "account-for-pod-resources" if !s.AccountForPodResources {