diff --git a/contrib/mesos/pkg/archive/zip.go b/contrib/mesos/pkg/archive/zip.go
index 792ddcbaeb8..60ef52611c4 100644
--- a/contrib/mesos/pkg/archive/zip.go
+++ b/contrib/mesos/pkg/archive/zip.go
@@ -70,24 +70,24 @@ func ZipWalker(zw *zip.Writer) filepath.WalkFunc {
 
-// Create a zip of all files in a directory recursively, return a byte array and
-// the number of files archived.
-func ZipDir(path string) ([]byte, int, error) {
+// Create a zip of all files in a directory recursively, return a byte array and
+// the paths of the files archived.
+func ZipDir(path string) ([]byte, []string, error) {
 	var buf bytes.Buffer
 	zw := zip.NewWriter(&buf)
 	zipWalker := ZipWalker(zw)
-	numberManifests := 0
+	paths := []string{}
 	err := filepath.Walk(path, filepath.WalkFunc(func(path string, info os.FileInfo, err error) error {
 		if !info.IsDir() {
-			numberManifests++
+			paths = append(paths, path)
 		}
 		return zipWalker(path, info, err)
 	}))
 	if err != nil {
-		return nil, 0, err
+		return nil, nil, err
 	} else if err = zw.Close(); err != nil {
-		return nil, 0, err
+		return nil, nil, err
 	}
-	return buf.Bytes(), numberManifests, nil
+	return buf.Bytes(), paths, nil
 }
 
 // UnzipDir unzips all files from a given zip byte array into a given directory.
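The new ZipDir signature returns the archived file paths instead of a bare count, which the scheduler uses further down to read each static pod spec back. A minimal consumer sketch (the ./pods directory and the main wrapper are invented for illustration):

package main

import (
	"fmt"
	"log"

	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/archive"
)

func main() {
	// data holds the zip bytes, paths lists the files that went into it
	data, paths, err := archive.ZipDir("./pods")
	if err != nil {
		log.Fatalf("zipping failed: %v", err)
	}
	fmt.Printf("archived %d files (%d bytes): %v\n", len(paths), len(data), paths)
}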
diff --git a/contrib/mesos/pkg/executor/config/config.go b/contrib/mesos/pkg/executor/config/config.go
index 999058dbc8b..22fc378a6d9 100644
--- a/contrib/mesos/pkg/executor/config/config.go
+++ b/contrib/mesos/pkg/executor/config/config.go
@@ -26,4 +26,5 @@ const (
 	DefaultInfoSource     = "kubernetes"
 	DefaultInfoName       = "Kubelet-Executor"
 	DefaultSuicideTimeout = 20 * time.Minute
+	DefaultCgroupPrefix   = "mesos"
 )
diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go
index 42f81f8d235..66a29a9e31c 100644
--- a/contrib/mesos/pkg/executor/service/service.go
+++ b/contrib/mesos/pkg/executor/service/service.go
@@ -25,6 +25,7 @@ import (
 	"net/http"
 	"os"
 	"os/exec"
+	"path"
 	"path/filepath"
 	"strconv"
 	"strings"
@@ -70,6 +71,26 @@ type KubeletExecutorServer struct {
 	SuicideTimeout time.Duration
 	ShutdownFD     int
 	ShutdownFIFO   string
+	cgroupRoot     string
+	cgroupPrefix   string
+}
+
+func findMesosCgroup(prefix string) string {
+	// derive our cgroup from the MESOS_DIRECTORY environment variable
+	mesosDir := os.Getenv("MESOS_DIRECTORY")
+	if mesosDir == "" {
+		log.V(2).Infof("cannot derive executor's cgroup because MESOS_DIRECTORY is empty")
+		return ""
+	}
+
+	containerId := path.Base(mesosDir)
+	if containerId == "" {
+		log.V(2).Infof("cannot derive executor's cgroup from MESOS_DIRECTORY=%q", mesosDir)
+		return ""
+	}
+	trimmedPrefix := strings.Trim(prefix, "/")
+	cgroupRoot := fmt.Sprintf("/%s/%v", trimmedPrefix, containerId)
+	return cgroupRoot
 }
 
 func NewKubeletExecutorServer() *KubeletExecutorServer {
@@ -79,6 +100,7 @@ func NewKubeletExecutorServer() *KubeletExecutorServer {
 		ProxyExec:      "./kube-proxy",
 		ProxyLogfile:   "./proxy-log",
 		SuicideTimeout: config.DefaultSuicideTimeout,
+		cgroupPrefix:   config.DefaultCgroupPrefix,
 	}
 	if pwd, err := os.Getwd(); err != nil {
 		log.Warningf("failed to determine current directory: %v", err)
@@ -87,6 +109,7 @@ func NewKubeletExecutorServer() *KubeletExecutorServer {
 	}
 	k.Address = util.IP(net.ParseIP(defaultBindingAddress()))
 	k.ShutdownFD = -1 // indicates unspecified FD
+
 	return k
 }
 
@@ -112,6 +135,7 @@ func (s *KubeletExecutorServer) addCoreFlags(fs *pflag.FlagSet) {
 	fs.DurationVar(&s.SuicideTimeout, "suicide-timeout", s.SuicideTimeout, "Self-terminate after this period of inactivity. Zero disables suicide watch.")
 	fs.IntVar(&s.ShutdownFD, "shutdown-fd", s.ShutdownFD, "File descriptor used to signal shutdown to external watchers, requires shutdown-fifo flag")
 	fs.StringVar(&s.ShutdownFIFO, "shutdown-fifo", s.ShutdownFIFO, "FIFO used to signal shutdown to external watchers, requires shutdown-fd flag")
+	fs.StringVar(&s.cgroupPrefix, "cgroup-prefix", s.cgroupPrefix, "The cgroup prefix that, concatenated with the container ID from MESOS_DIRECTORY, gives the executor's cgroup as set by Mesos")
 }
 
 func (s *KubeletExecutorServer) AddStandaloneFlags(fs *pflag.FlagSet) {
@@ -143,6 +167,14 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
 		log.Info(err)
 	}
 
+	// derive the executor cgroup and use it as docker cgroup root
+	mesosCgroup := findMesosCgroup(s.cgroupPrefix)
+	s.cgroupRoot = mesosCgroup
+	s.SystemContainer = mesosCgroup
+	s.ResourceContainer = mesosCgroup
+	log.V(2).Infof("passing cgroup %q to the kubelet as cgroup root", s.cgroupRoot)
+
+	// create apiserver client
 	var apiclient *client.Client
 	clientConfig, err := s.CreateAPIServerClientConfig()
 	if err == nil {
@@ -249,7 +281,7 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
 		Cloud: nil, // TODO(jdef) Cloud, specifying null here because we don't want all kubelets polling mesos-master; need to account for this in the cloudprovider impl
 		NodeStatusUpdateFrequency: s.NodeStatusUpdateFrequency,
 		ResourceContainer:         s.ResourceContainer,
-		CgroupRoot:                s.CgroupRoot,
+		CgroupRoot:                s.cgroupRoot,
 		ContainerRuntime:          s.ContainerRuntime,
 		Mounter:                   mounter,
 		DockerDaemonContainer:     s.DockerDaemonContainer,
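findMesosCgroup is unexported, so the derivation is restated here as a standalone sketch; the sample MESOS_DIRECTORY value is invented. Mesos places each executor in a per-container cgroup, and the container ID is the basename of the executor's sandbox directory:

package main

import (
	"fmt"
	"os"
	"path"
	"strings"
)

func main() {
	// Mesos exports the sandbox directory; its basename is the container ID.
	os.Setenv("MESOS_DIRECTORY", "/tmp/mesos/slaves/S0/frameworks/F0/executors/E0/runs/4526b307")
	containerId := path.Base(os.Getenv("MESOS_DIRECTORY"))
	cgroupRoot := fmt.Sprintf("/%s/%v", strings.Trim("mesos", "/"), containerId)
	fmt.Println(cgroupRoot) // /mesos/4526b307
}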
diff --git a/contrib/mesos/pkg/scheduler/plugin.go b/contrib/mesos/pkg/scheduler/plugin.go
index 1fc02e7a000..c0b1f2666ae 100644
--- a/contrib/mesos/pkg/scheduler/plugin.go
+++ b/contrib/mesos/pkg/scheduler/plugin.go
@@ -30,6 +30,7 @@ import (
 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/runtime"
 	annotation "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/meta"
 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/podtask"
+	mresource "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/resource"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
@@ -166,8 +167,8 @@ func (b *binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (e
 	}
 
 	if err = b.prepareTaskForLaunch(ctx, binding.Target.Name, task, offerId); err == nil {
-		log.V(2).Infof("launching task: %q on target %q slave %q for pod \"%v/%v\"",
-			task.ID, binding.Target.Name, task.Spec.SlaveID, task.Pod.Namespace, task.Pod.Name)
+		log.V(2).Infof("launching task: %q on target %q slave %q for pod \"%v/%v\", cpu %.2f, mem %.2f MB",
+			task.ID, binding.Target.Name, task.Spec.SlaveID, task.Pod.Namespace, task.Pod.Name, task.Spec.CPU, task.Spec.Memory)
 		if err = b.api.launchTask(task); err == nil {
 			b.api.offers().Invalidate(offerId)
 			task.Set(podtask.Launched)
@@ -230,8 +231,10 @@ func (b *binder) prepareTaskForLaunch(ctx api.Context, machine string, task *pod
 }
 
 type kubeScheduler struct {
-	api        schedulerInterface
-	podUpdates queue.FIFO
+	api                      schedulerInterface
+	podUpdates               queue.FIFO
+	defaultContainerCPULimit mresource.CPUShares
+	defaultContainerMemLimit mresource.MegaBytes
 }
 
 // Schedule implements the Scheduler interface of Kubernetes.
@@ -325,12 +328,20 @@ func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) {
 		if task.Offer != nil && task.Offer != offer {
 			return "", fmt.Errorf("task.offer assignment must be idempotent, task %+v: offer %+v", task, offer)
 		}
+
+		// write resource limits into the pod spec which is transferred to the executor. From here
+		// on we can expect that the pod spec of a task has proper limits for CPU and memory.
+		// TODO(sttts): For a later separation of the kubelet and the executor, also patch the pod on the apiserver
+		if unlimitedCPU := mresource.LimitPodCPU(&task.Pod, k.defaultContainerCPULimit); unlimitedCPU {
+			log.Warningf("Pod %s/%s without cpu limits is admitted with %.2f cpu shares", task.Pod.Namespace, task.Pod.Name, mresource.PodCPULimit(&task.Pod))
+		}
+		if unlimitedMem := mresource.LimitPodMem(&task.Pod, k.defaultContainerMemLimit); unlimitedMem {
+			log.Warningf("Pod %s/%s without memory limits is admitted with %.2f MB of memory", task.Pod.Namespace, task.Pod.Name, mresource.PodMemLimit(&task.Pod))
+		}
+
 		task.Offer = offer
-		//TODO(jdef) FillFromDetails currently allocates fixed (hardwired) cpu and memory resources for all
-		//tasks. This will be fixed once we properly integrate parent-cgroup support into the kublet-executor.
-		//For now we are completely ignoring the resources specified in the pod.
-		//see: https://github.com/mesosphere/kubernetes-mesos/issues/68
 		task.FillFromDetails(details)
+
 		if err := k.api.tasks().Update(task); err != nil {
 			offer.Release()
 			return "", err
@@ -678,8 +689,10 @@ func (k *KubernetesScheduler) NewPluginConfig(terminate <-chan struct{}, mux *ht
 		Config: &plugin.Config{
 			MinionLister: nil,
 			Algorithm: &kubeScheduler{
-				api:        kapi,
-				podUpdates: podUpdates,
+				api:                      kapi,
+				podUpdates:               podUpdates,
+				defaultContainerCPULimit: k.defaultContainerCPULimit,
+				defaultContainerMemLimit: k.defaultContainerMemLimit,
 			},
 			Binder:  &binder{api: kapi},
 			NextPod: q.yield,
diff --git a/contrib/mesos/pkg/scheduler/plugin_test.go b/contrib/mesos/pkg/scheduler/plugin_test.go
index 4cf655b5fb2..e4289fda6bd 100644
--- a/contrib/mesos/pkg/scheduler/plugin_test.go
+++ b/contrib/mesos/pkg/scheduler/plugin_test.go
@@ -38,6 +38,7 @@ import (
 	schedcfg "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/config"
 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/ha"
 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/podtask"
+	mresource "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/resource"
 	log "github.com/golang/glog"
 	mesos "github.com/mesos/mesos-go/mesosproto"
 	util "github.com/mesos/mesos-go/mesosutil"
@@ -388,10 +389,12 @@ func TestPlugin_LifeCycle(t *testing.T) {
 
 	// create scheduler
 	testScheduler := New(Config{
-		Executor:     executor,
-		Client:       client.NewOrDie(&client.Config{Host: testApiServer.server.URL, Version: testapi.Version()}),
-		ScheduleFunc: FCFSScheduleFunc,
-		Schedcfg:     *schedcfg.CreateDefaultConfig(),
+		Executor:                 executor,
+		Client:                   client.NewOrDie(&client.Config{Host: testApiServer.server.URL, Version: testapi.Version()}),
+		ScheduleFunc:             FCFSScheduleFunc,
+		Schedcfg:                 *schedcfg.CreateDefaultConfig(),
+		DefaultContainerCPULimit: mresource.DefaultDefaultContainerCPULimit,
+		DefaultContainerMemLimit: mresource.DefaultDefaultContainerMemLimit,
 	})
 
 	assert.NotNil(testScheduler.client, "client is nil")
diff --git a/contrib/mesos/pkg/scheduler/podtask/pod_task.go b/contrib/mesos/pkg/scheduler/podtask/pod_task.go
index f30dfd8c89f..07833c51afe 100644
--- a/contrib/mesos/pkg/scheduler/podtask/pod_task.go
+++ b/contrib/mesos/pkg/scheduler/podtask/pod_task.go
@@ -25,18 +25,15 @@ import (
 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/offers"
 	annotation "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/meta"
 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/metrics"
+	mresource "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/resource"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 	"github.com/gogo/protobuf/proto"
+	log "github.com/golang/glog"
 	mesos "github.com/mesos/mesos-go/mesosproto"
 	mutil "github.com/mesos/mesos-go/mesosutil"
 )
 
-const (
-	DefaultContainerCpus = 0.25 // initial CPU allocated for executor
-	DefaultContainerMem  = 64   // initial MB of memory allocated for executor
-)
-
 type StateType int
 
 const (
@@ -75,8 +72,8 @@ type T struct {
 
 type Spec struct {
 	SlaveID string
-	CPU     float64
-	Memory  float64
+	CPU     mresource.CPUShares
+	Memory  mresource.MegaBytes
 	PortMap []HostPortMapping
 	Ports   []uint64
 	Data    []byte
@@ -141,8 +138,8 @@ func (t *T) BuildTaskInfo() *mesos.TaskInfo {
 		Executor: t.executor,
 		Data:     t.Spec.Data,
 		Resources: []*mesos.Resource{
-			mutil.NewScalarResource("cpus", t.Spec.CPU),
-			mutil.NewScalarResource("mem", t.Spec.Memory),
+			mutil.NewScalarResource("cpus", float64(t.Spec.CPU)),
+			mutil.NewScalarResource("mem", float64(t.Spec.Memory)),
 		},
 	}
 	if portsResource := rangeResource("ports", t.Spec.Ports); portsResource != nil {
@@ -151,23 +148,25 @@ func (t *T) BuildTaskInfo() *mesos.TaskInfo {
 	return info
 }
 
-// Fill the Spec in the T, should be called during k8s scheduling,
-// before binding.
-// TODO(jdef): remove hardcoded values and make use of actual pod resource settings
+// Fill the Spec in the T, should be called during k8s scheduling, before binding.
 func (t *T) FillFromDetails(details *mesos.Offer) error {
 	if details == nil {
 		//programming error
 		panic("offer details are nil")
 	}
 
-	log.V(3).Infof("Recording offer(s) %v against pod %v", details.Id, t.Pod.Name)
+	// compute used resources
+	cpu := mresource.PodCPULimit(&t.Pod)
+	mem := mresource.PodMemLimit(&t.Pod)
+	log.V(3).Infof("Recording offer(s) %v against pod %v/%v: cpu: %.2f, mem: %.2f MB", details.Id, t.Pod.Namespace, t.Pod.Name, cpu, mem)
 
 	t.Spec = Spec{
 		SlaveID: details.GetSlaveId().GetValue(),
-		CPU:     DefaultContainerCpus,
-		Memory:  DefaultContainerMem,
+		CPU:     cpu,
+		Memory:  mem,
 	}
 
+	// fill in port mapping
 	if mapping, err := t.mapper.Generate(t, details); err != nil {
 		t.Reset()
 		return err
@@ -213,35 +212,39 @@ func (t *T) AcceptOffer(offer *mesos.Offer) bool {
 	if offer == nil {
 		return false
 	}
-	var (
-		cpus float64 = 0
-		mem  float64 = 0
-	)
-	for _, resource := range offer.Resources {
-		if resource.GetName() == "cpus" {
-			cpus = *resource.GetScalar().Value
-		}
-		if resource.GetName() == "mem" {
-			mem = *resource.GetScalar().Value
-		}
-	}
+
+	// check ports
 	if _, err := t.mapper.Generate(t, offer); err != nil {
 		log.V(3).Info(err)
 		return false
 	}
 
-	// for now hard-coded, constant values are used for cpus and mem. This is necessary
-	// until parent-cgroup integration is finished for mesos and k8sm. Then the k8sm
-	// executor can become the parent of pods and subsume their resource usage and
-	// therefore be compliant with expectations of mesos executors w/ respect to
-	// resource allocation and management.
-	//
-	// TODO(jdef): remove hardcoded values and make use of actual pod resource settings
-	if (cpus < DefaultContainerCpus) || (mem < DefaultContainerMem) {
-		log.V(3).Infof("not enough resources: cpus: %f mem: %f", cpus, mem)
+	// find offered cpu and mem
+	var (
+		offeredCpus mresource.CPUShares
+		offeredMem  mresource.MegaBytes
+	)
+	for _, resource := range offer.Resources {
+		if resource.GetName() == "cpus" {
+			offeredCpus = mresource.CPUShares(*resource.GetScalar().Value)
+		}
+
+		if resource.GetName() == "mem" {
+			offeredMem = mresource.MegaBytes(*resource.GetScalar().Value)
+		}
+	}
+
+	// calculate cpu and mem sum over all containers of the pod
+	// TODO (@sttts): also support pod.spec.resources.limit.request
+	// TODO (@sttts): take into account the executor resources
+	cpu := mresource.PodCPULimit(&t.Pod)
+	mem := mresource.PodMemLimit(&t.Pod)
+	log.V(4).Infof("trying to match offer with pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem)
+	if (cpu > offeredCpus) || (mem > offeredMem) {
+		log.V(3).Infof("not enough resources for pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem)
 		return false
 	}
+
 	return true
 }
diff --git a/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go b/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go
index 02506c5df9a..c24e3f2fbdd 100644
--- a/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go
+++ b/contrib/mesos/pkg/scheduler/podtask/pod_task_test.go
@@ -19,9 +19,13 @@ package podtask
 
 import (
 	"testing"
 
+	mresource "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/resource"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
 	mesos "github.com/mesos/mesos-go/mesosproto"
 	mutil "github.com/mesos/mesos-go/mesosutil"
+
+	"github.com/stretchr/testify/assert"
 )
 
 const (
@@ -38,12 +42,109 @@ func fakePodTask(id string) (*T, error) {
 	}, &mesos.ExecutorInfo{})
 }
 
+func TestUnlimitedResources(t *testing.T) {
+	assert := assert.New(t)
+
+	task, _ := fakePodTask("unlimited")
+	pod := &task.Pod
+	pod.Spec = api.PodSpec{
+		Containers: []api.Container{{
+			Name: "a",
+			Ports: []api.ContainerPort{{
+				HostPort: 123,
+			}},
+			Resources: api.ResourceRequirements{
+				Limits: api.ResourceList{
+					api.ResourceCPU:    *resource.NewQuantity(3, resource.DecimalSI),
+					api.ResourceMemory: *resource.NewQuantity(768*1024*1024, resource.BinarySI),
+				},
+			},
+		}, {
+			Name: "b",
+		}, {
+			Name: "c",
+		}},
+	}
+
+	beforeLimitingCPU := mresource.CPUForPod(pod, mresource.DefaultDefaultContainerCPULimit)
+	beforeLimitingMem := mresource.MemForPod(pod, mresource.DefaultDefaultContainerMemLimit)
+
+	unboundedCPU := mresource.LimitPodCPU(pod, mresource.DefaultDefaultContainerCPULimit)
+	unboundedMem := mresource.LimitPodMem(pod, mresource.DefaultDefaultContainerMemLimit)
+
+	cpu := mresource.PodCPULimit(pod)
+	mem := mresource.PodMemLimit(pod)
+
+	assert.True(unboundedCPU, "CPU resources are defined as unlimited")
+	assert.True(unboundedMem, "mem resources are defined as unlimited")
+
+	assert.Equal(2*float64(mresource.DefaultDefaultContainerCPULimit)+3.0, float64(cpu))
+	assert.Equal(2*float64(mresource.DefaultDefaultContainerMemLimit)+768.0, float64(mem))
+
+	assert.Equal(cpu, beforeLimitingCPU)
+	assert.Equal(mem, beforeLimitingMem)
+}
+
+func TestLimitedResources(t *testing.T) {
+	assert := assert.New(t)
+
+	task, _ := fakePodTask("limited")
+	pod := &task.Pod
+	pod.Spec = api.PodSpec{
+		Containers: []api.Container{{
+			Name: "a",
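The new fit check reduces to scalar comparisons in the CPUShares/MegaBytes types. A standalone sketch with invented numbers (the real path is task.AcceptOffer, which additionally validates host ports against the offer):

package main

import (
	"fmt"

	mresource "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/resource"
	mesos "github.com/mesos/mesos-go/mesosproto"
	mutil "github.com/mesos/mesos-go/mesosutil"
)

func main() {
	offer := &mesos.Offer{Resources: []*mesos.Resource{
		mutil.NewScalarResource("cpus", 2.0),
		mutil.NewScalarResource("mem", 1024.0),
	}}

	// extract the offered scalars, as AcceptOffer does
	var offeredCpus mresource.CPUShares
	var offeredMem mresource.MegaBytes
	for _, r := range offer.Resources {
		switch r.GetName() {
		case "cpus":
			offeredCpus = mresource.CPUShares(r.GetScalar().GetValue())
		case "mem":
			offeredMem = mresource.MegaBytes(r.GetScalar().GetValue())
		}
	}

	// pretend pod limits; a real caller would use PodCPULimit/PodMemLimit
	cpu, mem := mresource.CPUShares(0.75), mresource.MegaBytes(192)
	fmt.Println("fits:", cpu <= offeredCpus && mem <= offeredMem) // fits: true
}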
+			Resources: api.ResourceRequirements{
+				Limits: api.ResourceList{
+					api.ResourceCPU:    *resource.NewQuantity(1, resource.DecimalSI),
+					api.ResourceMemory: *resource.NewQuantity(256*1024*1024, resource.BinarySI),
+				},
+			},
+		}, {
+			Name: "b",
+			Resources: api.ResourceRequirements{
+				Limits: api.ResourceList{
+					api.ResourceCPU:    *resource.NewQuantity(2, resource.DecimalSI),
+					api.ResourceMemory: *resource.NewQuantity(512*1024*1024, resource.BinarySI),
+				},
+			},
+		}},
+	}
+
+	beforeLimitingCPU := mresource.CPUForPod(pod, mresource.DefaultDefaultContainerCPULimit)
+	beforeLimitingMem := mresource.MemForPod(pod, mresource.DefaultDefaultContainerMemLimit)
+
+	unboundedCPU := mresource.LimitPodCPU(pod, mresource.DefaultDefaultContainerCPULimit)
+	unboundedMem := mresource.LimitPodMem(pod, mresource.DefaultDefaultContainerMemLimit)
+
+	cpu := mresource.PodCPULimit(pod)
+	mem := mresource.PodMemLimit(pod)
+
+	assert.False(unboundedCPU, "CPU resources are defined as limited")
+	assert.False(unboundedMem, "mem resources are defined as limited")
+
+	assert.Equal(3.0, float64(cpu))
+	assert.Equal(768.0, float64(mem))
+
+	assert.Equal(cpu, beforeLimitingCPU)
+	assert.Equal(mem, beforeLimitingMem)
+}
+
 func TestEmptyOffer(t *testing.T) {
 	t.Parallel()
 	task, err := fakePodTask("foo")
 	if err != nil {
 		t.Fatal(err)
 	}
+
+	task.Pod.Spec = api.PodSpec{
+		Containers: []api.Container{{
+			Name: "a",
+		}},
+	}
+
+	mresource.LimitPodCPU(&task.Pod, mresource.DefaultDefaultContainerCPULimit)
+	mresource.LimitPodMem(&task.Pod, mresource.DefaultDefaultContainerMemLimit)
+
 	if ok := task.AcceptOffer(nil); ok {
 		t.Fatalf("accepted nil offer")
 	}
@@ -59,6 +160,15 @@ func TestNoPortsInPodOrOffer(t *testing.T) {
 		t.Fatal(err)
 	}
 
+	task.Pod.Spec = api.PodSpec{
+		Containers: []api.Container{{
+			Name: "a",
+		}},
+	}
+
+	mresource.LimitPodCPU(&task.Pod, mresource.DefaultDefaultContainerCPULimit)
+	mresource.LimitPodMem(&task.Pod, mresource.DefaultDefaultContainerMemLimit)
+
 	offer := &mesos.Offer{
 		Resources: []*mesos.Resource{
 			mutil.NewScalarResource("cpus", 0.001),
@@ -103,6 +213,10 @@ func TestAcceptOfferPorts(t *testing.T) {
 			}},
 		}},
 	}
+
+	mresource.LimitPodCPU(&task.Pod, mresource.DefaultDefaultContainerCPULimit)
+	mresource.LimitPodMem(&task.Pod, mresource.DefaultDefaultContainerMemLimit)
+
 	if ok := task.AcceptOffer(offer); ok {
 		t.Fatalf("accepted offer %v:", offer)
 	}
diff --git a/contrib/mesos/pkg/scheduler/resource/doc.go b/contrib/mesos/pkg/scheduler/resource/doc.go
new file mode 100644
index 00000000000..f1595050a3c
--- /dev/null
+++ b/contrib/mesos/pkg/scheduler/resource/doc.go
@@ -0,0 +1,18 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package resource contains the Mesos scheduler specific resource functions
+package resource
diff --git a/contrib/mesos/pkg/scheduler/resource/resource.go b/contrib/mesos/pkg/scheduler/resource/resource.go
new file mode 100644
index 00000000000..2104c3dfcb5
--- /dev/null
+++ b/contrib/mesos/pkg/scheduler/resource/resource.go
@@ -0,0 +1,105 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resource
+
+import (
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/resourcequota"
+)
+
+const (
+	DefaultDefaultContainerCPULimit = CPUShares(0.25) // CPU allocated by default for containers without a CPU limit
+	DefaultDefaultContainerMemLimit = MegaBytes(64.0) // memory allocated by default for containers without a memory limit
+)
+
+// PodCPULimit computes the CPU shares that the pod is admitted to use. Containers
+// without a CPU limit are NOT taken into account.
+func PodCPULimit(pod *api.Pod) CPUShares {
+	cpuQuantity := resourcequota.PodCPU(pod)
+	return CPUShares(float64(cpuQuantity.MilliValue()) / 1000.0)
+}
+
+// PodMemLimit computes the amount of memory that the pod is admitted to use. Containers
+// without a memory limit are NOT taken into account.
+func PodMemLimit(pod *api.Pod) MegaBytes {
+	memQuantity := resourcequota.PodMemory(pod)
+	return MegaBytes(float64(memQuantity.Value()) / 1024.0 / 1024.0)
+}
+
+// limitPodResource sets the given default resource limit for each container that
+// does not limit the given resource yet. limitPodResource returns true iff at least one
+// container had no limit for that resource.
+func limitPodResource(pod *api.Pod, resourceName api.ResourceName, defaultLimit resource.Quantity) bool {
+	unlimited := false
+	for j := range pod.Spec.Containers {
+		container := &pod.Spec.Containers[j]
+		if container.Resources.Limits == nil {
+			container.Resources.Limits = api.ResourceList{}
+		}
+		_, ok := container.Resources.Limits[resourceName]
+		if !ok {
+			container.Resources.Limits[resourceName] = defaultLimit
+			unlimited = true
+		}
+	}
+	return unlimited
+}
+
+// unlimitedContainerNum counts how many containers in the pod have no limit for the given resource
+func unlimitedContainerNum(pod *api.Pod, resourceName api.ResourceName) int {
+	unlimited := 0
+	for j := range pod.Spec.Containers {
+		container := &pod.Spec.Containers[j]
+
+		if container.Resources.Limits == nil {
+			unlimited += 1
+			continue
+		}
+
+		if _, ok := container.Resources.Limits[resourceName]; !ok {
+			unlimited += 1
+		}
+	}
+	return unlimited
+}
+
+// LimitPodCPU sets the given default CPU limit for each container that
+// does not limit its CPU resource yet. LimitPodCPU returns true iff at least one
+// container had no CPU limit set.
+func LimitPodCPU(pod *api.Pod, defaultLimit CPUShares) bool {
+	defaultCPUQuantity := resource.NewMilliQuantity(int64(float64(defaultLimit)*1000.0), resource.DecimalSI)
+	return limitPodResource(pod, api.ResourceCPU, *defaultCPUQuantity)
+}
+
+// LimitPodMem sets the given default memory limit for each container that
+// does not limit its memory resource yet. LimitPodMem returns true iff at least one
+// container had no memory limit set.
+func LimitPodMem(pod *api.Pod, defaultLimit MegaBytes) bool {
+	defaultMemQuantity := resource.NewQuantity(int64(float64(defaultLimit)*1024.0*1024.0), resource.BinarySI)
+	return limitPodResource(pod, api.ResourceMemory, *defaultMemQuantity)
+}
+
+// CPUForPod computes the limits from the spec plus the default CPU limit for unlimited containers
+func CPUForPod(pod *api.Pod, defaultLimit CPUShares) CPUShares {
+	return PodCPULimit(pod) + CPUShares(unlimitedContainerNum(pod, api.ResourceCPU))*defaultLimit
+}
+
+// MemForPod computes the limits from the spec plus the default memory limit for unlimited containers
+func MemForPod(pod *api.Pod, defaultLimit MegaBytes) MegaBytes {
+	return PodMemLimit(pod) + MegaBytes(unlimitedContainerNum(pod, api.ResourceMemory))*defaultLimit
+}
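To illustrate the defaulting: container "a" below carries a CPU limit, "b" does not, so LimitPodCPU patches "b" with the default and reports that it had to (a sketch; the pod itself is invented):

package main

import (
	"fmt"

	mresource "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/resource"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
)

func main() {
	pod := &api.Pod{Spec: api.PodSpec{Containers: []api.Container{
		{Name: "a", Resources: api.ResourceRequirements{Limits: api.ResourceList{
			api.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI),
		}}},
		{Name: "b"}, // no limits at all
	}}}

	patched := mresource.LimitPodCPU(pod, mresource.DefaultDefaultContainerCPULimit)
	fmt.Println(patched)                    // true: "b" had no CPU limit
	fmt.Println(mresource.PodCPULimit(pod)) // 0.75 = 0.5 ("a") + 0.25 (default for "b")
}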
diff --git a/contrib/mesos/pkg/scheduler/resource/types.go b/contrib/mesos/pkg/scheduler/resource/types.go
new file mode 100644
index 00000000000..b59d435d5e2
--- /dev/null
+++ b/contrib/mesos/pkg/scheduler/resource/types.go
@@ -0,0 +1,49 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resource
+
+import (
+	"fmt"
+	"strconv"
+)
+
+type MegaBytes float64
+type CPUShares float64
+
+func (f *CPUShares) Set(s string) error {
+	v, err := strconv.ParseFloat(s, 64)
+	*f = CPUShares(v)
+	return err
+}
+
+func (f *CPUShares) Type() string {
+	return "float64"
+}
+
+func (f *CPUShares) String() string { return fmt.Sprintf("%v", *f) }
+
+func (f *MegaBytes) Set(s string) error {
+	v, err := strconv.ParseFloat(s, 64)
+	*f = MegaBytes(v)
+	return err
+}
+
+func (f *MegaBytes) Type() string {
+	return "float64"
+}
+
+func (f *MegaBytes) String() string { return fmt.Sprintf("%v", *f) }
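Set, Type and String make CPUShares and MegaBytes satisfy pflag.Value, which is what allows the scheduler below to register them directly with fs.Var; a minimal sketch of that wiring:

package main

import (
	"fmt"

	mresource "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/resource"
	"github.com/spf13/pflag"
)

func main() {
	limit := mresource.CPUShares(0.25) // flag default
	fs := pflag.NewFlagSet("scheduler", pflag.ExitOnError)
	fs.Var(&limit, "default-container-cpu-limit", "CPU shares for containers without a CPU limit")
	_ = fs.Parse([]string{"--default-container-cpu-limit=0.5"})
	fmt.Println(limit) // 0.5
}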
diff --git a/contrib/mesos/pkg/scheduler/scheduler.go b/contrib/mesos/pkg/scheduler/scheduler.go
index a62278e52ec..01ce2e60e43 100644
--- a/contrib/mesos/pkg/scheduler/scheduler.go
+++ b/contrib/mesos/pkg/scheduler/scheduler.go
@@ -35,6 +35,7 @@ import (
 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/meta"
 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/metrics"
 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/podtask"
+	mresource "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/resource"
 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/uid"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
@@ -120,14 +121,16 @@ type KubernetesScheduler struct {
 
 	// Config related, write-once
 
-	schedcfg          *schedcfg.Config
-	executor          *mesos.ExecutorInfo
-	executorGroup     uint64
-	scheduleFunc      PodScheduleFunc
-	client            *client.Client
-	etcdClient        tools.EtcdClient
-	failoverTimeout   float64 // in seconds
-	reconcileInterval int64
+	schedcfg                 *schedcfg.Config
+	executor                 *mesos.ExecutorInfo
+	executorGroup            uint64
+	scheduleFunc             PodScheduleFunc
+	client                   *client.Client
+	etcdClient               tools.EtcdClient
+	failoverTimeout          float64 // in seconds
+	reconcileInterval        int64
+	defaultContainerCPULimit mresource.CPUShares
+	defaultContainerMemLimit mresource.MegaBytes
 
 	// Mesos context.
@@ -154,29 +157,33 @@ type KubernetesScheduler struct {
 }
 
 type Config struct {
-	Schedcfg          schedcfg.Config
-	Executor          *mesos.ExecutorInfo
-	ScheduleFunc      PodScheduleFunc
-	Client            *client.Client
-	EtcdClient        tools.EtcdClient
-	FailoverTimeout   float64
-	ReconcileInterval int64
-	ReconcileCooldown time.Duration
+	Schedcfg                 schedcfg.Config
+	Executor                 *mesos.ExecutorInfo
+	ScheduleFunc             PodScheduleFunc
+	Client                   *client.Client
+	EtcdClient               tools.EtcdClient
+	FailoverTimeout          float64
+	ReconcileInterval        int64
+	ReconcileCooldown        time.Duration
+	DefaultContainerCPULimit mresource.CPUShares
+	DefaultContainerMemLimit mresource.MegaBytes
 }
 
 // New creates a new KubernetesScheduler
 func New(config Config) *KubernetesScheduler {
 	var k *KubernetesScheduler
 	k = &KubernetesScheduler{
-		schedcfg:          &config.Schedcfg,
-		RWMutex:           new(sync.RWMutex),
-		executor:          config.Executor,
-		executorGroup:     uid.Parse(config.Executor.ExecutorId.GetValue()).Group(),
-		scheduleFunc:      config.ScheduleFunc,
-		client:            config.Client,
-		etcdClient:        config.EtcdClient,
-		failoverTimeout:   config.FailoverTimeout,
-		reconcileInterval: config.ReconcileInterval,
+		schedcfg:                 &config.Schedcfg,
+		RWMutex:                  new(sync.RWMutex),
+		executor:                 config.Executor,
+		executorGroup:            uid.Parse(config.Executor.ExecutorId.GetValue()).Group(),
+		scheduleFunc:             config.ScheduleFunc,
+		client:                   config.Client,
+		etcdClient:               config.EtcdClient,
+		failoverTimeout:          config.FailoverTimeout,
+		reconcileInterval:        config.ReconcileInterval,
+		defaultContainerCPULimit: config.DefaultContainerCPULimit,
+		defaultContainerMemLimit: config.DefaultContainerMemLimit,
 		offers: offers.CreateRegistry(offers.RegistryConfig{
 			Compat: func(o *mesos.Offer) bool {
 				// filter the offers: the executor IDs must not identify a kubelet-
diff --git a/contrib/mesos/pkg/scheduler/service/service.go b/contrib/mesos/pkg/scheduler/service/service.go
index c9aab079493..9fc8b9e3ffa 100644
--- a/contrib/mesos/pkg/scheduler/service/service.go
+++ b/contrib/mesos/pkg/scheduler/service/service.go
@@ -18,6 +18,7 @@ package service
 
 import (
 	"bufio"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"io/ioutil"
@@ -42,8 +43,9 @@ import (
 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/ha"
 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/meta"
 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/metrics"
-	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/podtask"
+	mresource "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/resource"
 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/uid"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/clientauth"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
@@ -70,6 +72,9 @@ const (
 	defaultReconcileInterval = 300 // 5m default task reconciliation interval
 	defaultReconcileCooldown = 15 * time.Second
 	defaultFrameworkName     = "Kubernetes"
+
+	executorCPUs = mresource.CPUShares(0.25) // initial CPU allocated for executor
+	executorMem  = mresource.MegaBytes(64.0) // initial memory allocated for executor
 )
 
 type SchedulerServer struct {
@@ -95,11 +100,14 @@ type SchedulerServer struct {
 	ExecutorProxyBindall     bool
 	ExecutorLogV             int
 	ExecutorSuicideTimeout   time.Duration
+	ExecutorCgroupPrefix     string
 	MesosAuthProvider        string
 	DriverPort               uint
 	HostnameOverride         string
 	ReconcileInterval        int64
 	ReconcileCooldown        time.Duration
+	DefaultContainerCPULimit mresource.CPUShares
+	DefaultContainerMemLimit mresource.MegaBytes
 	SchedulerConfigFileName  string
 	Graceful                 bool
 	FrameworkName            string
@@ -142,6 +150,7 @@ func NewSchedulerServer() *SchedulerServer {
 		FailoverTimeout:        time.Duration((1 << 62) - 1).Seconds(),
 		ExecutorRunProxy:       true,
 		ExecutorSuicideTimeout: execcfg.DefaultSuicideTimeout,
+		ExecutorCgroupPrefix:   execcfg.DefaultCgroupPrefix,
 		MesosAuthProvider:      sasl.ProviderName,
 		MesosMaster:            defaultMesosMaster,
 		MesosUser:              defaultMesosUser,
@@ -198,12 +207,15 @@ func (s *SchedulerServer) addCoreFlags(fs *pflag.FlagSet) {
 	fs.StringVar(&s.FrameworkWebURI, "framework-weburi", s.FrameworkWebURI, "A URI that points to a web-based interface for interacting with the framework.")
 	fs.StringVar(&s.AdvertisedAddress, "advertised-address", s.AdvertisedAddress, "host:port address that is advertised to clients. May be used to construct artifact download URIs.")
 	fs.Var(&s.ServiceAddress, "service-address", "The service portal IP address that the scheduler should register with (if unset, chooses randomly)")
+	fs.Var(&s.DefaultContainerCPULimit, "default-container-cpu-limit", "CPU shares admitted for each container that has no CPU resource limit set")
+	fs.Var(&s.DefaultContainerMemLimit, "default-container-mem-limit", "Memory (in MB) admitted for each container that has no memory resource limit set")
 	fs.BoolVar(&s.ExecutorBindall, "executor-bindall", s.ExecutorBindall, "When true will set -address of the executor to 0.0.0.0.")
 	fs.IntVar(&s.ExecutorLogV, "executor-logv", s.ExecutorLogV, "Logging verbosity of spawned executor processes.")
 	fs.BoolVar(&s.ExecutorProxyBindall, "executor-proxy-bindall", s.ExecutorProxyBindall, "When true pass -proxy-bindall to the executor.")
 	fs.BoolVar(&s.ExecutorRunProxy, "executor-run-proxy", s.ExecutorRunProxy, "Run the kube-proxy as a child process of the executor.")
 	fs.DurationVar(&s.ExecutorSuicideTimeout, "executor-suicide-timeout", s.ExecutorSuicideTimeout, "Executor self-terminates after this period of inactivity. Zero disables suicide watch.")
+	fs.StringVar(&s.ExecutorCgroupPrefix, "executor-cgroup-prefix", s.ExecutorCgroupPrefix, "The cgroup prefix that, concatenated with the container ID from MESOS_DIRECTORY, gives the executor's cgroup as set by Mesos")
 
 	fs.StringVar(&s.KubeletRootDirectory, "kubelet-root-dir", s.KubeletRootDirectory, "Directory path for managing kubelet files (volume mounts,etc). Defaults to executor sandbox.")
 	fs.StringVar(&s.KubeletDockerEndpoint, "kubelet-docker-endpoint", s.KubeletDockerEndpoint, "If non-empty, kubelet will use this for the docker endpoint to communicate with.")
@@ -322,6 +334,7 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E
 		ci.Arguments = append(ci.Arguments, "--address=0.0.0.0")
 	}
 
+	ci.Arguments = append(ci.Arguments, fmt.Sprintf("--cgroup-prefix=%v", s.ExecutorCgroupPrefix))
 	ci.Arguments = append(ci.Arguments, fmt.Sprintf("--proxy-bindall=%v", s.ExecutorProxyBindall))
 	ci.Arguments = append(ci.Arguments, fmt.Sprintf("--run-proxy=%v", s.ExecutorRunProxy))
 	ci.Arguments = append(ci.Arguments, fmt.Sprintf("--cadvisor-port=%v", s.KubeletCadvisorPort))
@@ -351,38 +364,69 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E
 	log.V(1).Infof("prepared executor command %q with args '%+v'", ci.GetValue(), ci.Arguments)
 
 	// Create mesos scheduler driver.
-	info := &mesos.ExecutorInfo{
+	execInfo := &mesos.ExecutorInfo{
 		Command: ci,
 		Name:    proto.String(execcfg.DefaultInfoName),
 		Source:  proto.String(execcfg.DefaultInfoSource),
 	}
 
 	// Check for staticPods
+	var staticPodCPUs, staticPodMem float64
 	if s.StaticPodsConfigPath != "" {
-		bs, numberStaticPods, err := archive.ZipDir(s.StaticPodsConfigPath)
+		bs, paths, err := archive.ZipDir(s.StaticPodsConfigPath)
 		if err != nil {
 			return nil, nil, err
 		}
-		info.Data = bs
 
-		// Adjust the resource accounting for the executor.
-		// Currently each podTask accounts the default amount of resources.
-		// TODO(joerg84) adapt to actual resources specified by pods.
-		log.Infof("Detected %d staticPods in Configuration.", numberStaticPods)
+		// try to read pod files and sum resources
+		// TODO(sttts): don't terminate when static pods are broken, but skip them
+		// TODO(sttts): add a directory watch and tell running executors about updates
+		for _, podPath := range paths {
+			podJson, err := ioutil.ReadFile(podPath)
+			if err != nil {
+				return nil, nil, fmt.Errorf("error reading static pod spec: %v", err)
+			}
 
-		info.Resources = []*mesos.Resource{
-			mutil.NewScalarResource("cpus", float64(numberStaticPods)*podtask.DefaultContainerCpus),
-			mutil.NewScalarResource("mem", float64(numberStaticPods)*podtask.DefaultContainerMem),
+			pod := api.Pod{}
+			err = json.Unmarshal(podJson, &pod)
+			if err != nil {
+				return nil, nil, fmt.Errorf("error parsing static pod spec at %v: %v", podPath, err)
+			}
+
+			// TODO(sttts): allow unlimited static pods as well and patch in the default resource limits
+			unlimitedCPU := mresource.LimitPodCPU(&pod, s.DefaultContainerCPULimit)
+			unlimitedMem := mresource.LimitPodMem(&pod, s.DefaultContainerMemLimit)
+			if unlimitedCPU {
+				return nil, nil, fmt.Errorf("found static pod without limit on cpu resources: %v", podPath)
+			}
+			if unlimitedMem {
+				return nil, nil, fmt.Errorf("found static pod without limit on memory resources: %v", podPath)
+			}
+
+			cpu := mresource.PodCPULimit(&pod)
+			mem := mresource.PodMemLimit(&pod)
+			log.V(2).Infof("reserving %.2f cpu shares and %.2f MB of memory for static pod %s", cpu, mem, pod.Name)
+
+			staticPodCPUs += float64(cpu)
+			staticPodMem += float64(mem)
 		}
+
+		// pass zipped pod spec to executor
+		execInfo.Data = bs
+	}
+
+	execInfo.Resources = []*mesos.Resource{
+		mutil.NewScalarResource("cpus", float64(executorCPUs)+staticPodCPUs),
+		mutil.NewScalarResource("mem", float64(executorMem)+staticPodMem),
 	}
 
 	// calculate ExecutorInfo hash to be used for validating compatibility
 	// of ExecutorInfo's generated by other HA schedulers.
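Back-of-the-envelope view of the executor resource accounting above: the executor reserves its own base allowance plus the summed limits of all static pods it will run (the two static pod figures below are invented):

package main

import "fmt"

func main() {
	executorCPUs, executorMem := 0.25, 64.0 // base executor allowance, per the constants in this PR

	staticPods := []struct{ cpu, mem float64 }{
		{0.25, 64.0},  // e.g. a logging sidecar
		{0.50, 128.0}, // e.g. a monitoring agent
	}

	cpus, mem := executorCPUs, executorMem
	for _, p := range staticPods {
		cpus += p.cpu
		mem += p.mem
	}
	fmt.Printf("cpus: %.2f, mem: %.2f MB\n", cpus, mem) // cpus: 1.00, mem: 256.00 MB
}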
-	ehash := hashExecutorInfo(info)
+	ehash := hashExecutorInfo(execInfo)
 	eid := uid.New(ehash, execcfg.DefaultInfoID)
-	info.ExecutorId = &mesos.ExecutorID{Value: proto.String(eid.String())}
+	execInfo.ExecutorId = &mesos.ExecutorID{Value: proto.String(eid.String())}
 
-	return info, eid, nil
+	return execInfo, eid, nil
 }
 
 // TODO(jdef): hacked from kubelet/server/server.go
@@ -580,14 +624,16 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
 	}
 
 	mesosPodScheduler := scheduler.New(scheduler.Config{
-		Schedcfg:          *sc,
-		Executor:          executor,
-		ScheduleFunc:      scheduler.FCFSScheduleFunc,
-		Client:            client,
-		EtcdClient:        etcdClient,
-		FailoverTimeout:   s.FailoverTimeout,
-		ReconcileInterval: s.ReconcileInterval,
-		ReconcileCooldown: s.ReconcileCooldown,
+		Schedcfg:                 *sc,
+		Executor:                 executor,
+		ScheduleFunc:             scheduler.FCFSScheduleFunc,
+		Client:                   client,
+		EtcdClient:               etcdClient,
+		FailoverTimeout:          s.FailoverTimeout,
+		ReconcileInterval:        s.ReconcileInterval,
+		ReconcileCooldown:        s.ReconcileCooldown,
+		DefaultContainerCPULimit: s.DefaultContainerCPULimit,
+		DefaultContainerMemLimit: s.DefaultContainerMemLimit,
 	})
 
 	masterUri := s.MesosMaster
diff --git a/contrib/mesos/pkg/scheduler/service/service_test.go b/contrib/mesos/pkg/scheduler/service/service_test.go
index 7f4ff2656c1..e329e893341 100644
--- a/contrib/mesos/pkg/scheduler/service/service_test.go
+++ b/contrib/mesos/pkg/scheduler/service/service_test.go
@@ -137,9 +137,9 @@ func Test_StaticPods(t *testing.T) {
 	assert.NoError(err)
 
 	// archive config files
-	data, fileNum, err := archive.ZipDir(staticPodsConfigPath)
+	data, paths, err := archive.ZipDir(staticPodsConfigPath)
 	assert.NoError(err)
-	assert.Equal(2, fileNum)
+	assert.Equal(2, len(paths))
 
 	// unarchive config files
 	zr, err := zip.NewReader(bytes.NewReader(data), int64(len(data)))