Merge pull request #13421 from mesosphere/optout_pod_resource_constraints

MESOS: Add --contain-pod-resources flag to determine how k8s-mesos will contain pod resources
Piotr Szczesniak 2015-09-04 13:43:22 +02:00
commit 47af1a0c9b
14 changed files with 517 additions and 233 deletions

View File

@@ -57,11 +57,14 @@ type MinionServer struct {
     pathOverride string // the PATH environment for the sub-processes
     cgroupPrefix string // e.g. mesos
-    cgroupRoot   string // e.g. /mesos/{container-id}, determined at runtime
+    cgroupRoot          string // the cgroupRoot that we pass to the kubelet-executor, depends on containPodResources
+    mesosCgroup         string // discovered mesos cgroup root, e.g. /mesos/{container-id}
+    containPodResources bool
     logMaxSize      resource.Quantity
     logMaxBackups   int
     logMaxAgeInDays int
+    logVerbosity    int32 // see glog.Level
     runProxy  bool
     proxyLogV int

@@ -74,6 +77,7 @@ func NewMinionServer() *MinionServer {
         KubeletExecutorServer: exservice.NewKubeletExecutorServer(),
         privateMountNS:        false, // disabled until Docker supports customization of the parent mount namespace
         cgroupPrefix:          config.DefaultCgroupPrefix,
+        containPodResources:   true,
         logMaxSize:            config.DefaultLogMaxSize(),
         logMaxBackups:         config.DefaultLogMaxBackups,
         logMaxAgeInDays:       config.DefaultLogMaxAgeInDays,

@@ -131,7 +135,7 @@ func (ms *MinionServer) launchProxyServer() {
         fmt.Sprintf("--bind-address=%s", bindAddress),
         fmt.Sprintf("--v=%d", ms.proxyLogV),
         "--logtostderr=true",
-        "--resource-container=" + path.Join("/", ms.cgroupRoot, "kube-proxy"),
+        "--resource-container=" + path.Join("/", ms.mesosCgroup, "kube-proxy"),
     }
     if ms.clientConfig.Host != "" {

@@ -156,7 +160,7 @@ func (ms *MinionServer) launchExecutorServer() <-chan struct{} {
     ms.AddExecutorFlags(executorFlags)
     executorArgs, _ := filterArgsByFlagSet(allArgs, executorFlags)
-    executorArgs = append(executorArgs, "--resource-container="+path.Join("/", ms.cgroupRoot, "kubelet"))
+    executorArgs = append(executorArgs, "--resource-container="+path.Join("/", ms.mesosCgroup, "kubelet"))
     if ms.cgroupRoot != "" {
         executorArgs = append(executorArgs, "--cgroup-root="+ms.cgroupRoot)
     }

@@ -241,14 +245,23 @@ func (ms *MinionServer) Run(hks hyperkube.Interface, _ []string) error {
     ms.clientConfig = clientConfig
     // derive the executor cgroup and use it as:
-    // - pod container cgroup root (e.g. docker cgroup-parent)
+    // - pod container cgroup root (e.g. docker cgroup-parent, optionally; see comments below)
     // - parent of kubelet container
     // - parent of kube-proxy container
-    ms.cgroupRoot = findMesosCgroup(ms.cgroupPrefix)
+    ms.mesosCgroup = findMesosCgroup(ms.cgroupPrefix)
+    log.Infof("discovered mesos cgroup at %q", ms.mesosCgroup)
+
+    // hack alert, this helps to work around systemd+docker+mesos integration problems
+    // when docker's cgroup-parent flag is used (!containPodResources = don't use the docker flag)
+    if ms.containPodResources {
+        ms.cgroupRoot = ms.mesosCgroup
+    }
     cgroupLogger := log.Infof
     if ms.cgroupRoot == "" {
         cgroupLogger = log.Warningf
     }
     cgroupLogger("using cgroup-root %q", ms.cgroupRoot)
     // run subprocesses until ms.done is closed on return of this function

@@ -302,6 +315,9 @@ func termSignalListener(abort <-chan struct{}) <-chan struct{} {
 func (ms *MinionServer) AddExecutorFlags(fs *pflag.FlagSet) {
     ms.KubeletExecutorServer.AddFlags(fs)
+
+    // hack to forward log verbosity flag to the executor
+    fs.Int32Var(&ms.logVerbosity, "v", ms.logVerbosity, "log level for V logs")
 }
 func (ms *MinionServer) AddMinionFlags(fs *pflag.FlagSet) {

@@ -309,6 +325,7 @@ func (ms *MinionServer) AddMinionFlags(fs *pflag.FlagSet) {
     fs.StringVar(&ms.cgroupPrefix, "mesos-cgroup-prefix", ms.cgroupPrefix, "The cgroup prefix concatenated with MESOS_DIRECTORY must give the executor cgroup set by Mesos")
     fs.BoolVar(&ms.privateMountNS, "private-mountns", ms.privateMountNS, "Enter a private mount NS before spawning procs (linux only). Experimental, not yet compatible with k8s volumes.")
     fs.StringVar(&ms.pathOverride, "path-override", ms.pathOverride, "Override the PATH in the environment of the sub-processes.")
+    fs.BoolVar(&ms.containPodResources, "contain-pod-resources", ms.containPodResources, "Allocate pod CPU and memory resources from offers and reparent pod containers into mesos cgroups; disable if you're having strange mesos/docker/systemd interactions.")
     // log file flags
     fs.Var(resource.NewQuantityFlagValue(&ms.logMaxSize), "max-log-size", "Maximum log file size for the executor and proxy before rotation")
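For illustration, not part of this commit: the net effect of the new flag in the minion server is that the discovered Mesos cgroup always parents the kubelet and kube-proxy resource containers, but it is only handed to the executor as --cgroup-root (and thus used as docker's cgroup-parent for pods) while --contain-pod-resources=true. A minimal standalone sketch of that decision, using hypothetical helper names rather than the actual server code:

package main

import (
    "fmt"
    "path"
)

// chooseCgroups mirrors the MinionServer logic introduced here: the mesos cgroup
// always parents the kubelet resource container, while the pod cgroup root is only
// set when containPodResources is true (otherwise it stays empty and the executor
// skips the --cgroup-root / docker cgroup-parent plumbing).
func chooseCgroups(mesosCgroup string, containPodResources bool) (cgroupRoot, kubeletContainer string) {
    kubeletContainer = path.Join("/", mesosCgroup, "kubelet")
    if containPodResources {
        cgroupRoot = mesosCgroup
    }
    return
}

func main() {
    for _, contain := range []bool{true, false} {
        root, rc := chooseCgroups("/mesos/example-container-id", contain)
        fmt.Printf("contain-pod-resources=%v: cgroup-root=%q resource-container=%q\n", contain, root, rc)
    }
}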

View File

@@ -24,8 +24,42 @@ import (
     "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
 )

+type allocationStrategy struct {
+    fitPredicate podtask.FitPredicate
+    procurement  podtask.Procurement
+}
+
+func (a *allocationStrategy) FitPredicate() podtask.FitPredicate {
+    return a.fitPredicate
+}
+
+func (a *allocationStrategy) Procurement() podtask.Procurement {
+    return a.procurement
+}
+
+func NewAllocationStrategy(fitPredicate podtask.FitPredicate, procurement podtask.Procurement) AllocationStrategy {
+    if fitPredicate == nil {
+        panic("fitPredicate is required")
+    }
+    if procurement == nil {
+        panic("procurement is required")
+    }
+    return &allocationStrategy{
+        fitPredicate: fitPredicate,
+        procurement:  procurement,
+    }
+}
+
+type fcfsPodScheduler struct {
+    AllocationStrategy
+}
+
+func NewFCFSPodScheduler(as AllocationStrategy) PodScheduler {
+    return &fcfsPodScheduler{as}
+}
+
 // A first-come-first-serve scheduler: acquires the first offer that can support the task
-func FCFSScheduleFunc(r offers.Registry, unused SlaveIndex, task *podtask.T) (offers.Perishable, error) {
+func (fps *fcfsPodScheduler) SchedulePod(r offers.Registry, unused SlaveIndex, task *podtask.T) (offers.Perishable, error) {
     podName := fmt.Sprintf("%s/%s", task.Pod.Namespace, task.Pod.Name)
     var acceptedOffer offers.Perishable
     err := r.Walk(func(p offers.Perishable) (bool, error) {

@@ -33,7 +67,7 @@ func FCFSScheduleFunc(r offers.Registry, unused SlaveIndex, task *podtask.T) (of
         if offer == nil {
             return false, fmt.Errorf("nil offer while scheduling task %v", task.ID)
         }
-        if task.AcceptOffer(offer) {
+        if fps.FitPredicate()(task, offer) {
             if p.Acquire() {
                 acceptedOffer = p
                 log.V(3).Infof("Pod %s accepted offer %v", podName, offer.Id.GetValue())
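For illustration, not part of the diff: elsewhere in this commit (the lifecycle test and SchedulerServer.bootstrap below) the new constructors are wired together roughly like this. A condensed sketch, assuming the package import paths that appear in the diff:

package main

import (
    "k8s.io/kubernetes/contrib/mesos/pkg/scheduler"
    "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
    mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
)

// buildFCFS pairs the default fit predicate with the default procurement chain and
// hands the resulting AllocationStrategy to the first-come-first-serve pod scheduler.
func buildFCFS() scheduler.PodScheduler {
    as := scheduler.NewAllocationStrategy(
        podtask.DefaultPredicate,
        podtask.NewDefaultProcurement(
            mresource.DefaultDefaultContainerCPULimit,
            mresource.DefaultDefaultContainerMemLimit))
    return scheduler.NewFCFSPodScheduler(as)
}

func main() {
    _ = buildFCFS() // the PodScheduler is then passed to scheduler.New via Config.Scheduler
}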

View File

@@ -42,11 +42,11 @@ func (m *MockScheduler) slaveFor(id string) (slave *Slave, ok bool) {
     ok = args.Bool(1)
     return
 }
-func (m *MockScheduler) algorithm() (f PodScheduleFunc) {
+func (m *MockScheduler) algorithm() (f PodScheduler) {
     args := m.Called()
     x := args.Get(0)
     if x != nil {
-        f = x.(PodScheduleFunc)
+        f = x.(PodScheduler)
     }
     return
 }

View File

@@ -33,7 +33,6 @@ import (
     "k8s.io/kubernetes/contrib/mesos/pkg/runtime"
     annotation "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
     "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
-    mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
     "k8s.io/kubernetes/pkg/api"
     "k8s.io/kubernetes/pkg/api/errors"
     client "k8s.io/kubernetes/pkg/client/unversioned"

@@ -56,8 +55,9 @@ const (
 // scheduler abstraction to allow for easier unit testing
 type schedulerInterface interface {
     sync.Locker // synchronize scheduler plugin operations
     SlaveIndex
-    algorithm() PodScheduleFunc // see types.go
+    algorithm() PodScheduler
     offers() offers.Registry
     tasks() podtask.Registry

@@ -76,8 +76,8 @@ type k8smScheduler struct {
     internal *KubernetesScheduler
 }
-func (k *k8smScheduler) algorithm() PodScheduleFunc {
-    return k.internal.scheduleFunc
+func (k *k8smScheduler) algorithm() PodScheduler {
+    return k.internal
 }
 func (k *k8smScheduler) offers() offers.Registry {

@@ -233,8 +233,6 @@ func (b *binder) prepareTaskForLaunch(ctx api.Context, machine string, task *pod
 type kubeScheduler struct {
     api        schedulerInterface
     podUpdates queue.FIFO
-    defaultContainerCPULimit mresource.CPUShares
-    defaultContainerMemLimit mresource.MegaBytes
 }
 // recoverAssignedSlave recovers the assigned Mesos slave from a pod by searching

@@ -318,7 +316,7 @@ func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) {
         }
     }
     if err == nil && offer == nil {
-        offer, err = k.api.algorithm()(k.api.offers(), k.api, task)
+        offer, err = k.api.algorithm().SchedulePod(k.api.offers(), k.api, task)
     }
     if err != nil {
         return "", err

@@ -338,18 +336,8 @@ func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) {
         return "", fmt.Errorf("task.offer assignment must be idempotent, task %+v: offer %+v", task, offer)
     }
-    // write resource limits into the pod spec which is transferred to the executor. From here
-    // on we can expect that the pod spec of a task has proper limits for CPU and memory.
-    // TODO(sttts): For a later separation of the kubelet and the executor also patch the pod on the apiserver
-    if unlimitedCPU := mresource.LimitPodCPU(&task.Pod, k.defaultContainerCPULimit); unlimitedCPU {
-        log.Warningf("Pod %s/%s without cpu limits is admitted %.2f cpu shares", task.Pod.Namespace, task.Pod.Name, mresource.PodCPULimit(&task.Pod))
-    }
-    if unlimitedMem := mresource.LimitPodMem(&task.Pod, k.defaultContainerMemLimit); unlimitedMem {
-        log.Warningf("Pod %s/%s without memory limits is admitted %.2f MB", task.Pod.Namespace, task.Pod.Name, mresource.PodMemLimit(&task.Pod))
-    }
     task.Offer = offer
-    task.FillFromDetails(details)
+    k.api.algorithm().Procurement()(task, details) // TODO(jdef) why is nothing checking the error returned here?
     if err := k.api.tasks().Update(task); err != nil {
         offer.Release()

@@ -556,7 +544,7 @@ func (k *errorHandler) handleSchedulingError(pod *api.Pod, schedulingErr error)
     defer k.api.Unlock()
     switch task, state := k.api.tasks().Get(task.ID); state {
     case podtask.StatePending:
-        return !task.Has(podtask.Launched) && task.AcceptOffer(offer)
+        return !task.Has(podtask.Launched) && k.api.algorithm().FitPredicate()(task, offer)
     default:
         // no point in continuing to check for matching offers
         return true

@@ -700,8 +688,6 @@ func (k *KubernetesScheduler) NewPluginConfig(terminate <-chan struct{}, mux *ht
         Algorithm: &kubeScheduler{
             api:        kapi,
             podUpdates: podUpdates,
-            defaultContainerCPULimit: k.defaultContainerCPULimit,
-            defaultContainerMemLimit: k.defaultContainerMemLimit,
         },
         Binder:  &binder{api: kapi},
         NextPod: q.yield,

View File

@@ -393,13 +393,14 @@ func TestPlugin_LifeCycle(t *testing.T) {
     executor.Data = []byte{0, 1, 2}
     // create scheduler
+    as := NewAllocationStrategy(
+        podtask.DefaultPredicate,
+        podtask.NewDefaultProcurement(mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerMemLimit))
     testScheduler := New(Config{
         Executor: executor,
         Client:   client.NewOrDie(&client.Config{Host: testApiServer.server.URL, Version: testapi.Version()}),
-        ScheduleFunc: FCFSScheduleFunc,
+        Scheduler: NewFCFSPodScheduler(as),
         Schedcfg: *schedcfg.CreateDefaultConfig(),
-        DefaultContainerCPULimit: mresource.DefaultDefaultContainerCPULimit,
-        DefaultContainerMemLimit: mresource.DefaultDefaultContainerMemLimit,
     })
     assert.NotNil(testScheduler.client, "client is nil")

View File

@@ -0,0 +1,73 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package podtask

import (
    log "github.com/golang/glog"
    mesos "github.com/mesos/mesos-go/mesosproto"
)

// bogus numbers that we use to make sure that there's some set of minimal offered resources on the slave
const (
    minimalCpus = 0.01
    minimalMem  = 0.25
)

var (
    DefaultMinimalPredicate = RequireAllPredicate([]FitPredicate{
        ValidationPredicate,
        NodeSelectorPredicate,
        MinimalPodResourcesPredicate,
        PortsPredicate,
    }).Fit

    DefaultMinimalProcurement = AllOrNothingProcurement([]Procurement{
        ValidateProcurement,
        NodeProcurement,
        MinimalPodResourcesProcurement,
        PortsProcurement,
    }).Procure
)

func MinimalPodResourcesPredicate(t *T, offer *mesos.Offer) bool {
    var (
        offeredCpus float64
        offeredMem  float64
    )
    for _, resource := range offer.Resources {
        if resource.GetName() == "cpus" {
            offeredCpus = resource.GetScalar().GetValue()
        }

        if resource.GetName() == "mem" {
            offeredMem = resource.GetScalar().GetValue()
        }
    }
    log.V(4).Infof("trying to match offer with pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, minimalCpus, minimalMem)
    if (minimalCpus > offeredCpus) || (minimalMem > offeredMem) {
        log.V(3).Infof("not enough resources for pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, minimalCpus, minimalMem)
        return false
    }
    return true
}

func MinimalPodResourcesProcurement(t *T, details *mesos.Offer) error {
    log.V(3).Infof("Recording offer(s) %s/%s against pod %v: cpu: %.2f, mem: %.2f MB", details.Id, t.Pod.Namespace, t.Pod.Name, minimalCpus, minimalMem)
    t.Spec.CPU = minimalCpus
    t.Spec.Memory = minimalMem
    return nil
}
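For illustration, not part of the diff: this minimal predicate/procurement chain is the opt-out path selected when --account-for-pod-resources=false (see the SchedulerServer.bootstrap hunk below). Instead of charging a pod's real CPU/memory limits against the Mesos offer, every task is recorded with the same tiny placeholder footprint. A self-contained sketch of that difference, with simplified local types rather than the real podtask.Spec:

package main

import "fmt"

// Placeholder values copied from the constants above.
const (
    minimalCpus = 0.01
    minimalMem  = 0.25
)

type spec struct{ cpus, mem float64 }

// procure illustrates the two accounting modes: the default path charges the pod's
// (possibly defaulted) limits against the offer, while the minimal path claims only
// the fixed placeholder amounts, so pods barely draw down Mesos resources.
func procure(accountForPodResources bool, podCPULimit, podMemLimit float64) spec {
    if !accountForPodResources {
        return spec{minimalCpus, minimalMem}
    }
    return spec{podCPULimit, podMemLimit}
}

func main() {
    fmt.Println(procure(true, 2, 512))  // {2 512}     -- default accounting
    fmt.Println(procure(false, 2, 512)) // {0.01 0.25} -- minimal placeholder
}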

View File

@@ -18,7 +18,6 @@ package podtask
 import (
     "fmt"
-    "strings"
     "time"

     "github.com/gogo/protobuf/proto"

@@ -28,7 +27,6 @@ import (
     "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics"
     mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
     "k8s.io/kubernetes/pkg/api"
-    "k8s.io/kubernetes/pkg/labels"

     log "github.com/golang/glog"
     mesos "github.com/mesos/mesos-go/mesosproto"

@@ -150,59 +148,6 @@ func (t *T) BuildTaskInfo() *mesos.TaskInfo {
     return info
 }

-// Fill the Spec in the T, should be called during k8s scheduling, before binding.
-func (t *T) FillFromDetails(details *mesos.Offer) error {
-    if details == nil {
-        //programming error
-        panic("offer details are nil")
-    }
-
-    // compute used resources
-    cpu := mresource.PodCPULimit(&t.Pod)
-    mem := mresource.PodMemLimit(&t.Pod)
-    log.V(3).Infof("Recording offer(s) %s/%s against pod %v: cpu: %.2f, mem: %.2f MB", details.Id, t.Pod.Namespace, t.Pod.Name, cpu, mem)
-
-    t.Spec = Spec{
-        SlaveID:       details.GetSlaveId().GetValue(),
-        AssignedSlave: details.GetHostname(),
-        CPU:           cpu,
-        Memory:        mem,
-    }
-
-    // fill in port mapping
-    if mapping, err := t.mapper.Generate(t, details); err != nil {
-        t.Reset()
-        return err
-    } else {
-        ports := []uint64{}
-        for _, entry := range mapping {
-            ports = append(ports, entry.OfferPort)
-        }
-        t.Spec.PortMap = mapping
-        t.Spec.Ports = ports
-    }
-
-    // hostname needs of the executor needs to match that of the offer, otherwise
-    // the kubelet node status checker/updater is very unhappy
-    const HOSTNAME_OVERRIDE_FLAG = "--hostname-override="
-    hostname := details.GetHostname() // required field, non-empty
-    hostnameOverride := HOSTNAME_OVERRIDE_FLAG + hostname
-
-    argv := t.executor.Command.Arguments
-    overwrite := false
-    for i, arg := range argv {
-        if strings.HasPrefix(arg, HOSTNAME_OVERRIDE_FLAG) {
-            overwrite = true
-            argv[i] = hostnameOverride
-            break
-        }
-    }
-    if !overwrite {
-        t.executor.Command.Arguments = append(argv, hostnameOverride)
-    }
-    return nil
-}
-
 // Clear offer-related details from the task, should be called if/when an offer
 // has already been assigned to a task but for some reason is no longer valid.
 func (t *T) Reset() {

@@ -211,65 +156,6 @@ func (t *T) Reset() {
     t.Spec = Spec{}
 }

-func (t *T) AcceptOffer(offer *mesos.Offer) bool {
-    if offer == nil {
-        return false
-    }
-
-    // if the user has specified a target host, make sure this offer is for that host
-    if t.Pod.Spec.NodeName != "" && offer.GetHostname() != t.Pod.Spec.NodeName {
-        return false
-    }
-
-    // check the NodeSelector
-    if len(t.Pod.Spec.NodeSelector) > 0 {
-        slaveLabels := map[string]string{}
-        for _, a := range offer.Attributes {
-            if a.GetType() == mesos.Value_TEXT {
-                slaveLabels[a.GetName()] = a.GetText().GetValue()
-            }
-        }
-        selector := labels.SelectorFromSet(t.Pod.Spec.NodeSelector)
-        if !selector.Matches(labels.Set(slaveLabels)) {
-            return false
-        }
-    }
-
-    // check ports
-    if _, err := t.mapper.Generate(t, offer); err != nil {
-        log.V(3).Info(err)
-        return false
-    }
-
-    // find offered cpu and mem
-    var (
-        offeredCpus mresource.CPUShares
-        offeredMem  mresource.MegaBytes
-    )
-    for _, resource := range offer.Resources {
-        if resource.GetName() == "cpus" {
-            offeredCpus = mresource.CPUShares(*resource.GetScalar().Value)
-        }
-
-        if resource.GetName() == "mem" {
-            offeredMem = mresource.MegaBytes(*resource.GetScalar().Value)
-        }
-    }
-
-    // calculate cpu and mem sum over all containers of the pod
-    // TODO (@sttts): also support pod.spec.resources.limit.request
-    // TODO (@sttts): take into account the executor resources
-    cpu := mresource.PodCPULimit(&t.Pod)
-    mem := mresource.PodMemLimit(&t.Pod)
-    log.V(4).Infof("trying to match offer with pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem)
-    if (cpu > offeredCpus) || (mem > offeredMem) {
-        log.V(3).Infof("not enough resources for pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem)
-        return false
-    }
-    return true
-}
-
 func (t *T) Set(f FlagType) {
     t.Flags[f] = struct{}{}
     if Launched == f {

View File

@@ -146,10 +146,10 @@ func TestEmptyOffer(t *testing.T) {
     mresource.LimitPodCPU(&task.Pod, mresource.DefaultDefaultContainerCPULimit)
     mresource.LimitPodMem(&task.Pod, mresource.DefaultDefaultContainerMemLimit)
-    if ok := task.AcceptOffer(nil); ok {
+    if ok := DefaultPredicate(task, nil); ok {
        t.Fatalf("accepted nil offer")
     }
-    if ok := task.AcceptOffer(&mesos.Offer{}); ok {
+    if ok := DefaultPredicate(task, &mesos.Offer{}); ok {
        t.Fatalf("accepted empty offer")
     }
 }

@@ -176,7 +176,7 @@ func TestNoPortsInPodOrOffer(t *testing.T) {
            mutil.NewScalarResource("mem", 0.001),
        },
     }
-    if ok := task.AcceptOffer(offer); ok {
+    if ok := DefaultPredicate(task, offer); ok {
        t.Fatalf("accepted offer %v:", offer)
     }

@@ -186,7 +186,7 @@ func TestNoPortsInPodOrOffer(t *testing.T) {
            mutil.NewScalarResource("mem", t_min_mem),
        },
     }
-    if ok := task.AcceptOffer(offer); !ok {
+    if ok := DefaultPredicate(task, offer); !ok {
        t.Fatalf("did not accepted offer %v:", offer)
     }
 }

@@ -203,7 +203,7 @@ func TestAcceptOfferPorts(t *testing.T) {
            rangeResource("ports", []uint64{1, 1}),
        },
     }
-    if ok := task.AcceptOffer(offer); !ok {
+    if ok := DefaultPredicate(task, offer); !ok {
        t.Fatalf("did not accepted offer %v:", offer)
     }

@@ -218,17 +218,17 @@ func TestAcceptOfferPorts(t *testing.T) {
     mresource.LimitPodCPU(&task.Pod, mresource.DefaultDefaultContainerCPULimit)
     mresource.LimitPodMem(&task.Pod, mresource.DefaultDefaultContainerMemLimit)
-    if ok := task.AcceptOffer(offer); ok {
+    if ok := DefaultPredicate(task, offer); ok {
        t.Fatalf("accepted offer %v:", offer)
     }

     pod.Spec.Containers[0].Ports[0].HostPort = 1
-    if ok := task.AcceptOffer(offer); !ok {
+    if ok := DefaultPredicate(task, offer); !ok {
        t.Fatalf("did not accepted offer %v:", offer)
     }

     pod.Spec.Containers[0].Ports[0].HostPort = 0
-    if ok := task.AcceptOffer(offer); !ok {
+    if ok := DefaultPredicate(task, offer); !ok {
        t.Fatalf("did not accepted offer %v:", offer)
     }

@@ -236,12 +236,12 @@ func TestAcceptOfferPorts(t *testing.T) {
        mutil.NewScalarResource("cpus", t_min_cpu),
        mutil.NewScalarResource("mem", t_min_mem),
     }
-    if ok := task.AcceptOffer(offer); ok {
+    if ok := DefaultPredicate(task, offer); ok {
        t.Fatalf("accepted offer %v:", offer)
     }

     pod.Spec.Containers[0].Ports[0].HostPort = 1
-    if ok := task.AcceptOffer(offer); ok {
+    if ok := DefaultPredicate(task, offer); ok {
        t.Fatalf("accepted offer %v:", offer)
     }
 }

@@ -297,7 +297,7 @@ func TestNodeSelector(t *testing.T) {
            },
            Attributes: ts.attrs,
        }
-        if got, want := task.AcceptOffer(offer), ts.ok; got != want {
+        if got, want := DefaultPredicate(task, offer), ts.ok; got != want {
            t.Fatalf("expected acceptance of offer %v for selector %v to be %v, got %v:", want, got, ts.attrs, ts.selector)
        }
     }

View File

@@ -0,0 +1,110 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package podtask

import (
    log "github.com/golang/glog"
    mesos "github.com/mesos/mesos-go/mesosproto"
    mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
    "k8s.io/kubernetes/pkg/labels"
)

var DefaultPredicate = RequireAllPredicate([]FitPredicate{
    ValidationPredicate,
    NodeSelectorPredicate,
    PodFitsResourcesPredicate,
    PortsPredicate,
}).Fit

// FitPredicate implementations determine if the given task "fits" into offered Mesos resources.
// Neither the task or offer should be modified.
type FitPredicate func(*T, *mesos.Offer) bool

type RequireAllPredicate []FitPredicate

func (f RequireAllPredicate) Fit(t *T, offer *mesos.Offer) bool {
    for _, p := range f {
        if !p(t, offer) {
            return false
        }
    }
    return true
}

func ValidationPredicate(t *T, offer *mesos.Offer) bool {
    return t != nil && offer != nil
}

func NodeSelectorPredicate(t *T, offer *mesos.Offer) bool {
    // if the user has specified a target host, make sure this offer is for that host
    if t.Pod.Spec.NodeName != "" && offer.GetHostname() != t.Pod.Spec.NodeName {
        return false
    }

    // check the NodeSelector
    if len(t.Pod.Spec.NodeSelector) > 0 {
        slaveLabels := map[string]string{}
        for _, a := range offer.Attributes {
            if a.GetType() == mesos.Value_TEXT {
                slaveLabels[a.GetName()] = a.GetText().GetValue()
            }
        }
        selector := labels.SelectorFromSet(t.Pod.Spec.NodeSelector)
        if !selector.Matches(labels.Set(slaveLabels)) {
            return false
        }
    }
    return true
}

func PortsPredicate(t *T, offer *mesos.Offer) bool {
    // check ports
    if _, err := t.mapper.Generate(t, offer); err != nil {
        log.V(3).Info(err)
        return false
    }
    return true
}

func PodFitsResourcesPredicate(t *T, offer *mesos.Offer) bool {
    // find offered cpu and mem
    var (
        offeredCpus mresource.CPUShares
        offeredMem  mresource.MegaBytes
    )
    for _, resource := range offer.Resources {
        if resource.GetName() == "cpus" {
            offeredCpus = mresource.CPUShares(*resource.GetScalar().Value)
        }

        if resource.GetName() == "mem" {
            offeredMem = mresource.MegaBytes(*resource.GetScalar().Value)
        }
    }

    // calculate cpu and mem sum over all containers of the pod
    // TODO (@sttts): also support pod.spec.resources.limit.request
    // TODO (@sttts): take into account the executor resources
    cpu := mresource.PodCPULimit(&t.Pod)
    mem := mresource.PodMemLimit(&t.Pod)
    log.V(4).Infof("trying to match offer with pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem)
    if (cpu > offeredCpus) || (mem > offeredMem) {
        log.V(3).Infof("not enough resources for pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem)
        return false
    }
    return true
}
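For illustration, not part of the diff: the predicate chain above is plain function composition, and RequireAllPredicate.Fit short-circuits on the first failing check. A tiny self-contained sketch of the same pattern, with local stand-ins for podtask.T and mesos.Offer rather than the real types:

package main

import "fmt"

// Simplified stand-ins for podtask.T and mesos.Offer, for illustration only.
type task struct{ nodeName string }

type offer struct {
    hostname string
    cpus     float64
}

type fitPredicate func(*task, *offer) bool

// requireAll mirrors RequireAllPredicate.Fit: every predicate must pass,
// and evaluation stops at the first failure.
type requireAll []fitPredicate

func (r requireAll) fit(t *task, o *offer) bool {
    for _, p := range r {
        if !p(t, o) {
            return false
        }
    }
    return true
}

func main() {
    validation := func(t *task, o *offer) bool { return t != nil && o != nil }
    nodeSelector := func(t *task, o *offer) bool { return t.nodeName == "" || t.nodeName == o.hostname }
    resources := func(t *task, o *offer) bool { return o.cpus >= 0.01 }

    fit := requireAll{validation, nodeSelector, resources}.fit
    fmt.Println(fit(&task{}, &offer{hostname: "slave1", cpus: 4}))                   // true
    fmt.Println(fit(&task{nodeName: "slave2"}, &offer{hostname: "slave1", cpus: 4})) // false: node selector rejects
}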

View File

@@ -0,0 +1,152 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package podtask

import (
    "strings"

    log "github.com/golang/glog"
    mesos "github.com/mesos/mesos-go/mesosproto"
    mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
)

// NewDefaultProcurement returns the default procurement strategy that combines validation
// and responsible Mesos resource procurement. c and m are resource quantities written into
// k8s api.Pod.Spec's that don't declare resources (all containers in k8s-mesos require cpu
// and memory limits).
func NewDefaultProcurement(c mresource.CPUShares, m mresource.MegaBytes) Procurement {
    requireSome := &RequireSomePodResources{
        defaultContainerCPULimit: c,
        defaultContainerMemLimit: m,
    }
    return AllOrNothingProcurement([]Procurement{
        ValidateProcurement,
        NodeProcurement,
        requireSome.Procure,
        PodResourcesProcurement,
        PortsProcurement,
    }).Procure
}

// Procurement funcs allocate resources for a task from an offer.
// Both the task and/or offer may be modified.
type Procurement func(*T, *mesos.Offer) error

// AllOrNothingProcurement provides a convenient wrapper around multiple Procurement
// objectives: the failure of any Procurement in the set results in Procure failing.
// see AllOrNothingProcurement.Procure
type AllOrNothingProcurement []Procurement

// Procure runs each Procurement in the receiver list. The first Procurement func that
// fails triggers T.Reset() and the error is returned, otherwise returns nil.
func (a AllOrNothingProcurement) Procure(t *T, offer *mesos.Offer) error {
    for _, p := range a {
        if err := p(t, offer); err != nil {
            t.Reset()
            return err
        }
    }
    return nil
}

// ValidateProcurement checks that the offered resources are kosher, and if not panics.
// If things check out ok, t.Spec is cleared and nil is returned.
func ValidateProcurement(t *T, offer *mesos.Offer) error {
    if offer == nil {
        //programming error
        panic("offer details are nil")
    }
    t.Spec = Spec{}
    return nil
}

// NodeProcurement updates t.Spec in preparation for the task to be launched on the
// slave associated with the offer.
func NodeProcurement(t *T, offer *mesos.Offer) error {
    t.Spec.SlaveID = offer.GetSlaveId().GetValue()
    t.Spec.AssignedSlave = offer.GetHostname()

    // hostname needs of the executor needs to match that of the offer, otherwise
    // the kubelet node status checker/updater is very unhappy
    const HOSTNAME_OVERRIDE_FLAG = "--hostname-override="
    hostname := offer.GetHostname() // required field, non-empty
    hostnameOverride := HOSTNAME_OVERRIDE_FLAG + hostname

    argv := t.executor.Command.Arguments
    overwrite := false
    for i, arg := range argv {
        if strings.HasPrefix(arg, HOSTNAME_OVERRIDE_FLAG) {
            overwrite = true
            argv[i] = hostnameOverride
            break
        }
    }
    if !overwrite {
        t.executor.Command.Arguments = append(argv, hostnameOverride)
    }
    return nil
}

type RequireSomePodResources struct {
    defaultContainerCPULimit mresource.CPUShares
    defaultContainerMemLimit mresource.MegaBytes
}

func (r *RequireSomePodResources) Procure(t *T, offer *mesos.Offer) error {
    // write resource limits into the pod spec which is transferred to the executor. From here
    // on we can expect that the pod spec of a task has proper limits for CPU and memory.
    // TODO(sttts): For a later separation of the kubelet and the executor also patch the pod on the apiserver
    // TODO(jdef): changing the state of t.Pod here feels dirty, especially since we don't use a kosher
    // method to clone the api.Pod state in T.Clone(). This needs some love.
    if unlimitedCPU := mresource.LimitPodCPU(&t.Pod, r.defaultContainerCPULimit); unlimitedCPU {
        log.Warningf("Pod %s/%s without cpu limits is admitted %.2f cpu shares", t.Pod.Namespace, t.Pod.Name, mresource.PodCPULimit(&t.Pod))
    }
    if unlimitedMem := mresource.LimitPodMem(&t.Pod, r.defaultContainerMemLimit); unlimitedMem {
        log.Warningf("Pod %s/%s without memory limits is admitted %.2f MB", t.Pod.Namespace, t.Pod.Name, mresource.PodMemLimit(&t.Pod))
    }
    return nil
}

// PodResourcesProcurement converts k8s pod cpu and memory resource requirements into
// mesos resource allocations.
func PodResourcesProcurement(t *T, offer *mesos.Offer) error {
    // compute used resources
    cpu := mresource.PodCPULimit(&t.Pod)
    mem := mresource.PodMemLimit(&t.Pod)
    log.V(3).Infof("Recording offer(s) %s/%s against pod %v: cpu: %.2f, mem: %.2f MB", offer.Id, t.Pod.Namespace, t.Pod.Name, cpu, mem)

    t.Spec.CPU = cpu
    t.Spec.Memory = mem
    return nil
}

// PortsProcurement convert host port mappings into mesos port resource allocations.
func PortsProcurement(t *T, offer *mesos.Offer) error {
    // fill in port mapping
    if mapping, err := t.mapper.Generate(t, offer); err != nil {
        return err
    } else {
        ports := []uint64{}
        for _, entry := range mapping {
            ports = append(ports, entry.OfferPort)
        }
        t.Spec.PortMap = mapping
        t.Spec.Ports = ports
    }
    return nil
}
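For illustration, not part of the diff: the key contract of AllOrNothingProcurement is that the first failing step rolls the task back and aborts the chain. A self-contained sketch of that behavior, with simplified local types rather than the real podtask.T:

package main

import (
    "errors"
    "fmt"
)

// Simplified stand-ins for podtask.T and its Spec, for illustration only.
type spec struct {
    slaveID string
    cpus    float64
}

type task struct{ spec spec }

func (t *task) reset() { t.spec = spec{} }

type offer struct {
    slaveID string
    cpus    float64
}

type procurement func(*task, *offer) error

// allOrNothing mirrors AllOrNothingProcurement.Procure: the first failing step
// rolls the task spec back to its zero value and aborts the whole chain.
type allOrNothing []procurement

func (a allOrNothing) procure(t *task, o *offer) error {
    for _, p := range a {
        if err := p(t, o); err != nil {
            t.reset()
            return err
        }
    }
    return nil
}

func main() {
    node := func(t *task, o *offer) error { t.spec.slaveID = o.slaveID; return nil }
    cpus := func(t *task, o *offer) error {
        if o.cpus < 1 {
            return errors.New("not enough cpus")
        }
        t.spec.cpus = 1
        return nil
    }

    t := &task{}
    err := allOrNothing{node, cpus}.procure(t, &offer{slaveID: "slave1", cpus: 0.5})
    fmt.Println(err, t.spec) // the cpus step failed, so the partially-filled spec is back to its zero value
}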

View File

@@ -39,7 +39,6 @@ import (
     "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
     "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics"
     "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
-    mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
     "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/uid"
     "k8s.io/kubernetes/pkg/api"
     "k8s.io/kubernetes/pkg/api/errors"

@@ -118,19 +117,17 @@ type KubernetesScheduler struct {
     // and the invoking the pod registry interfaces.
     // In particular, changes to podtask.T objects are currently guarded by this lock.
     *sync.RWMutex
+    PodScheduler

     // Config related, write-once
     schedcfg      *schedcfg.Config
     executor      *mesos.ExecutorInfo
     executorGroup uint64
-    scheduleFunc  PodScheduleFunc
     client        *client.Client
     etcdClient    tools.EtcdClient
     failoverTimeout   float64 // in seconds
     reconcileInterval int64
-    defaultContainerCPULimit mresource.CPUShares
-    defaultContainerMemLimit mresource.MegaBytes

     // Mesos context.

@@ -159,14 +156,12 @@ type KubernetesScheduler struct {
 type Config struct {
     Schedcfg          schedcfg.Config
     Executor          *mesos.ExecutorInfo
-    ScheduleFunc      PodScheduleFunc
+    Scheduler         PodScheduler
     Client            *client.Client
     EtcdClient        tools.EtcdClient
     FailoverTimeout   float64
     ReconcileInterval int64
     ReconcileCooldown time.Duration
-    DefaultContainerCPULimit mresource.CPUShares
-    DefaultContainerMemLimit mresource.MegaBytes
 }

 // New creates a new KubernetesScheduler

@@ -177,13 +172,11 @@ func New(config Config) *KubernetesScheduler {
     RWMutex:       new(sync.RWMutex),
     executor:      config.Executor,
     executorGroup: uid.Parse(config.Executor.ExecutorId.GetValue()).Group(),
-    scheduleFunc:  config.ScheduleFunc,
+    PodScheduler:  config.Scheduler,
     client:        config.Client,
     etcdClient:    config.EtcdClient,
     failoverTimeout:   config.FailoverTimeout,
     reconcileInterval: config.ReconcileInterval,
-    defaultContainerCPULimit: config.DefaultContainerCPULimit,
-    defaultContainerMemLimit: config.DefaultContainerMemLimit,
     offers: offers.CreateRegistry(offers.RegistryConfig{
         Compat: func(o *mesos.Offer) bool {
             // filter the offers: the executor IDs must not identify a kubelet-

View File

@@ -57,6 +57,7 @@ import (
     "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha"
     "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
     "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics"
+    "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
     mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
     "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/uid"
     "k8s.io/kubernetes/pkg/api"

@@ -139,6 +140,8 @@ type SchedulerServer struct {
     KubeletNetworkPluginName string
     StaticPodsConfigPath     string
     DockerCfgPath            string
+    ContainPodResources      bool
+    AccountForPodResources   bool

     executable string // path to the binary running this service
     client     *client.Client

@@ -182,6 +185,8 @@ func NewSchedulerServer() *SchedulerServer {
     mux:                  http.NewServeMux(),
     KubeletCadvisorPort:  4194, // copied from github.com/GoogleCloudPlatform/kubernetes/blob/release-0.14/cmd/kubelet/app/server.go
     KubeletSyncFrequency: 10 * time.Second,
+    ContainPodResources:    true,
+    AccountForPodResources: true,
 }
 // cache this for later use. also useful in case the original binary gets deleted, e.g.
 // during upgrades, development deployments, etc.

@@ -231,6 +236,8 @@ func (s *SchedulerServer) addCoreFlags(fs *pflag.FlagSet) {
     fs.IPVar(&s.ServiceAddress, "service-address", s.ServiceAddress, "The service portal IP address that the scheduler should register with (if unset, chooses randomly)")
     fs.Var(&s.DefaultContainerCPULimit, "default-container-cpu-limit", "Containers without a CPU resource limit are admitted this much CPU shares")
     fs.Var(&s.DefaultContainerMemLimit, "default-container-mem-limit", "Containers without a memory resource limit are admitted this much amount of memory in MB")
+    fs.BoolVar(&s.ContainPodResources, "contain-pod-resources", s.ContainPodResources, "Reparent pod containers into mesos cgroups; disable if you're having strange mesos/docker/systemd interactions.")
+    fs.BoolVar(&s.AccountForPodResources, "account-for-pod-resources", s.AccountForPodResources, "Allocate pod CPU and memory resources from offers (Default: true)")
     fs.IntVar(&s.ExecutorLogV, "executor-logv", s.ExecutorLogV, "Logging verbosity of spawned minion and executor processes.")
     fs.BoolVar(&s.ExecutorBindall, "executor-bindall", s.ExecutorBindall, "When true will set -address of the executor to 0.0.0.0.")

@@ -367,6 +374,7 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E
     ci.Arguments = append(ci.Arguments, fmt.Sprintf("--mesos-cgroup-prefix=%v", s.MesosCgroupPrefix))
     ci.Arguments = append(ci.Arguments, fmt.Sprintf("--cadvisor-port=%v", s.KubeletCadvisorPort))
     ci.Arguments = append(ci.Arguments, fmt.Sprintf("--sync-frequency=%v", s.KubeletSyncFrequency))
+    ci.Arguments = append(ci.Arguments, fmt.Sprintf("--contain-pod-resources=%t", s.ContainPodResources))

     if s.AuthPath != "" {
         //TODO(jdef) should probably support non-local files, e.g. hdfs:///some/config/file

@@ -651,17 +659,27 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
         log.Fatalf("misconfigured etcd: %v", err)
     }

+    as := scheduler.NewAllocationStrategy(
+        podtask.DefaultPredicate,
+        podtask.NewDefaultProcurement(s.DefaultContainerCPULimit, s.DefaultContainerMemLimit))
+
+    // downgrade allocation strategy if user disables "account-for-pod-resources"
+    if !s.AccountForPodResources {
+        as = scheduler.NewAllocationStrategy(
+            podtask.DefaultMinimalPredicate,
+            podtask.DefaultMinimalProcurement)
+    }
+    fcfs := scheduler.NewFCFSPodScheduler(as)
+
     mesosPodScheduler := scheduler.New(scheduler.Config{
         Schedcfg:          *sc,
         Executor:          executor,
-        ScheduleFunc:      scheduler.FCFSScheduleFunc,
+        Scheduler:         fcfs,
         Client:            client,
         EtcdClient:        etcdClient,
         FailoverTimeout:   s.FailoverTimeout,
         ReconcileInterval: s.ReconcileInterval,
         ReconcileCooldown: s.ReconcileCooldown,
-        DefaultContainerCPULimit: s.DefaultContainerCPULimit,
-        DefaultContainerMemLimit: s.DefaultContainerMemLimit,
     })

     masterUri := s.MesosMaster

View File

@@ -23,17 +23,29 @@ import (
     "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
 )

-// PodScheduleFunc implements how to schedule pods among slaves.
+type AllocationStrategy interface {
+    // FitPredicate returns the selector used to determine pod fitness w/ respect to a given offer
+    FitPredicate() podtask.FitPredicate
+
+    // Procurement returns a func that obtains resources for a task from resource offer
+    Procurement() podtask.Procurement
+}
+
+type PodScheduler interface {
+    AllocationStrategy
+
+    // SchedulePod implements how to schedule pods among slaves.
     // We can have different implementation for different scheduling policy.
     //
-    // The Schedule function accepts a group of slaves (each contains offers from
+    // The function accepts a group of slaves (each contains offers from
     // that slave) and a single pod, which aligns well with the k8s scheduling
     // algorithm. It returns an offerId that is acceptable for the pod, otherwise
     // nil. The caller is responsible for filling in task state w/ relevant offer
     // details.
     //
-    // See the FCFSScheduleFunc for example.
-type PodScheduleFunc func(r offers.Registry, slaves SlaveIndex, task *podtask.T) (offers.Perishable, error)
+    // See the FCFSPodScheduler for example.
+    SchedulePod(r offers.Registry, slaves SlaveIndex, task *podtask.T) (offers.Perishable, error)
+}

 // A minimal placeholder
 type empty struct{}
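For illustration, not part of the diff: the point of splitting PodScheduler from AllocationStrategy is that the ordering policy and the fit/procurement logic can now be mixed and matched. A self-contained toy analogue with local stand-ins (not the real offers.Registry, podtask.T, or mesos.Offer types):

package main

import (
    "errors"
    "fmt"
)

// Local, simplified mirrors of AllocationStrategy and PodScheduler; the real interfaces
// operate on offers.Registry, podtask.T and mesos.Offer, but the shape is the same.
type task struct{ name string }

type offer struct {
    id   string
    cpus float64
}

type fitPredicate func(*task, *offer) bool
type procurement func(*task, *offer) error

type allocationStrategy struct {
    fit     fitPredicate
    procure procurement
}

func (a allocationStrategy) FitPredicate() fitPredicate { return a.fit }
func (a allocationStrategy) Procurement() procurement   { return a.procure }

// firstFit plays the role of fcfsPodScheduler: embed the strategy and take the first
// offer its fit predicate accepts, leaving procurement to the caller.
type firstFit struct{ allocationStrategy }

func (f firstFit) SchedulePod(offers []*offer, t *task) (*offer, error) {
    for _, o := range offers {
        if f.FitPredicate()(t, o) {
            return o, nil
        }
    }
    return nil, errors.New("no suitable offer")
}

func main() {
    strategy := allocationStrategy{
        fit:     func(t *task, o *offer) bool { return o.cpus >= 1 },
        procure: func(t *task, o *offer) error { return nil },
    }
    s := firstFit{strategy}
    picked, err := s.SchedulePod([]*offer{{"o1", 0.5}, {"o2", 2}}, &task{name: "nginx"})
    fmt.Println(picked, err) // &{o2 2} <nil>
}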

View File

@@ -1,5 +1,6 @@
 accept-hosts
 accept-paths
+account-for-pod-resources
 admission-control
 admission-control-config-file
 advertise-address

@@ -43,6 +44,7 @@ cluster-name
 cluster-tag
 concurrent-endpoint-syncs
 configure-cbr0
+contain-pod-resources
 container-port
 container-runtime
 cors-allowed-origins