diff --git a/contrib/mesos/pkg/minion/server.go b/contrib/mesos/pkg/minion/server.go
index 1ec81f7cb44..e7b73eeb831 100644
--- a/contrib/mesos/pkg/minion/server.go
+++ b/contrib/mesos/pkg/minion/server.go
@@ -55,13 +55,16 @@ type MinionServer struct {
 	kmBinary string
 	tasks    []*tasks.Task
 
-	pathOverride string // the PATH environment for the sub-processes
-	cgroupPrefix string // e.g. mesos
-	cgroupRoot   string // e.g. /mesos/{container-id}, determined at runtime
+	pathOverride        string // the PATH environment for the sub-processes
+	cgroupPrefix        string // e.g. mesos
+	cgroupRoot          string // the cgroupRoot that we pass to the kubelet-executor, depends on containPodResources
+	mesosCgroup         string // discovered mesos cgroup root, e.g. /mesos/{container-id}
+	containPodResources bool
 
 	logMaxSize      resource.Quantity
 	logMaxBackups   int
 	logMaxAgeInDays int
+	logVerbosity    int32 // see glog.Level
 
 	runProxy  bool
 	proxyLogV int
@@ -74,6 +77,7 @@ func NewMinionServer() *MinionServer {
 		KubeletExecutorServer: exservice.NewKubeletExecutorServer(),
 		privateMountNS:        false, // disabled until Docker supports customization of the parent mount namespace
 		cgroupPrefix:          config.DefaultCgroupPrefix,
+		containPodResources:   true,
 		logMaxSize:            config.DefaultLogMaxSize(),
 		logMaxBackups:         config.DefaultLogMaxBackups,
 		logMaxAgeInDays:       config.DefaultLogMaxAgeInDays,
@@ -131,7 +135,7 @@ func (ms *MinionServer) launchProxyServer() {
 		fmt.Sprintf("--bind-address=%s", bindAddress),
 		fmt.Sprintf("--v=%d", ms.proxyLogV),
 		"--logtostderr=true",
-		"--resource-container=" + path.Join("/", ms.cgroupRoot, "kube-proxy"),
+		"--resource-container=" + path.Join("/", ms.mesosCgroup, "kube-proxy"),
 	}
 
 	if ms.clientConfig.Host != "" {
@@ -156,7 +160,7 @@ func (ms *MinionServer) launchExecutorServer() <-chan struct{} {
 	ms.AddExecutorFlags(executorFlags)
 	executorArgs, _ := filterArgsByFlagSet(allArgs, executorFlags)
 
-	executorArgs = append(executorArgs, "--resource-container="+path.Join("/", ms.cgroupRoot, "kubelet"))
+	executorArgs = append(executorArgs, "--resource-container="+path.Join("/", ms.mesosCgroup, "kubelet"))
 	if ms.cgroupRoot != "" {
 		executorArgs = append(executorArgs, "--cgroup-root="+ms.cgroupRoot)
 	}
@@ -241,14 +245,23 @@ func (ms *MinionServer) Run(hks hyperkube.Interface, _ []string) error {
 	ms.clientConfig = clientConfig
 
 	// derive the executor cgroup and use it as:
-	// - pod container cgroup root (e.g. docker cgroup-parent)
+	// - pod container cgroup root (e.g. docker cgroup-parent, optionally; see comments below)
 	// - parent of kubelet container
 	// - parent of kube-proxy container
-	ms.cgroupRoot = findMesosCgroup(ms.cgroupPrefix)
+	ms.mesosCgroup = findMesosCgroup(ms.cgroupPrefix)
+	log.Infof("discovered mesos cgroup at %q", ms.mesosCgroup)
+
+	// hack alert, this helps to work around systemd+docker+mesos integration problems
+	// when docker's cgroup-parent flag is used (!containPodResources = don't use the docker flag)
+	if ms.containPodResources {
+		ms.cgroupRoot = ms.mesosCgroup
+	}
+
 	cgroupLogger := log.Infof
 	if ms.cgroupRoot == "" {
 		cgroupLogger = log.Warningf
 	}
+
 	cgroupLogger("using cgroup-root %q", ms.cgroupRoot)
 
 	// run subprocesses until ms.done is closed on return of this function
@@ -302,6 +315,9 @@ func termSignalListener(abort <-chan struct{}) <-chan struct{} {
 
 func (ms *MinionServer) AddExecutorFlags(fs *pflag.FlagSet) {
 	ms.KubeletExecutorServer.AddFlags(fs)
+
+	// hack to forward log verbosity flag to the executor
+	fs.Int32Var(&ms.logVerbosity, "v", ms.logVerbosity, "log level for V logs")
 }
 
 func (ms *MinionServer) AddMinionFlags(fs *pflag.FlagSet) {
@@ -309,6 +325,7 @@
 	fs.StringVar(&ms.cgroupPrefix, "mesos-cgroup-prefix", ms.cgroupPrefix, "The cgroup prefix concatenated with MESOS_DIRECTORY must give the executor cgroup set by Mesos")
 	fs.BoolVar(&ms.privateMountNS, "private-mountns", ms.privateMountNS, "Enter a private mount NS before spawning procs (linux only). Experimental, not yet compatible with k8s volumes.")
 	fs.StringVar(&ms.pathOverride, "path-override", ms.pathOverride, "Override the PATH in the environment of the sub-processes.")
+	fs.BoolVar(&ms.containPodResources, "contain-pod-resources", ms.containPodResources, "Allocate pod CPU and memory resources from offers and reparent pod containers into mesos cgroups; disable if you're having strange mesos/docker/systemd interactions.")
 
 	// log file flags
 	fs.Var(resource.NewQuantityFlagValue(&ms.logMaxSize), "max-log-size", "Maximum log file size for the executor and proxy before rotation")
diff --git a/contrib/mesos/pkg/scheduler/fcfs.go b/contrib/mesos/pkg/scheduler/fcfs.go
index 76553615a8a..db84ebd96d6 100644
--- a/contrib/mesos/pkg/scheduler/fcfs.go
+++ b/contrib/mesos/pkg/scheduler/fcfs.go
@@ -24,8 +24,42 @@ import (
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
 )
 
+type allocationStrategy struct {
+	fitPredicate podtask.FitPredicate
+	procurement  podtask.Procurement
+}
+
+func (a *allocationStrategy) FitPredicate() podtask.FitPredicate {
+	return a.fitPredicate
+}
+
+func (a *allocationStrategy) Procurement() podtask.Procurement {
+	return a.procurement
+}
+
+func NewAllocationStrategy(fitPredicate podtask.FitPredicate, procurement podtask.Procurement) AllocationStrategy {
+	if fitPredicate == nil {
+		panic("fitPredicate is required")
+	}
+	if procurement == nil {
+		panic("procurement is required")
+	}
+	return &allocationStrategy{
+		fitPredicate: fitPredicate,
+		procurement:  procurement,
+	}
+}
+
+type fcfsPodScheduler struct {
+	AllocationStrategy
+}
+
+func NewFCFSPodScheduler(as AllocationStrategy) PodScheduler {
+	return &fcfsPodScheduler{as}
+}
+
 // A first-come-first-serve scheduler: acquires the first offer that can support the task
-func FCFSScheduleFunc(r offers.Registry, unused SlaveIndex, task *podtask.T) (offers.Perishable, error) {
+func (fps *fcfsPodScheduler) SchedulePod(r offers.Registry, unused SlaveIndex, task *podtask.T) (offers.Perishable, error) {
 	podName := fmt.Sprintf("%s/%s", task.Pod.Namespace, task.Pod.Name)
 	var acceptedOffer offers.Perishable
 	err := r.Walk(func(p offers.Perishable) (bool, error) {
@@ -33,7 +67,7 @@ func FCFSScheduleFunc(r offers.Registry, unused SlaveIndex, task *podtask.T) (of
 		if offer == nil {
 			return false, fmt.Errorf("nil offer while scheduling task %v", task.ID)
 		}
-		if task.AcceptOffer(offer) {
+		if fps.FitPredicate()(task, offer) {
 			if p.Acquire() {
 				acceptedOffer = p
 				log.V(3).Infof("Pod %s accepted offer %v", podName, offer.Id.GetValue())
diff --git a/contrib/mesos/pkg/scheduler/mock_test.go b/contrib/mesos/pkg/scheduler/mock_test.go
index e53024ec2cc..17930a86813 100644
--- a/contrib/mesos/pkg/scheduler/mock_test.go
+++ b/contrib/mesos/pkg/scheduler/mock_test.go
@@ -42,11 +42,11 @@ func (m *MockScheduler) slaveFor(id string) (slave *Slave, ok bool) {
 	ok = args.Bool(1)
 	return
 }
-func (m *MockScheduler) algorithm() (f PodScheduleFunc) {
+func (m *MockScheduler) algorithm() (f PodScheduler) {
 	args := m.Called()
 	x := args.Get(0)
 	if x != nil {
-		f = x.(PodScheduleFunc)
+		f = x.(PodScheduler)
 	}
 	return
 }
diff --git a/contrib/mesos/pkg/scheduler/plugin.go b/contrib/mesos/pkg/scheduler/plugin.go
index 56cc419abf6..e7f39cff4ed 100644
--- a/contrib/mesos/pkg/scheduler/plugin.go
+++ b/contrib/mesos/pkg/scheduler/plugin.go
@@ -33,7 +33,6 @@ import (
 	"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
 	annotation "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
-	mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/errors"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
@@ -56,8 +55,9 @@ const (
 
 // scheduler abstraction to allow for easier unit testing
 type schedulerInterface interface {
 	sync.Locker // synchronize scheduler plugin operations
+	SlaveIndex
 
-	algorithm() PodScheduleFunc // see types.go
+	algorithm() PodScheduler
 	offers() offers.Registry
 	tasks() podtask.Registry
@@ -76,8 +76,8 @@ type k8smScheduler struct {
 	internal *KubernetesScheduler
 }
 
-func (k *k8smScheduler) algorithm() PodScheduleFunc {
-	return k.internal.scheduleFunc
+func (k *k8smScheduler) algorithm() PodScheduler {
+	return k.internal
 }
 
 func (k *k8smScheduler) offers() offers.Registry {
@@ -231,10 +231,8 @@ func (b *binder) prepareTaskForLaunch(ctx api.Context, machine string, task *pod
 }
 
 type kubeScheduler struct {
-	api                      schedulerInterface
-	podUpdates               queue.FIFO
-	defaultContainerCPULimit mresource.CPUShares
-	defaultContainerMemLimit mresource.MegaBytes
+	api        schedulerInterface
+	podUpdates queue.FIFO
 }
 
 // recoverAssignedSlave recovers the assigned Mesos slave from a pod by searching
@@ -318,7 +316,7 @@ func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) {
 		}
 	}
 	if err == nil && offer == nil {
-		offer, err = k.api.algorithm()(k.api.offers(), k.api, task)
+		offer, err = k.api.algorithm().SchedulePod(k.api.offers(), k.api, task)
 	}
 	if err != nil {
 		return "", err
@@ -338,18 +336,8 @@ func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) {
 		return "", fmt.Errorf("task.offer assignment must be idempotent, task %+v: offer %+v", task, offer)
 	}
 
-	// write resource limits into the pod spec which is transferred to the executor. From here
-	// on we can expect that the pod spec of a task has proper limits for CPU and memory.
-	// TODO(sttts): For a later separation of the kubelet and the executor also patch the pod on the apiserver
-	if unlimitedCPU := mresource.LimitPodCPU(&task.Pod, k.defaultContainerCPULimit); unlimitedCPU {
-		log.Warningf("Pod %s/%s without cpu limits is admitted %.2f cpu shares", task.Pod.Namespace, task.Pod.Name, mresource.PodCPULimit(&task.Pod))
-	}
-	if unlimitedMem := mresource.LimitPodMem(&task.Pod, k.defaultContainerMemLimit); unlimitedMem {
-		log.Warningf("Pod %s/%s without memory limits is admitted %.2f MB", task.Pod.Namespace, task.Pod.Name, mresource.PodMemLimit(&task.Pod))
-	}
-
 	task.Offer = offer
-	task.FillFromDetails(details)
+	k.api.algorithm().Procurement()(task, details) // TODO(jdef) why is nothing checking the error returned here?
 
 	if err := k.api.tasks().Update(task); err != nil {
 		offer.Release()
@@ -556,7 +544,7 @@ func (k *errorHandler) handleSchedulingError(pod *api.Pod, schedulingErr error)
 			defer k.api.Unlock()
 			switch task, state := k.api.tasks().Get(task.ID); state {
 			case podtask.StatePending:
-				return !task.Has(podtask.Launched) && task.AcceptOffer(offer)
+				return !task.Has(podtask.Launched) && k.api.algorithm().FitPredicate()(task, offer)
 			default:
 				// no point in continuing to check for matching offers
 				return true
@@ -698,10 +686,8 @@ func (k *KubernetesScheduler) NewPluginConfig(terminate <-chan struct{}, mux *ht
 		Config: &plugin.Config{
 			MinionLister: nil,
 			Algorithm: &kubeScheduler{
-				api:                      kapi,
-				podUpdates:               podUpdates,
-				defaultContainerCPULimit: k.defaultContainerCPULimit,
-				defaultContainerMemLimit: k.defaultContainerMemLimit,
+				api:        kapi,
+				podUpdates: podUpdates,
 			},
 			Binder:  &binder{api: kapi},
 			NextPod: q.yield,
diff --git a/contrib/mesos/pkg/scheduler/plugin_test.go b/contrib/mesos/pkg/scheduler/plugin_test.go
index 4522a633c55..06da09d5c55 100644
--- a/contrib/mesos/pkg/scheduler/plugin_test.go
+++ b/contrib/mesos/pkg/scheduler/plugin_test.go
@@ -393,13 +393,14 @@ func TestPlugin_LifeCycle(t *testing.T) {
 	executor.Data = []byte{0, 1, 2}
 
 	// create scheduler
+	as := NewAllocationStrategy(
+		podtask.DefaultPredicate,
+		podtask.NewDefaultProcurement(mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerMemLimit))
 	testScheduler := New(Config{
-		Executor:                 executor,
-		Client:                   client.NewOrDie(&client.Config{Host: testApiServer.server.URL, Version: testapi.Version()}),
-		ScheduleFunc:             FCFSScheduleFunc,
-		Schedcfg:                 *schedcfg.CreateDefaultConfig(),
-		DefaultContainerCPULimit: mresource.DefaultDefaultContainerCPULimit,
-		DefaultContainerMemLimit: mresource.DefaultDefaultContainerMemLimit,
+		Executor:  executor,
+		Client:    client.NewOrDie(&client.Config{Host: testApiServer.server.URL, Version: testapi.Version()}),
+		Scheduler: NewFCFSPodScheduler(as),
+		Schedcfg:  *schedcfg.CreateDefaultConfig(),
 	})
 
 	assert.NotNil(testScheduler.client, "client is nil")
diff --git a/contrib/mesos/pkg/scheduler/podtask/minimal.go b/contrib/mesos/pkg/scheduler/podtask/minimal.go
new file mode 100644
index 00000000000..842455e5919
--- /dev/null
+++ b/contrib/mesos/pkg/scheduler/podtask/minimal.go
@@ -0,0 +1,73 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package podtask
+
+import (
+	log "github.com/golang/glog"
+	mesos "github.com/mesos/mesos-go/mesosproto"
+)
+
+// bogus numbers that we use to make sure that there's some set of minimal offered resources on the slave
+const (
+	minimalCpus = 0.01
+	minimalMem  = 0.25
+)
+
+var (
+	DefaultMinimalPredicate = RequireAllPredicate([]FitPredicate{
+		ValidationPredicate,
+		NodeSelectorPredicate,
+		MinimalPodResourcesPredicate,
+		PortsPredicate,
+	}).Fit
+
+	DefaultMinimalProcurement = AllOrNothingProcurement([]Procurement{
+		ValidateProcurement,
+		NodeProcurement,
+		MinimalPodResourcesProcurement,
+		PortsProcurement,
+	}).Procure
+)
+
+func MinimalPodResourcesPredicate(t *T, offer *mesos.Offer) bool {
+	var (
+		offeredCpus float64
+		offeredMem  float64
+	)
+	for _, resource := range offer.Resources {
+		if resource.GetName() == "cpus" {
+			offeredCpus = resource.GetScalar().GetValue()
+		}
+
+		if resource.GetName() == "mem" {
+			offeredMem = resource.GetScalar().GetValue()
+		}
+	}
+	log.V(4).Infof("trying to match offer with pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, minimalCpus, minimalMem)
+	if (minimalCpus > offeredCpus) || (minimalMem > offeredMem) {
+		log.V(3).Infof("not enough resources for pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, minimalCpus, minimalMem)
+		return false
+	}
+	return true
+}
+
+func MinimalPodResourcesProcurement(t *T, details *mesos.Offer) error {
+	log.V(3).Infof("Recording offer(s) %s/%s against pod %v: cpu: %.2f, mem: %.2f MB", details.Id, t.Pod.Namespace, t.Pod.Name, minimalCpus, minimalMem)
+	t.Spec.CPU = minimalCpus
+	t.Spec.Memory = minimalMem
+	return nil
+}
diff --git a/contrib/mesos/pkg/scheduler/podtask/pod_task.go b/contrib/mesos/pkg/scheduler/podtask/pod_task.go
index 1c08abd7632..ad8a77d42fe 100644
--- a/contrib/mesos/pkg/scheduler/podtask/pod_task.go
+++ b/contrib/mesos/pkg/scheduler/podtask/pod_task.go
@@ -18,7 +18,6 @@ package podtask
 
 import (
 	"fmt"
-	"strings"
 	"time"
 
 	"github.com/gogo/protobuf/proto"
@@ -28,7 +27,6 @@ import (
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics"
 	mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
 	"k8s.io/kubernetes/pkg/api"
-	"k8s.io/kubernetes/pkg/labels"
 
 	log "github.com/golang/glog"
 	mesos "github.com/mesos/mesos-go/mesosproto"
@@ -150,59 +148,6 @@ func (t *T) BuildTaskInfo() *mesos.TaskInfo {
 	return info
 }
 
-// Fill the Spec in the T, should be called during k8s scheduling, before binding.
-func (t *T) FillFromDetails(details *mesos.Offer) error {
-	if details == nil {
-		//programming error
-		panic("offer details are nil")
-	}
-
-	// compute used resources
-	cpu := mresource.PodCPULimit(&t.Pod)
-	mem := mresource.PodMemLimit(&t.Pod)
-	log.V(3).Infof("Recording offer(s) %s/%s against pod %v: cpu: %.2f, mem: %.2f MB", details.Id, t.Pod.Namespace, t.Pod.Name, cpu, mem)
-
-	t.Spec = Spec{
-		SlaveID:       details.GetSlaveId().GetValue(),
-		AssignedSlave: details.GetHostname(),
-		CPU:           cpu,
-		Memory:        mem,
-	}
-
-	// fill in port mapping
-	if mapping, err := t.mapper.Generate(t, details); err != nil {
-		t.Reset()
-		return err
-	} else {
-		ports := []uint64{}
-		for _, entry := range mapping {
-			ports = append(ports, entry.OfferPort)
-		}
-		t.Spec.PortMap = mapping
-		t.Spec.Ports = ports
-	}
-
-	// hostname needs of the executor needs to match that of the offer, otherwise
-	// the kubelet node status checker/updater is very unhappy
-	const HOSTNAME_OVERRIDE_FLAG = "--hostname-override="
-	hostname := details.GetHostname() // required field, non-empty
-	hostnameOverride := HOSTNAME_OVERRIDE_FLAG + hostname
-
-	argv := t.executor.Command.Arguments
-	overwrite := false
-	for i, arg := range argv {
-		if strings.HasPrefix(arg, HOSTNAME_OVERRIDE_FLAG) {
-			overwrite = true
-			argv[i] = hostnameOverride
-			break
-		}
-	}
-	if !overwrite {
-		t.executor.Command.Arguments = append(argv, hostnameOverride)
-	}
-	return nil
-}
-
 // Clear offer-related details from the task, should be called if/when an offer
 // has already been assigned to a task but for some reason is no longer valid.
 func (t *T) Reset() {
@@ -211,65 +156,6 @@ func (t *T) Reset() {
 	t.Spec = Spec{}
 }
 
-func (t *T) AcceptOffer(offer *mesos.Offer) bool {
-	if offer == nil {
-		return false
-	}
-
-	// if the user has specified a target host, make sure this offer is for that host
-	if t.Pod.Spec.NodeName != "" && offer.GetHostname() != t.Pod.Spec.NodeName {
-		return false
-	}
-
-	// check the NodeSelector
-	if len(t.Pod.Spec.NodeSelector) > 0 {
-		slaveLabels := map[string]string{}
-		for _, a := range offer.Attributes {
-			if a.GetType() == mesos.Value_TEXT {
-				slaveLabels[a.GetName()] = a.GetText().GetValue()
-			}
-		}
-		selector := labels.SelectorFromSet(t.Pod.Spec.NodeSelector)
-		if !selector.Matches(labels.Set(slaveLabels)) {
-			return false
-		}
-	}
-
-	// check ports
-	if _, err := t.mapper.Generate(t, offer); err != nil {
-		log.V(3).Info(err)
-		return false
-	}
-
-	// find offered cpu and mem
-	var (
-		offeredCpus mresource.CPUShares
-		offeredMem  mresource.MegaBytes
-	)
-	for _, resource := range offer.Resources {
-		if resource.GetName() == "cpus" {
-			offeredCpus = mresource.CPUShares(*resource.GetScalar().Value)
-		}
-
-		if resource.GetName() == "mem" {
-			offeredMem = mresource.MegaBytes(*resource.GetScalar().Value)
-		}
-	}
-
-	// calculate cpu and mem sum over all containers of the pod
-	// TODO (@sttts): also support pod.spec.resources.limit.request
-	// TODO (@sttts): take into account the executor resources
-	cpu := mresource.PodCPULimit(&t.Pod)
-	mem := mresource.PodMemLimit(&t.Pod)
-	log.V(4).Infof("trying to match offer with pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem)
-	if (cpu > offeredCpus) || (mem > offeredMem) {
-		log.V(3).Infof("not enough resources for pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem)
-		return false
-	}
-
-	return true
-}
-
 func (t *T) Set(f FlagType) {
 	t.Flags[f] = struct{}{}
 	if Launched == f {
diff --git a/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go b/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go
index 8bdeb5817ec..9b18e283fae 100644
--- a/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go
+++ b/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go
@@ -146,10 +146,10 @@ func TestEmptyOffer(t *testing.T) {
 	mresource.LimitPodCPU(&task.Pod, mresource.DefaultDefaultContainerCPULimit)
 	mresource.LimitPodMem(&task.Pod, mresource.DefaultDefaultContainerMemLimit)
 
-	if ok := task.AcceptOffer(nil); ok {
+	if ok := DefaultPredicate(task, nil); ok {
 		t.Fatalf("accepted nil offer")
 	}
-	if ok := task.AcceptOffer(&mesos.Offer{}); ok {
+	if ok := DefaultPredicate(task, &mesos.Offer{}); ok {
 		t.Fatalf("accepted empty offer")
 	}
 }
@@ -176,7 +176,7 @@ func TestNoPortsInPodOrOffer(t *testing.T) {
 			mutil.NewScalarResource("mem", 0.001),
 		},
 	}
-	if ok := task.AcceptOffer(offer); ok {
+	if ok := DefaultPredicate(task, offer); ok {
 		t.Fatalf("accepted offer %v:", offer)
 	}
 
@@ -186,7 +186,7 @@
 			mutil.NewScalarResource("mem", t_min_mem),
 		},
 	}
-	if ok := task.AcceptOffer(offer); !ok {
+	if ok := DefaultPredicate(task, offer); !ok {
 		t.Fatalf("did not accepted offer %v:", offer)
 	}
 }
@@ -203,7 +203,7 @@ func TestAcceptOfferPorts(t *testing.T) {
 			rangeResource("ports", []uint64{1, 1}),
 		},
 	}
-	if ok := task.AcceptOffer(offer); !ok {
+	if ok := DefaultPredicate(task, offer); !ok {
 		t.Fatalf("did not accepted offer %v:", offer)
 	}
 
@@ -218,17 +218,17 @@ func TestAcceptOfferPorts(t *testing.T) {
 	mresource.LimitPodCPU(&task.Pod, mresource.DefaultDefaultContainerCPULimit)
 	mresource.LimitPodMem(&task.Pod, mresource.DefaultDefaultContainerMemLimit)
 
-	if ok := task.AcceptOffer(offer); ok {
+	if ok := DefaultPredicate(task, offer); ok {
 		t.Fatalf("accepted offer %v:", offer)
 	}
 
 	pod.Spec.Containers[0].Ports[0].HostPort = 1
-	if ok := task.AcceptOffer(offer); !ok {
+	if ok := DefaultPredicate(task, offer); !ok {
 		t.Fatalf("did not accepted offer %v:", offer)
 	}
 
 	pod.Spec.Containers[0].Ports[0].HostPort = 0
-	if ok := task.AcceptOffer(offer); !ok {
+	if ok := DefaultPredicate(task, offer); !ok {
 		t.Fatalf("did not accepted offer %v:", offer)
 	}
 
@@ -236,12 +236,12 @@ func TestAcceptOfferPorts(t *testing.T) {
 		mutil.NewScalarResource("cpus", t_min_cpu),
 		mutil.NewScalarResource("mem", t_min_mem),
 	}
-	if ok := task.AcceptOffer(offer); ok {
+	if ok := DefaultPredicate(task, offer); ok {
 		t.Fatalf("accepted offer %v:", offer)
 	}
 
 	pod.Spec.Containers[0].Ports[0].HostPort = 1
-	if ok := task.AcceptOffer(offer); ok {
+	if ok := DefaultPredicate(task, offer); ok {
 		t.Fatalf("accepted offer %v:", offer)
 	}
 }
@@ -297,7 +297,7 @@ func TestNodeSelector(t *testing.T) {
 			},
 			Attributes: ts.attrs,
 		}
-		if got, want := task.AcceptOffer(offer), ts.ok; got != want {
+		if got, want := DefaultPredicate(task, offer), ts.ok; got != want {
 			t.Fatalf("expected acceptance of offer %v for selector %v to be %v, got %v:", want, got, ts.attrs, ts.selector)
 		}
 	}
diff --git a/contrib/mesos/pkg/scheduler/podtask/predicate.go b/contrib/mesos/pkg/scheduler/podtask/predicate.go
new file mode 100644
index 00000000000..4b46cbb4900
--- /dev/null
+++ b/contrib/mesos/pkg/scheduler/podtask/predicate.go
@@ -0,0 +1,110 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package podtask
+
+import (
+	log "github.com/golang/glog"
+	mesos "github.com/mesos/mesos-go/mesosproto"
+	mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
+	"k8s.io/kubernetes/pkg/labels"
+)
+
+var DefaultPredicate = RequireAllPredicate([]FitPredicate{
+	ValidationPredicate,
+	NodeSelectorPredicate,
+	PodFitsResourcesPredicate,
+	PortsPredicate,
+}).Fit
+
+// FitPredicate implementations determine if the given task "fits" into offered Mesos resources.
+// Neither the task or offer should be modified.
+type FitPredicate func(*T, *mesos.Offer) bool
+
+type RequireAllPredicate []FitPredicate
+
+func (f RequireAllPredicate) Fit(t *T, offer *mesos.Offer) bool {
+	for _, p := range f {
+		if !p(t, offer) {
+			return false
+		}
+	}
+	return true
+}
+
+func ValidationPredicate(t *T, offer *mesos.Offer) bool {
+	return t != nil && offer != nil
+}
+
+func NodeSelectorPredicate(t *T, offer *mesos.Offer) bool {
+	// if the user has specified a target host, make sure this offer is for that host
+	if t.Pod.Spec.NodeName != "" && offer.GetHostname() != t.Pod.Spec.NodeName {
+		return false
+	}
+
+	// check the NodeSelector
+	if len(t.Pod.Spec.NodeSelector) > 0 {
+		slaveLabels := map[string]string{}
+		for _, a := range offer.Attributes {
+			if a.GetType() == mesos.Value_TEXT {
+				slaveLabels[a.GetName()] = a.GetText().GetValue()
+			}
+		}
+		selector := labels.SelectorFromSet(t.Pod.Spec.NodeSelector)
+		if !selector.Matches(labels.Set(slaveLabels)) {
+			return false
+		}
+	}
+	return true
+}
+
+func PortsPredicate(t *T, offer *mesos.Offer) bool {
+	// check ports
+	if _, err := t.mapper.Generate(t, offer); err != nil {
+		log.V(3).Info(err)
+		return false
+	}
+	return true
+}
+
+func PodFitsResourcesPredicate(t *T, offer *mesos.Offer) bool {
+	// find offered cpu and mem
+	var (
+		offeredCpus mresource.CPUShares
+		offeredMem  mresource.MegaBytes
+	)
+	for _, resource := range offer.Resources {
+		if resource.GetName() == "cpus" {
+			offeredCpus = mresource.CPUShares(*resource.GetScalar().Value)
+		}
+
+		if resource.GetName() == "mem" {
+			offeredMem = mresource.MegaBytes(*resource.GetScalar().Value)
+		}
+	}
+
+	// calculate cpu and mem sum over all containers of the pod
+	// TODO (@sttts): also support pod.spec.resources.limit.request
+	// TODO (@sttts): take into account the executor resources
+	cpu := mresource.PodCPULimit(&t.Pod)
+	mem := mresource.PodMemLimit(&t.Pod)
+	log.V(4).Infof("trying to match offer with pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem)
+	if (cpu > offeredCpus) || (mem > offeredMem) {
+		log.V(3).Infof("not enough resources for pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem)
+		return false
+	}
+	return true
+}
diff --git a/contrib/mesos/pkg/scheduler/podtask/procurement.go b/contrib/mesos/pkg/scheduler/podtask/procurement.go
new file mode 100644
index 00000000000..566be7b5c6e
--- /dev/null
+++ b/contrib/mesos/pkg/scheduler/podtask/procurement.go
@@ -0,0 +1,152 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package podtask
+
+import (
+	"strings"
+
+	log "github.com/golang/glog"
+	mesos "github.com/mesos/mesos-go/mesosproto"
+	mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
+)
+
+// NewDefaultProcurement returns the default procurement strategy that combines validation
+// and responsible Mesos resource procurement. c and m are resource quantities written into
+// k8s api.Pod.Spec's that don't declare resources (all containers in k8s-mesos require cpu
+// and memory limits).
+func NewDefaultProcurement(c mresource.CPUShares, m mresource.MegaBytes) Procurement {
+	requireSome := &RequireSomePodResources{
+		defaultContainerCPULimit: c,
+		defaultContainerMemLimit: m,
+	}
+	return AllOrNothingProcurement([]Procurement{
+		ValidateProcurement,
+		NodeProcurement,
+		requireSome.Procure,
+		PodResourcesProcurement,
+		PortsProcurement,
+	}).Procure
+}
+
+// Procurement funcs allocate resources for a task from an offer.
+// Both the task and/or offer may be modified.
+type Procurement func(*T, *mesos.Offer) error
+
+// AllOrNothingProcurement provides a convenient wrapper around multiple Procurement
+// objectives: the failure of any Procurement in the set results in Procure failing.
+// see AllOrNothingProcurement.Procure
+type AllOrNothingProcurement []Procurement
+
+// Procure runs each Procurement in the receiver list. The first Procurement func that
+// fails triggers T.Reset() and the error is returned, otherwise returns nil.
+func (a AllOrNothingProcurement) Procure(t *T, offer *mesos.Offer) error {
+	for _, p := range a {
+		if err := p(t, offer); err != nil {
+			t.Reset()
+			return err
+		}
+	}
+	return nil
+}
+
+// ValidateProcurement checks that the offered resources are kosher, and if not panics.
+// If things check out ok, t.Spec is cleared and nil is returned.
+func ValidateProcurement(t *T, offer *mesos.Offer) error {
+	if offer == nil {
+		//programming error
+		panic("offer details are nil")
+	}
+	t.Spec = Spec{}
+	return nil
+}
+
+// NodeProcurement updates t.Spec in preparation for the task to be launched on the
+// slave associated with the offer.
+func NodeProcurement(t *T, offer *mesos.Offer) error {
+	t.Spec.SlaveID = offer.GetSlaveId().GetValue()
+	t.Spec.AssignedSlave = offer.GetHostname()
+
+	// hostname needs of the executor needs to match that of the offer, otherwise
+	// the kubelet node status checker/updater is very unhappy
+	const HOSTNAME_OVERRIDE_FLAG = "--hostname-override="
+	hostname := offer.GetHostname() // required field, non-empty
+	hostnameOverride := HOSTNAME_OVERRIDE_FLAG + hostname
+
+	argv := t.executor.Command.Arguments
+	overwrite := false
+	for i, arg := range argv {
+		if strings.HasPrefix(arg, HOSTNAME_OVERRIDE_FLAG) {
+			overwrite = true
+			argv[i] = hostnameOverride
+			break
+		}
+	}
+	if !overwrite {
+		t.executor.Command.Arguments = append(argv, hostnameOverride)
+	}
+	return nil
+}
+
+type RequireSomePodResources struct {
+	defaultContainerCPULimit mresource.CPUShares
+	defaultContainerMemLimit mresource.MegaBytes
+}
+
+func (r *RequireSomePodResources) Procure(t *T, offer *mesos.Offer) error {
+	// write resource limits into the pod spec which is transferred to the executor. From here
+	// on we can expect that the pod spec of a task has proper limits for CPU and memory.
+	// TODO(sttts): For a later separation of the kubelet and the executor also patch the pod on the apiserver
+	// TODO(jdef): changing the state of t.Pod here feels dirty, especially since we don't use a kosher
+	// method to clone the api.Pod state in T.Clone(). This needs some love.
+	if unlimitedCPU := mresource.LimitPodCPU(&t.Pod, r.defaultContainerCPULimit); unlimitedCPU {
+		log.Warningf("Pod %s/%s without cpu limits is admitted %.2f cpu shares", t.Pod.Namespace, t.Pod.Name, mresource.PodCPULimit(&t.Pod))
+	}
+	if unlimitedMem := mresource.LimitPodMem(&t.Pod, r.defaultContainerMemLimit); unlimitedMem {
+		log.Warningf("Pod %s/%s without memory limits is admitted %.2f MB", t.Pod.Namespace, t.Pod.Name, mresource.PodMemLimit(&t.Pod))
+	}
+	return nil
+}
+
+// PodResourcesProcurement converts k8s pod cpu and memory resource requirements into
+// mesos resource allocations.
+func PodResourcesProcurement(t *T, offer *mesos.Offer) error {
+	// compute used resources
+	cpu := mresource.PodCPULimit(&t.Pod)
+	mem := mresource.PodMemLimit(&t.Pod)
+
+	log.V(3).Infof("Recording offer(s) %s/%s against pod %v: cpu: %.2f, mem: %.2f MB", offer.Id, t.Pod.Namespace, t.Pod.Name, cpu, mem)
+
+	t.Spec.CPU = cpu
+	t.Spec.Memory = mem
+	return nil
+}
+
+// PortsProcurement convert host port mappings into mesos port resource allocations.
+func PortsProcurement(t *T, offer *mesos.Offer) error {
+	// fill in port mapping
+	if mapping, err := t.mapper.Generate(t, offer); err != nil {
+		return err
+	} else {
+		ports := []uint64{}
+		for _, entry := range mapping {
+			ports = append(ports, entry.OfferPort)
+		}
+		t.Spec.PortMap = mapping
+		t.Spec.Ports = ports
+	}
+	return nil
+}
diff --git a/contrib/mesos/pkg/scheduler/scheduler.go b/contrib/mesos/pkg/scheduler/scheduler.go
index 7a2ead5cf6d..0148e0d9368 100644
--- a/contrib/mesos/pkg/scheduler/scheduler.go
+++ b/contrib/mesos/pkg/scheduler/scheduler.go
@@ -39,7 +39,6 @@ import (
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics"
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
-	mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/uid"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/errors"
@@ -118,19 +117,17 @@ type KubernetesScheduler struct {
 	// and the invoking the pod registry interfaces.
 	// In particular, changes to podtask.T objects are currently guarded by this lock.
 	*sync.RWMutex
+	PodScheduler
 
 	// Config related, write-once
-	schedcfg                 *schedcfg.Config
-	executor                 *mesos.ExecutorInfo
-	executorGroup            uint64
-	scheduleFunc             PodScheduleFunc
-	client                   *client.Client
-	etcdClient               tools.EtcdClient
-	failoverTimeout          float64 // in seconds
-	reconcileInterval        int64
-	defaultContainerCPULimit mresource.CPUShares
-	defaultContainerMemLimit mresource.MegaBytes
+	schedcfg          *schedcfg.Config
+	executor          *mesos.ExecutorInfo
+	executorGroup     uint64
+	client            *client.Client
+	etcdClient        tools.EtcdClient
+	failoverTimeout   float64 // in seconds
+	reconcileInterval int64
 
 	// Mesos context.
@@ -157,33 +154,29 @@ type KubernetesScheduler struct {
 }
 
 type Config struct {
-	Schedcfg                 schedcfg.Config
-	Executor                 *mesos.ExecutorInfo
-	ScheduleFunc             PodScheduleFunc
-	Client                   *client.Client
-	EtcdClient               tools.EtcdClient
-	FailoverTimeout          float64
-	ReconcileInterval        int64
-	ReconcileCooldown        time.Duration
-	DefaultContainerCPULimit mresource.CPUShares
-	DefaultContainerMemLimit mresource.MegaBytes
+	Schedcfg          schedcfg.Config
+	Executor          *mesos.ExecutorInfo
+	Scheduler         PodScheduler
+	Client            *client.Client
+	EtcdClient        tools.EtcdClient
+	FailoverTimeout   float64
+	ReconcileInterval int64
+	ReconcileCooldown time.Duration
 }
 
 // New creates a new KubernetesScheduler
 func New(config Config) *KubernetesScheduler {
 	var k *KubernetesScheduler
 	k = &KubernetesScheduler{
-		schedcfg:                 &config.Schedcfg,
-		RWMutex:                  new(sync.RWMutex),
-		executor:                 config.Executor,
-		executorGroup:            uid.Parse(config.Executor.ExecutorId.GetValue()).Group(),
-		scheduleFunc:             config.ScheduleFunc,
-		client:                   config.Client,
-		etcdClient:               config.EtcdClient,
-		failoverTimeout:          config.FailoverTimeout,
-		reconcileInterval:        config.ReconcileInterval,
-		defaultContainerCPULimit: config.DefaultContainerCPULimit,
-		defaultContainerMemLimit: config.DefaultContainerMemLimit,
+		schedcfg:          &config.Schedcfg,
+		RWMutex:           new(sync.RWMutex),
+		executor:          config.Executor,
+		executorGroup:     uid.Parse(config.Executor.ExecutorId.GetValue()).Group(),
+		PodScheduler:      config.Scheduler,
+		client:            config.Client,
+		etcdClient:        config.EtcdClient,
+		failoverTimeout:   config.FailoverTimeout,
+		reconcileInterval: config.ReconcileInterval,
 		offers: offers.CreateRegistry(offers.RegistryConfig{
 			Compat: func(o *mesos.Offer) bool {
 				// filter the offers: the executor IDs must not identify a kubelet-
diff --git a/contrib/mesos/pkg/scheduler/service/service.go b/contrib/mesos/pkg/scheduler/service/service.go
index 541fe47a159..e898fcf72cb 100644
--- a/contrib/mesos/pkg/scheduler/service/service.go
+++ b/contrib/mesos/pkg/scheduler/service/service.go
@@ -57,6 +57,7 @@ import (
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha"
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics"
+	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
 	mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/uid"
 	"k8s.io/kubernetes/pkg/api"
@@ -139,6 +140,8 @@ type SchedulerServer struct {
 	KubeletNetworkPluginName string
 	StaticPodsConfigPath     string
 	DockerCfgPath            string
+	ContainPodResources      bool
+	AccountForPodResources   bool
 
 	executable string // path to the binary running this service
 	client     *client.Client
@@ -170,18 +173,20 @@ func NewSchedulerServer() *SchedulerServer {
 		MinionLogMaxBackups:   minioncfg.DefaultLogMaxBackups,
 		MinionLogMaxAgeInDays: minioncfg.DefaultLogMaxAgeInDays,
 
-		MesosAuthProvider:    sasl.ProviderName,
-		MesosCgroupPrefix:    minioncfg.DefaultCgroupPrefix,
-		MesosMaster:          defaultMesosMaster,
-		MesosUser:            defaultMesosUser,
-		ReconcileInterval:    defaultReconcileInterval,
-		ReconcileCooldown:    defaultReconcileCooldown,
-		Checkpoint:           true,
-		FrameworkName:        defaultFrameworkName,
-		HA:                   false,
-		mux:                  http.NewServeMux(),
-		KubeletCadvisorPort:  4194, // copied from github.com/GoogleCloudPlatform/kubernetes/blob/release-0.14/cmd/kubelet/app/server.go
-		KubeletSyncFrequency: 10 * time.Second,
+		MesosAuthProvider:      sasl.ProviderName,
+		MesosCgroupPrefix:      minioncfg.DefaultCgroupPrefix,
+		MesosMaster:            defaultMesosMaster,
+		MesosUser:              defaultMesosUser,
+		ReconcileInterval:      defaultReconcileInterval,
+		ReconcileCooldown:      defaultReconcileCooldown,
+		Checkpoint:             true,
+		FrameworkName:          defaultFrameworkName,
+		HA:                     false,
+		mux:                    http.NewServeMux(),
+		KubeletCadvisorPort:    4194, // copied from github.com/GoogleCloudPlatform/kubernetes/blob/release-0.14/cmd/kubelet/app/server.go
+		KubeletSyncFrequency:   10 * time.Second,
+		ContainPodResources:    true,
+		AccountForPodResources: true,
 	}
 	// cache this for later use. also useful in case the original binary gets deleted, e.g.
 	// during upgrades, development deployments, etc.
@@ -231,6 +236,8 @@ func (s *SchedulerServer) addCoreFlags(fs *pflag.FlagSet) {
 	fs.IPVar(&s.ServiceAddress, "service-address", s.ServiceAddress, "The service portal IP address that the scheduler should register with (if unset, chooses randomly)")
 	fs.Var(&s.DefaultContainerCPULimit, "default-container-cpu-limit", "Containers without a CPU resource limit are admitted this much CPU shares")
 	fs.Var(&s.DefaultContainerMemLimit, "default-container-mem-limit", "Containers without a memory resource limit are admitted this much amount of memory in MB")
+	fs.BoolVar(&s.ContainPodResources, "contain-pod-resources", s.ContainPodResources, "Reparent pod containers into mesos cgroups; disable if you're having strange mesos/docker/systemd interactions.")
+	fs.BoolVar(&s.AccountForPodResources, "account-for-pod-resources", s.AccountForPodResources, "Allocate pod CPU and memory resources from offers (Default: true)")
 
 	fs.IntVar(&s.ExecutorLogV, "executor-logv", s.ExecutorLogV, "Logging verbosity of spawned minion and executor processes.")
 	fs.BoolVar(&s.ExecutorBindall, "executor-bindall", s.ExecutorBindall, "When true will set -address of the executor to 0.0.0.0.")
@@ -367,6 +374,7 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E
 	ci.Arguments = append(ci.Arguments, fmt.Sprintf("--mesos-cgroup-prefix=%v", s.MesosCgroupPrefix))
 	ci.Arguments = append(ci.Arguments, fmt.Sprintf("--cadvisor-port=%v", s.KubeletCadvisorPort))
 	ci.Arguments = append(ci.Arguments, fmt.Sprintf("--sync-frequency=%v", s.KubeletSyncFrequency))
+	ci.Arguments = append(ci.Arguments, fmt.Sprintf("--contain-pod-resources=%t", s.ContainPodResources))
 
 	if s.AuthPath != "" {
 		//TODO(jdef) should probably support non-local files, e.g. hdfs:///some/config/file
@@ -651,17 +659,27 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
 		log.Fatalf("misconfigured etcd: %v", err)
 	}
 
+	as := scheduler.NewAllocationStrategy(
+		podtask.DefaultPredicate,
+		podtask.NewDefaultProcurement(s.DefaultContainerCPULimit, s.DefaultContainerMemLimit))
+
+	// downgrade allocation strategy if user disables "account-for-pod-resources"
+	if !s.AccountForPodResources {
+		as = scheduler.NewAllocationStrategy(
+			podtask.DefaultMinimalPredicate,
+			podtask.DefaultMinimalProcurement)
+	}
+
+	fcfs := scheduler.NewFCFSPodScheduler(as)
 	mesosPodScheduler := scheduler.New(scheduler.Config{
-		Schedcfg:                 *sc,
-		Executor:                 executor,
-		ScheduleFunc:             scheduler.FCFSScheduleFunc,
-		Client:                   client,
-		EtcdClient:               etcdClient,
-		FailoverTimeout:          s.FailoverTimeout,
-		ReconcileInterval:        s.ReconcileInterval,
-		ReconcileCooldown:        s.ReconcileCooldown,
-		DefaultContainerCPULimit: s.DefaultContainerCPULimit,
-		DefaultContainerMemLimit: s.DefaultContainerMemLimit,
+		Schedcfg:          *sc,
+		Executor:          executor,
+		Scheduler:         fcfs,
+		Client:            client,
+		EtcdClient:        etcdClient,
+		FailoverTimeout:   s.FailoverTimeout,
+		ReconcileInterval: s.ReconcileInterval,
+		ReconcileCooldown: s.ReconcileCooldown,
 	})
 
 	masterUri := s.MesosMaster
diff --git a/contrib/mesos/pkg/scheduler/types.go b/contrib/mesos/pkg/scheduler/types.go
index 0b9ba9de017..b529eec318a 100644
--- a/contrib/mesos/pkg/scheduler/types.go
+++ b/contrib/mesos/pkg/scheduler/types.go
@@ -23,17 +23,29 @@ import (
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
 )
 
-// PodScheduleFunc implements how to schedule pods among slaves.
-// We can have different implementation for different scheduling policy.
-//
-// The Schedule function accepts a group of slaves (each contains offers from
-// that slave) and a single pod, which aligns well with the k8s scheduling
-// algorithm. It returns an offerId that is acceptable for the pod, otherwise
-// nil. The caller is responsible for filling in task state w/ relevant offer
-// details.
-//
-// See the FCFSScheduleFunc for example.
-type PodScheduleFunc func(r offers.Registry, slaves SlaveIndex, task *podtask.T) (offers.Perishable, error)
+type AllocationStrategy interface {
+	// FitPredicate returns the selector used to determine pod fitness w/ respect to a given offer
+	FitPredicate() podtask.FitPredicate
+
+	// Procurement returns a func that obtains resources for a task from resource offer
+	Procurement() podtask.Procurement
+}
+
+type PodScheduler interface {
+	AllocationStrategy
+
+	// SchedulePod implements how to schedule pods among slaves.
+	// We can have different implementation for different scheduling policy.
+	//
+	// The function accepts a group of slaves (each contains offers from
+	// that slave) and a single pod, which aligns well with the k8s scheduling
+	// algorithm. It returns an offerId that is acceptable for the pod, otherwise
+	// nil. The caller is responsible for filling in task state w/ relevant offer
+	// details.
+	//
+	// See the FCFSPodScheduler for example.
+	SchedulePod(r offers.Registry, slaves SlaveIndex, task *podtask.T) (offers.Perishable, error)
+}
 
 // A minimal placeholder
 type empty struct{}
diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt
index a746b633c99..a25cf09afb6 100644
--- a/hack/verify-flags/known-flags.txt
+++ b/hack/verify-flags/known-flags.txt
@@ -1,5 +1,6 @@
 accept-hosts
 accept-paths
+account-for-pod-resources
 admission-control
 admission-control-config-file
 advertise-address
@@ -43,6 +44,7 @@ cluster-name
 cluster-tag
 concurrent-endpoint-syncs
 configure-cbr0
+contain-pod-resources
 container-port
 container-runtime
 cors-allowed-origins
@@ -263,4 +265,4 @@ whitelist-override-label
 www-prefix
 retry_time
 file_content_in_loop
-cpu-cfs-quota
\ No newline at end of file
+cpu-cfs-quota
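
The patch turns offer fitness into composable podtask.FitPredicate values. A minimal sketch of how a downstream package could stack an extra constraint on top of the defaults, assuming a hypothetical package name and a made-up "rack" attribute check (not part of this patch):

```go
package schedulerext

import (
	mesos "github.com/mesos/mesos-go/mesosproto"
	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
)

// rackPredicate is a hypothetical extra constraint: only accept offers from
// slaves that advertise a TEXT attribute named "rack". It has the same shape
// as the predicates introduced in predicate.go.
func rackPredicate(t *podtask.T, offer *mesos.Offer) bool {
	for _, a := range offer.Attributes {
		if a.GetType() == mesos.Value_TEXT && a.GetName() == "rack" {
			return true
		}
	}
	return false
}

// strictFit rejects an offer unless every predicate in the list accepts it,
// mirroring how DefaultPredicate is assembled from RequireAllPredicate.
var strictFit = podtask.RequireAllPredicate([]podtask.FitPredicate{
	podtask.DefaultPredicate,
	rackPredicate,
}).Fit
```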
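Procurement follows the same pattern: podtask.Procurement funcs are chained with AllOrNothingProcurement, which calls T.Reset() if any step fails. A sketch of appending a custom step after the default procurement; labelProcurement and the label key are hypothetical and only illustrate the extension point:

```go
package schedulerext

import (
	mesos "github.com/mesos/mesos-go/mesosproto"
	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
	mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
)

// labelProcurement is a hypothetical Procurement step that records the offer's
// hostname on the pod before the task is launched.
func labelProcurement(t *podtask.T, offer *mesos.Offer) error {
	if t.Pod.Labels == nil {
		t.Pod.Labels = map[string]string{}
	}
	t.Pod.Labels["example.invalid/slave-hostname"] = offer.GetHostname()
	return nil
}

// procurement chains the default steps with the extra one; a failure in any
// step makes AllOrNothingProcurement reset the task's Spec and return the error.
var procurement = podtask.AllOrNothingProcurement([]podtask.Procurement{
	podtask.NewDefaultProcurement(mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerMemLimit),
	labelProcurement,
}).Procure
```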
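End to end, the scheduler is now assembled from these pieces instead of a ScheduleFunc. The sketch below mirrors the bootstrap wiring in service.go, including the --account-for-pod-resources downgrade path; the helper function and package name are illustrative only:

```go
package schedulerext

import (
	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler"
	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
	mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
)

// newFCFS bundles a fit predicate and a procurement into an AllocationStrategy,
// then hands it to the FCFS pod scheduler that is plugged into scheduler.Config.Scheduler.
func newFCFS(accountForPodResources bool) scheduler.PodScheduler {
	as := scheduler.NewAllocationStrategy(
		podtask.DefaultPredicate,
		podtask.NewDefaultProcurement(mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerMemLimit))

	// downgrade to the "minimal" strategy, as --account-for-pod-resources=false does
	if !accountForPodResources {
		as = scheduler.NewAllocationStrategy(
			podtask.DefaultMinimalPredicate,
			podtask.DefaultMinimalProcurement)
	}
	return scheduler.NewFCFSPodScheduler(as)
}
```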