Flexible resource accounting and pod resource containment:

- new: introduce AllocationStrategy, Predicate, and Procurement to scheduler pkg
- new: --contain-pod-resources flag (workaround for docker+systemd+mesos problems)
- new: --account-for-pod-resources flag (for testing overcommitment)
- bugfix: forward -v flag from minion controller to executor
James DeFelice 2015-08-31 23:06:59 +00:00
parent c28b68d254
commit a1cea8dd87
14 changed files with 517 additions and 233 deletions
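For illustration, a minimal, self-contained sketch (not part of this diff) of what the new --contain-pod-resources flag controls in the minion server; cgroupRootFor and the example cgroup path are hypothetical stand-ins for the real MinionServer fields:

package main

import (
	"fmt"
	"path"
)

// cgroupRootFor mirrors the decision added to MinionServer.Run below: pods are
// reparented under the discovered mesos cgroup only when containment is enabled;
// otherwise the cgroup root stays empty and docker's cgroup-parent flag is not used.
func cgroupRootFor(mesosCgroup string, containPodResources bool) string {
	if containPodResources {
		return mesosCgroup
	}
	return ""
}

func main() {
	mesosCgroup := "/mesos/0123-example-container-id" // normally discovered at runtime
	fmt.Println("--cgroup-root=" + cgroupRootFor(mesosCgroup, true))
	// The kubelet and kube-proxy resource containers always live under the
	// discovered mesos cgroup, independent of the flag:
	fmt.Println("--resource-container=" + path.Join("/", mesosCgroup, "kubelet"))
}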

View File

@ -55,13 +55,16 @@ type MinionServer struct {
kmBinary string
tasks []*tasks.Task
pathOverride string // the PATH environment for the sub-processes
cgroupPrefix string // e.g. mesos
cgroupRoot string // e.g. /mesos/{container-id}, determined at runtime
pathOverride string // the PATH environment for the sub-processes
cgroupPrefix string // e.g. mesos
cgroupRoot string // the cgroupRoot that we pass to the kubelet-executor, depends on containPodResources
mesosCgroup string // discovered mesos cgroup root, e.g. /mesos/{container-id}
containPodResources bool
logMaxSize resource.Quantity
logMaxBackups int
logMaxAgeInDays int
logVerbosity int32 // see glog.Level
runProxy bool
proxyLogV int
@ -74,6 +77,7 @@ func NewMinionServer() *MinionServer {
KubeletExecutorServer: exservice.NewKubeletExecutorServer(),
privateMountNS: false, // disabled until Docker supports customization of the parent mount namespace
cgroupPrefix: config.DefaultCgroupPrefix,
containPodResources: true,
logMaxSize: config.DefaultLogMaxSize(),
logMaxBackups: config.DefaultLogMaxBackups,
logMaxAgeInDays: config.DefaultLogMaxAgeInDays,
@ -131,7 +135,7 @@ func (ms *MinionServer) launchProxyServer() {
fmt.Sprintf("--bind-address=%s", bindAddress),
fmt.Sprintf("--v=%d", ms.proxyLogV),
"--logtostderr=true",
"--resource-container=" + path.Join("/", ms.cgroupRoot, "kube-proxy"),
"--resource-container=" + path.Join("/", ms.mesosCgroup, "kube-proxy"),
}
if ms.clientConfig.Host != "" {
@ -156,7 +160,7 @@ func (ms *MinionServer) launchExecutorServer() <-chan struct{} {
ms.AddExecutorFlags(executorFlags)
executorArgs, _ := filterArgsByFlagSet(allArgs, executorFlags)
executorArgs = append(executorArgs, "--resource-container="+path.Join("/", ms.cgroupRoot, "kubelet"))
executorArgs = append(executorArgs, "--resource-container="+path.Join("/", ms.mesosCgroup, "kubelet"))
if ms.cgroupRoot != "" {
executorArgs = append(executorArgs, "--cgroup-root="+ms.cgroupRoot)
}
@ -241,14 +245,23 @@ func (ms *MinionServer) Run(hks hyperkube.Interface, _ []string) error {
ms.clientConfig = clientConfig
// derive the executor cgroup and use it as:
// - pod container cgroup root (e.g. docker cgroup-parent)
// - pod container cgroup root (e.g. docker cgroup-parent, optionally; see comments below)
// - parent of kubelet container
// - parent of kube-proxy container
ms.cgroupRoot = findMesosCgroup(ms.cgroupPrefix)
ms.mesosCgroup = findMesosCgroup(ms.cgroupPrefix)
log.Infof("discovered mesos cgroup at %q", ms.mesosCgroup)
// hack alert, this helps to work around systemd+docker+mesos integration problems
// when docker's cgroup-parent flag is used (!containPodResources = don't use the docker flag)
if ms.containPodResources {
ms.cgroupRoot = ms.mesosCgroup
}
cgroupLogger := log.Infof
if ms.cgroupRoot == "" {
cgroupLogger = log.Warningf
}
cgroupLogger("using cgroup-root %q", ms.cgroupRoot)
// run subprocesses until ms.done is closed on return of this function
@ -302,6 +315,9 @@ func termSignalListener(abort <-chan struct{}) <-chan struct{} {
func (ms *MinionServer) AddExecutorFlags(fs *pflag.FlagSet) {
ms.KubeletExecutorServer.AddFlags(fs)
// hack to forward log verbosity flag to the executor
fs.Int32Var(&ms.logVerbosity, "v", ms.logVerbosity, "log level for V logs")
}
func (ms *MinionServer) AddMinionFlags(fs *pflag.FlagSet) {
@ -309,6 +325,7 @@ func (ms *MinionServer) AddMinionFlags(fs *pflag.FlagSet) {
fs.StringVar(&ms.cgroupPrefix, "mesos-cgroup-prefix", ms.cgroupPrefix, "The cgroup prefix concatenated with MESOS_DIRECTORY must give the executor cgroup set by Mesos")
fs.BoolVar(&ms.privateMountNS, "private-mountns", ms.privateMountNS, "Enter a private mount NS before spawning procs (linux only). Experimental, not yet compatible with k8s volumes.")
fs.StringVar(&ms.pathOverride, "path-override", ms.pathOverride, "Override the PATH in the environment of the sub-processes.")
fs.BoolVar(&ms.containPodResources, "contain-pod-resources", ms.containPodResources, "Allocate pod CPU and memory resources from offers and reparent pod containers into mesos cgroups; disable if you're having strange mesos/docker/systemd interactions.")
// log file flags
fs.Var(resource.NewQuantityFlagValue(&ms.logMaxSize), "max-log-size", "Maximum log file size for the executor and proxy before rotation")

View File

@ -24,8 +24,42 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
)
type allocationStrategy struct {
fitPredicate podtask.FitPredicate
procurement podtask.Procurement
}
func (a *allocationStrategy) FitPredicate() podtask.FitPredicate {
return a.fitPredicate
}
func (a *allocationStrategy) Procurement() podtask.Procurement {
return a.procurement
}
func NewAllocationStrategy(fitPredicate podtask.FitPredicate, procurement podtask.Procurement) AllocationStrategy {
if fitPredicate == nil {
panic("fitPredicate is required")
}
if procurement == nil {
panic("procurement is required")
}
return &allocationStrategy{
fitPredicate: fitPredicate,
procurement: procurement,
}
}
type fcfsPodScheduler struct {
AllocationStrategy
}
func NewFCFSPodScheduler(as AllocationStrategy) PodScheduler {
return &fcfsPodScheduler{as}
}
// A first-come-first-serve scheduler: acquires the first offer that can support the task
func FCFSScheduleFunc(r offers.Registry, unused SlaveIndex, task *podtask.T) (offers.Perishable, error) {
func (fps *fcfsPodScheduler) SchedulePod(r offers.Registry, unused SlaveIndex, task *podtask.T) (offers.Perishable, error) {
podName := fmt.Sprintf("%s/%s", task.Pod.Namespace, task.Pod.Name)
var acceptedOffer offers.Perishable
err := r.Walk(func(p offers.Perishable) (bool, error) {
@ -33,7 +67,7 @@ func FCFSScheduleFunc(r offers.Registry, unused SlaveIndex, task *podtask.T) (of
if offer == nil {
return false, fmt.Errorf("nil offer while scheduling task %v", task.ID)
}
if task.AcceptOffer(offer) {
if fps.FitPredicate()(task, offer) {
if p.Acquire() {
acceptedOffer = p
log.V(3).Infof("Pod %s accepted offer %v", podName, offer.Id.GetValue())

View File

@ -42,11 +42,11 @@ func (m *MockScheduler) slaveFor(id string) (slave *Slave, ok bool) {
ok = args.Bool(1)
return
}
func (m *MockScheduler) algorithm() (f PodScheduleFunc) {
func (m *MockScheduler) algorithm() (f PodScheduler) {
args := m.Called()
x := args.Get(0)
if x != nil {
f = x.(PodScheduleFunc)
f = x.(PodScheduler)
}
return
}

View File

@ -33,7 +33,6 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
annotation "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
client "k8s.io/kubernetes/pkg/client/unversioned"
@ -56,8 +55,9 @@ const (
// scheduler abstraction to allow for easier unit testing
type schedulerInterface interface {
sync.Locker // synchronize scheduler plugin operations
SlaveIndex
algorithm() PodScheduleFunc // see types.go
algorithm() PodScheduler
offers() offers.Registry
tasks() podtask.Registry
@ -76,8 +76,8 @@ type k8smScheduler struct {
internal *KubernetesScheduler
}
func (k *k8smScheduler) algorithm() PodScheduleFunc {
return k.internal.scheduleFunc
func (k *k8smScheduler) algorithm() PodScheduler {
return k.internal
}
func (k *k8smScheduler) offers() offers.Registry {
@ -231,10 +231,8 @@ func (b *binder) prepareTaskForLaunch(ctx api.Context, machine string, task *pod
}
type kubeScheduler struct {
api schedulerInterface
podUpdates queue.FIFO
defaultContainerCPULimit mresource.CPUShares
defaultContainerMemLimit mresource.MegaBytes
api schedulerInterface
podUpdates queue.FIFO
}
// recoverAssignedSlave recovers the assigned Mesos slave from a pod by searching
@ -318,7 +316,7 @@ func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) {
}
}
if err == nil && offer == nil {
offer, err = k.api.algorithm()(k.api.offers(), k.api, task)
offer, err = k.api.algorithm().SchedulePod(k.api.offers(), k.api, task)
}
if err != nil {
return "", err
@ -338,18 +336,8 @@ func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) {
return "", fmt.Errorf("task.offer assignment must be idempotent, task %+v: offer %+v", task, offer)
}
// write resource limits into the pod spec which is transferred to the executor. From here
// on we can expect that the pod spec of a task has proper limits for CPU and memory.
// TODO(sttts): For a later separation of the kubelet and the executor also patch the pod on the apiserver
if unlimitedCPU := mresource.LimitPodCPU(&task.Pod, k.defaultContainerCPULimit); unlimitedCPU {
log.Warningf("Pod %s/%s without cpu limits is admitted %.2f cpu shares", task.Pod.Namespace, task.Pod.Name, mresource.PodCPULimit(&task.Pod))
}
if unlimitedMem := mresource.LimitPodMem(&task.Pod, k.defaultContainerMemLimit); unlimitedMem {
log.Warningf("Pod %s/%s without memory limits is admitted %.2f MB", task.Pod.Namespace, task.Pod.Name, mresource.PodMemLimit(&task.Pod))
}
task.Offer = offer
task.FillFromDetails(details)
k.api.algorithm().Procurement()(task, details) // TODO(jdef) why is nothing checking the error returned here?
if err := k.api.tasks().Update(task); err != nil {
offer.Release()
@ -556,7 +544,7 @@ func (k *errorHandler) handleSchedulingError(pod *api.Pod, schedulingErr error)
defer k.api.Unlock()
switch task, state := k.api.tasks().Get(task.ID); state {
case podtask.StatePending:
return !task.Has(podtask.Launched) && task.AcceptOffer(offer)
return !task.Has(podtask.Launched) && k.api.algorithm().FitPredicate()(task, offer)
default:
// no point in continuing to check for matching offers
return true
@ -698,10 +686,8 @@ func (k *KubernetesScheduler) NewPluginConfig(terminate <-chan struct{}, mux *ht
Config: &plugin.Config{
MinionLister: nil,
Algorithm: &kubeScheduler{
api: kapi,
podUpdates: podUpdates,
defaultContainerCPULimit: k.defaultContainerCPULimit,
defaultContainerMemLimit: k.defaultContainerMemLimit,
api: kapi,
podUpdates: podUpdates,
},
Binder: &binder{api: kapi},
NextPod: q.yield,
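On the TODO(jdef) above about the unchecked Procurement error, one possible shape for surfacing it, a sketch only and not part of this commit:

package example

import (
	mesos "github.com/mesos/mesos-go/mesosproto"

	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
)

// procureOrReset returns the Procurement error instead of dropping it, so the
// caller can release the offer and reschedule. AllOrNothingProcurement already
// resets the task on failure; the extra Reset is harmless for other implementations.
func procureOrReset(procure podtask.Procurement, task *podtask.T, details *mesos.Offer) error {
	if err := procure(task, details); err != nil {
		task.Reset()
		return err
	}
	return nil
}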

View File

@ -393,13 +393,14 @@ func TestPlugin_LifeCycle(t *testing.T) {
executor.Data = []byte{0, 1, 2}
// create scheduler
as := NewAllocationStrategy(
podtask.DefaultPredicate,
podtask.NewDefaultProcurement(mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerMemLimit))
testScheduler := New(Config{
Executor: executor,
Client: client.NewOrDie(&client.Config{Host: testApiServer.server.URL, Version: testapi.Version()}),
ScheduleFunc: FCFSScheduleFunc,
Schedcfg: *schedcfg.CreateDefaultConfig(),
DefaultContainerCPULimit: mresource.DefaultDefaultContainerCPULimit,
DefaultContainerMemLimit: mresource.DefaultDefaultContainerMemLimit,
Executor: executor,
Client: client.NewOrDie(&client.Config{Host: testApiServer.server.URL, Version: testapi.Version()}),
Scheduler: NewFCFSPodScheduler(as),
Schedcfg: *schedcfg.CreateDefaultConfig(),
})
assert.NotNil(testScheduler.client, "client is nil")

View File

@ -0,0 +1,73 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podtask
import (
log "github.com/golang/glog"
mesos "github.com/mesos/mesos-go/mesosproto"
)
// bogus numbers that we use to make sure that there's some set of minimal offered resources on the slave
const (
minimalCpus = 0.01
minimalMem = 0.25
)
var (
DefaultMinimalPredicate = RequireAllPredicate([]FitPredicate{
ValidationPredicate,
NodeSelectorPredicate,
MinimalPodResourcesPredicate,
PortsPredicate,
}).Fit
DefaultMinimalProcurement = AllOrNothingProcurement([]Procurement{
ValidateProcurement,
NodeProcurement,
MinimalPodResourcesProcurement,
PortsProcurement,
}).Procure
)
func MinimalPodResourcesPredicate(t *T, offer *mesos.Offer) bool {
var (
offeredCpus float64
offeredMem float64
)
for _, resource := range offer.Resources {
if resource.GetName() == "cpus" {
offeredCpus = resource.GetScalar().GetValue()
}
if resource.GetName() == "mem" {
offeredMem = resource.GetScalar().GetValue()
}
}
log.V(4).Infof("trying to match offer with pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, minimalCpus, minimalMem)
if (minimalCpus > offeredCpus) || (minimalMem > offeredMem) {
log.V(3).Infof("not enough resources for pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, minimalCpus, minimalMem)
return false
}
return true
}
func MinimalPodResourcesProcurement(t *T, details *mesos.Offer) error {
log.V(3).Infof("Recording offer(s) %s/%s against pod %v: cpu: %.2f, mem: %.2f MB", details.Id, t.Pod.Namespace, t.Pod.Name, minimalCpus, minimalMem)
t.Spec.CPU = minimalCpus
t.Spec.Memory = minimalMem
return nil
}

View File

@ -18,7 +18,6 @@ package podtask
import (
"fmt"
"strings"
"time"
"github.com/gogo/protobuf/proto"
@ -28,7 +27,6 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/labels"
log "github.com/golang/glog"
mesos "github.com/mesos/mesos-go/mesosproto"
@ -150,59 +148,6 @@ func (t *T) BuildTaskInfo() *mesos.TaskInfo {
return info
}
// Fill the Spec in the T, should be called during k8s scheduling, before binding.
func (t *T) FillFromDetails(details *mesos.Offer) error {
if details == nil {
//programming error
panic("offer details are nil")
}
// compute used resources
cpu := mresource.PodCPULimit(&t.Pod)
mem := mresource.PodMemLimit(&t.Pod)
log.V(3).Infof("Recording offer(s) %s/%s against pod %v: cpu: %.2f, mem: %.2f MB", details.Id, t.Pod.Namespace, t.Pod.Name, cpu, mem)
t.Spec = Spec{
SlaveID: details.GetSlaveId().GetValue(),
AssignedSlave: details.GetHostname(),
CPU: cpu,
Memory: mem,
}
// fill in port mapping
if mapping, err := t.mapper.Generate(t, details); err != nil {
t.Reset()
return err
} else {
ports := []uint64{}
for _, entry := range mapping {
ports = append(ports, entry.OfferPort)
}
t.Spec.PortMap = mapping
t.Spec.Ports = ports
}
// the hostname of the executor needs to match that of the offer, otherwise
// the kubelet node status checker/updater is very unhappy
const HOSTNAME_OVERRIDE_FLAG = "--hostname-override="
hostname := details.GetHostname() // required field, non-empty
hostnameOverride := HOSTNAME_OVERRIDE_FLAG + hostname
argv := t.executor.Command.Arguments
overwrite := false
for i, arg := range argv {
if strings.HasPrefix(arg, HOSTNAME_OVERRIDE_FLAG) {
overwrite = true
argv[i] = hostnameOverride
break
}
}
if !overwrite {
t.executor.Command.Arguments = append(argv, hostnameOverride)
}
return nil
}
// Clear offer-related details from the task, should be called if/when an offer
// has already been assigned to a task but for some reason is no longer valid.
func (t *T) Reset() {
@ -211,65 +156,6 @@ func (t *T) Reset() {
t.Spec = Spec{}
}
func (t *T) AcceptOffer(offer *mesos.Offer) bool {
if offer == nil {
return false
}
// if the user has specified a target host, make sure this offer is for that host
if t.Pod.Spec.NodeName != "" && offer.GetHostname() != t.Pod.Spec.NodeName {
return false
}
// check the NodeSelector
if len(t.Pod.Spec.NodeSelector) > 0 {
slaveLabels := map[string]string{}
for _, a := range offer.Attributes {
if a.GetType() == mesos.Value_TEXT {
slaveLabels[a.GetName()] = a.GetText().GetValue()
}
}
selector := labels.SelectorFromSet(t.Pod.Spec.NodeSelector)
if !selector.Matches(labels.Set(slaveLabels)) {
return false
}
}
// check ports
if _, err := t.mapper.Generate(t, offer); err != nil {
log.V(3).Info(err)
return false
}
// find offered cpu and mem
var (
offeredCpus mresource.CPUShares
offeredMem mresource.MegaBytes
)
for _, resource := range offer.Resources {
if resource.GetName() == "cpus" {
offeredCpus = mresource.CPUShares(*resource.GetScalar().Value)
}
if resource.GetName() == "mem" {
offeredMem = mresource.MegaBytes(*resource.GetScalar().Value)
}
}
// calculate cpu and mem sum over all containers of the pod
// TODO (@sttts): also support pod.spec.resources.limit.request
// TODO (@sttts): take into account the executor resources
cpu := mresource.PodCPULimit(&t.Pod)
mem := mresource.PodMemLimit(&t.Pod)
log.V(4).Infof("trying to match offer with pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem)
if (cpu > offeredCpus) || (mem > offeredMem) {
log.V(3).Infof("not enough resources for pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem)
return false
}
return true
}
func (t *T) Set(f FlagType) {
t.Flags[f] = struct{}{}
if Launched == f {
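With AcceptOffer and FillFromDetails removed, their callers map onto the new split roughly as below (a sketch; fitAndFill is a hypothetical helper, not an API introduced by this commit):

package example

import (
	"fmt"

	mesos "github.com/mesos/mesos-go/mesosproto"

	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
	mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
)

// fitAndFill replaces the old task.AcceptOffer + task.FillFromDetails pair:
// DefaultPredicate performs the fitness checks, NewDefaultProcurement fills
// t.Spec (slave, cpu, mem, ports) from the offer.
func fitAndFill(task *podtask.T, offer *mesos.Offer) error {
	if !podtask.DefaultPredicate(task, offer) {
		return fmt.Errorf("offer %v does not fit task %v", offer.GetId().GetValue(), task.ID)
	}
	procure := podtask.NewDefaultProcurement(
		mresource.DefaultDefaultContainerCPULimit,
		mresource.DefaultDefaultContainerMemLimit)
	return procure(task, offer)
}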

View File

@ -146,10 +146,10 @@ func TestEmptyOffer(t *testing.T) {
mresource.LimitPodCPU(&task.Pod, mresource.DefaultDefaultContainerCPULimit)
mresource.LimitPodMem(&task.Pod, mresource.DefaultDefaultContainerMemLimit)
if ok := task.AcceptOffer(nil); ok {
if ok := DefaultPredicate(task, nil); ok {
t.Fatalf("accepted nil offer")
}
if ok := task.AcceptOffer(&mesos.Offer{}); ok {
if ok := DefaultPredicate(task, &mesos.Offer{}); ok {
t.Fatalf("accepted empty offer")
}
}
@ -176,7 +176,7 @@ func TestNoPortsInPodOrOffer(t *testing.T) {
mutil.NewScalarResource("mem", 0.001),
},
}
if ok := task.AcceptOffer(offer); ok {
if ok := DefaultPredicate(task, offer); ok {
t.Fatalf("accepted offer %v:", offer)
}
@ -186,7 +186,7 @@ func TestNoPortsInPodOrOffer(t *testing.T) {
mutil.NewScalarResource("mem", t_min_mem),
},
}
if ok := task.AcceptOffer(offer); !ok {
if ok := DefaultPredicate(task, offer); !ok {
t.Fatalf("did not accepted offer %v:", offer)
}
}
@ -203,7 +203,7 @@ func TestAcceptOfferPorts(t *testing.T) {
rangeResource("ports", []uint64{1, 1}),
},
}
if ok := task.AcceptOffer(offer); !ok {
if ok := DefaultPredicate(task, offer); !ok {
t.Fatalf("did not accepted offer %v:", offer)
}
@ -218,17 +218,17 @@ func TestAcceptOfferPorts(t *testing.T) {
mresource.LimitPodCPU(&task.Pod, mresource.DefaultDefaultContainerCPULimit)
mresource.LimitPodMem(&task.Pod, mresource.DefaultDefaultContainerMemLimit)
if ok := task.AcceptOffer(offer); ok {
if ok := DefaultPredicate(task, offer); ok {
t.Fatalf("accepted offer %v:", offer)
}
pod.Spec.Containers[0].Ports[0].HostPort = 1
if ok := task.AcceptOffer(offer); !ok {
if ok := DefaultPredicate(task, offer); !ok {
t.Fatalf("did not accepted offer %v:", offer)
}
pod.Spec.Containers[0].Ports[0].HostPort = 0
if ok := task.AcceptOffer(offer); !ok {
if ok := DefaultPredicate(task, offer); !ok {
t.Fatalf("did not accepted offer %v:", offer)
}
@ -236,12 +236,12 @@ func TestAcceptOfferPorts(t *testing.T) {
mutil.NewScalarResource("cpus", t_min_cpu),
mutil.NewScalarResource("mem", t_min_mem),
}
if ok := task.AcceptOffer(offer); ok {
if ok := DefaultPredicate(task, offer); ok {
t.Fatalf("accepted offer %v:", offer)
}
pod.Spec.Containers[0].Ports[0].HostPort = 1
if ok := task.AcceptOffer(offer); ok {
if ok := DefaultPredicate(task, offer); ok {
t.Fatalf("accepted offer %v:", offer)
}
}
@ -297,7 +297,7 @@ func TestNodeSelector(t *testing.T) {
},
Attributes: ts.attrs,
}
if got, want := task.AcceptOffer(offer), ts.ok; got != want {
if got, want := DefaultPredicate(task, offer), ts.ok; got != want {
t.Fatalf("expected acceptance of offer %v for selector %v to be %v, got %v:", want, got, ts.attrs, ts.selector)
}
}

View File

@ -0,0 +1,110 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podtask
import (
log "github.com/golang/glog"
mesos "github.com/mesos/mesos-go/mesosproto"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/pkg/labels"
)
var DefaultPredicate = RequireAllPredicate([]FitPredicate{
ValidationPredicate,
NodeSelectorPredicate,
PodFitsResourcesPredicate,
PortsPredicate,
}).Fit
// FitPredicate implementations determine if the given task "fits" into offered Mesos resources.
// Neither the task nor the offer should be modified.
type FitPredicate func(*T, *mesos.Offer) bool
type RequireAllPredicate []FitPredicate
func (f RequireAllPredicate) Fit(t *T, offer *mesos.Offer) bool {
for _, p := range f {
if !p(t, offer) {
return false
}
}
return true
}
func ValidationPredicate(t *T, offer *mesos.Offer) bool {
return t != nil && offer != nil
}
func NodeSelectorPredicate(t *T, offer *mesos.Offer) bool {
// if the user has specified a target host, make sure this offer is for that host
if t.Pod.Spec.NodeName != "" && offer.GetHostname() != t.Pod.Spec.NodeName {
return false
}
// check the NodeSelector
if len(t.Pod.Spec.NodeSelector) > 0 {
slaveLabels := map[string]string{}
for _, a := range offer.Attributes {
if a.GetType() == mesos.Value_TEXT {
slaveLabels[a.GetName()] = a.GetText().GetValue()
}
}
selector := labels.SelectorFromSet(t.Pod.Spec.NodeSelector)
if !selector.Matches(labels.Set(slaveLabels)) {
return false
}
}
return true
}
func PortsPredicate(t *T, offer *mesos.Offer) bool {
// check ports
if _, err := t.mapper.Generate(t, offer); err != nil {
log.V(3).Info(err)
return false
}
return true
}
func PodFitsResourcesPredicate(t *T, offer *mesos.Offer) bool {
// find offered cpu and mem
var (
offeredCpus mresource.CPUShares
offeredMem mresource.MegaBytes
)
for _, resource := range offer.Resources {
if resource.GetName() == "cpus" {
offeredCpus = mresource.CPUShares(*resource.GetScalar().Value)
}
if resource.GetName() == "mem" {
offeredMem = mresource.MegaBytes(*resource.GetScalar().Value)
}
}
// calculate cpu and mem sum over all containers of the pod
// TODO (@sttts): also support pod.spec.resources.limit.request
// TODO (@sttts): take into account the executor resources
cpu := mresource.PodCPULimit(&t.Pod)
mem := mresource.PodMemLimit(&t.Pod)
log.V(4).Infof("trying to match offer with pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem)
if (cpu > offeredCpus) || (mem > offeredMem) {
log.V(3).Infof("not enough resources for pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem)
return false
}
return true
}
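Because FitPredicates are plain funcs, custom chains compose the exported pieces above; the combination below is purely illustrative:

package example

import (
	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
)

// A lenient predicate chain that skips the resource check but keeps
// validation, node selection and port matching.
var lenientPredicate = podtask.RequireAllPredicate([]podtask.FitPredicate{
	podtask.ValidationPredicate,
	podtask.NodeSelectorPredicate,
	podtask.PortsPredicate,
}).Fit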

View File

@ -0,0 +1,152 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podtask
import (
"strings"
log "github.com/golang/glog"
mesos "github.com/mesos/mesos-go/mesosproto"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
)
// NewDefaultProcurement returns the default procurement strategy that combines validation
// and responsible Mesos resource procurement. c and m are resource quantities written into
// k8s api.Pod.Specs that don't declare resources (all containers in k8s-mesos require cpu
// and memory limits).
func NewDefaultProcurement(c mresource.CPUShares, m mresource.MegaBytes) Procurement {
requireSome := &RequireSomePodResources{
defaultContainerCPULimit: c,
defaultContainerMemLimit: m,
}
return AllOrNothingProcurement([]Procurement{
ValidateProcurement,
NodeProcurement,
requireSome.Procure,
PodResourcesProcurement,
PortsProcurement,
}).Procure
}
// Procurement funcs allocate resources for a task from an offer.
// The task and/or the offer may be modified.
type Procurement func(*T, *mesos.Offer) error
// AllOrNothingProcurement provides a convenient wrapper around multiple Procurement
// objectives: the failure of any Procurement in the set results in Procure failing.
// see AllOrNothingProcurement.Procure
type AllOrNothingProcurement []Procurement
// Procure runs each Procurement in the receiver list. The first Procurement func that
// fails triggers T.Reset() and the error is returned, otherwise returns nil.
func (a AllOrNothingProcurement) Procure(t *T, offer *mesos.Offer) error {
for _, p := range a {
if err := p(t, offer); err != nil {
t.Reset()
return err
}
}
return nil
}
// ValidateProcurement checks that the offered resources are kosher, and if not panics.
// If things check out ok, t.Spec is cleared and nil is returned.
func ValidateProcurement(t *T, offer *mesos.Offer) error {
if offer == nil {
//programming error
panic("offer details are nil")
}
t.Spec = Spec{}
return nil
}
// NodeProcurement updates t.Spec in preparation for the task to be launched on the
// slave associated with the offer.
func NodeProcurement(t *T, offer *mesos.Offer) error {
t.Spec.SlaveID = offer.GetSlaveId().GetValue()
t.Spec.AssignedSlave = offer.GetHostname()
// the hostname of the executor needs to match that of the offer, otherwise
// the kubelet node status checker/updater is very unhappy
const HOSTNAME_OVERRIDE_FLAG = "--hostname-override="
hostname := offer.GetHostname() // required field, non-empty
hostnameOverride := HOSTNAME_OVERRIDE_FLAG + hostname
argv := t.executor.Command.Arguments
overwrite := false
for i, arg := range argv {
if strings.HasPrefix(arg, HOSTNAME_OVERRIDE_FLAG) {
overwrite = true
argv[i] = hostnameOverride
break
}
}
if !overwrite {
t.executor.Command.Arguments = append(argv, hostnameOverride)
}
return nil
}
type RequireSomePodResources struct {
defaultContainerCPULimit mresource.CPUShares
defaultContainerMemLimit mresource.MegaBytes
}
func (r *RequireSomePodResources) Procure(t *T, offer *mesos.Offer) error {
// write resource limits into the pod spec which is transferred to the executor. From here
// on we can expect that the pod spec of a task has proper limits for CPU and memory.
// TODO(sttts): For a later separation of the kubelet and the executor also patch the pod on the apiserver
// TODO(jdef): changing the state of t.Pod here feels dirty, especially since we don't use a kosher
// method to clone the api.Pod state in T.Clone(). This needs some love.
if unlimitedCPU := mresource.LimitPodCPU(&t.Pod, r.defaultContainerCPULimit); unlimitedCPU {
log.Warningf("Pod %s/%s without cpu limits is admitted %.2f cpu shares", t.Pod.Namespace, t.Pod.Name, mresource.PodCPULimit(&t.Pod))
}
if unlimitedMem := mresource.LimitPodMem(&t.Pod, r.defaultContainerMemLimit); unlimitedMem {
log.Warningf("Pod %s/%s without memory limits is admitted %.2f MB", t.Pod.Namespace, t.Pod.Name, mresource.PodMemLimit(&t.Pod))
}
return nil
}
// PodResourcesProcurement converts k8s pod cpu and memory resource requirements into
// mesos resource allocations.
func PodResourcesProcurement(t *T, offer *mesos.Offer) error {
// compute used resources
cpu := mresource.PodCPULimit(&t.Pod)
mem := mresource.PodMemLimit(&t.Pod)
log.V(3).Infof("Recording offer(s) %s/%s against pod %v: cpu: %.2f, mem: %.2f MB", offer.Id, t.Pod.Namespace, t.Pod.Name, cpu, mem)
t.Spec.CPU = cpu
t.Spec.Memory = mem
return nil
}
// PortsProcurement converts host port mappings into mesos port resource allocations.
func PortsProcurement(t *T, offer *mesos.Offer) error {
// fill in port mapping
if mapping, err := t.mapper.Generate(t, offer); err != nil {
return err
} else {
ports := []uint64{}
for _, entry := range mapping {
ports = append(ports, entry.OfferPort)
}
t.Spec.PortMap = mapping
t.Spec.Ports = ports
}
return nil
}
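Procurements compose the same way; a sketch of a custom all-or-nothing chain built from the exported steps above (again purely illustrative):

package example

import (
	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
)

// A chain that skips the pod resource bookkeeping but keeps validation, node
// assignment and port allocation; if any step fails, the all-or-nothing
// wrapper resets the task before returning the error.
var portsOnlyProcurement = podtask.AllOrNothingProcurement([]podtask.Procurement{
	podtask.ValidateProcurement,
	podtask.NodeProcurement,
	podtask.PortsProcurement,
}).Procure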

View File

@ -39,7 +39,6 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/uid"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
@ -118,19 +117,17 @@ type KubernetesScheduler struct {
// and the invoking the pod registry interfaces.
// In particular, changes to podtask.T objects are currently guarded by this lock.
*sync.RWMutex
PodScheduler
// Config related, write-once
schedcfg *schedcfg.Config
executor *mesos.ExecutorInfo
executorGroup uint64
scheduleFunc PodScheduleFunc
client *client.Client
etcdClient tools.EtcdClient
failoverTimeout float64 // in seconds
reconcileInterval int64
defaultContainerCPULimit mresource.CPUShares
defaultContainerMemLimit mresource.MegaBytes
schedcfg *schedcfg.Config
executor *mesos.ExecutorInfo
executorGroup uint64
client *client.Client
etcdClient tools.EtcdClient
failoverTimeout float64 // in seconds
reconcileInterval int64
// Mesos context.
@ -157,33 +154,29 @@ type KubernetesScheduler struct {
}
type Config struct {
Schedcfg schedcfg.Config
Executor *mesos.ExecutorInfo
ScheduleFunc PodScheduleFunc
Client *client.Client
EtcdClient tools.EtcdClient
FailoverTimeout float64
ReconcileInterval int64
ReconcileCooldown time.Duration
DefaultContainerCPULimit mresource.CPUShares
DefaultContainerMemLimit mresource.MegaBytes
Schedcfg schedcfg.Config
Executor *mesos.ExecutorInfo
Scheduler PodScheduler
Client *client.Client
EtcdClient tools.EtcdClient
FailoverTimeout float64
ReconcileInterval int64
ReconcileCooldown time.Duration
}
// New creates a new KubernetesScheduler
func New(config Config) *KubernetesScheduler {
var k *KubernetesScheduler
k = &KubernetesScheduler{
schedcfg: &config.Schedcfg,
RWMutex: new(sync.RWMutex),
executor: config.Executor,
executorGroup: uid.Parse(config.Executor.ExecutorId.GetValue()).Group(),
scheduleFunc: config.ScheduleFunc,
client: config.Client,
etcdClient: config.EtcdClient,
failoverTimeout: config.FailoverTimeout,
reconcileInterval: config.ReconcileInterval,
defaultContainerCPULimit: config.DefaultContainerCPULimit,
defaultContainerMemLimit: config.DefaultContainerMemLimit,
schedcfg: &config.Schedcfg,
RWMutex: new(sync.RWMutex),
executor: config.Executor,
executorGroup: uid.Parse(config.Executor.ExecutorId.GetValue()).Group(),
PodScheduler: config.Scheduler,
client: config.Client,
etcdClient: config.EtcdClient,
failoverTimeout: config.FailoverTimeout,
reconcileInterval: config.ReconcileInterval,
offers: offers.CreateRegistry(offers.RegistryConfig{
Compat: func(o *mesos.Offer) bool {
// filter the offers: the executor IDs must not identify a kubelet-

View File

@ -57,6 +57,7 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/uid"
"k8s.io/kubernetes/pkg/api"
@ -139,6 +140,8 @@ type SchedulerServer struct {
KubeletNetworkPluginName string
StaticPodsConfigPath string
DockerCfgPath string
ContainPodResources bool
AccountForPodResources bool
executable string // path to the binary running this service
client *client.Client
@ -170,18 +173,20 @@ func NewSchedulerServer() *SchedulerServer {
MinionLogMaxBackups: minioncfg.DefaultLogMaxBackups,
MinionLogMaxAgeInDays: minioncfg.DefaultLogMaxAgeInDays,
MesosAuthProvider: sasl.ProviderName,
MesosCgroupPrefix: minioncfg.DefaultCgroupPrefix,
MesosMaster: defaultMesosMaster,
MesosUser: defaultMesosUser,
ReconcileInterval: defaultReconcileInterval,
ReconcileCooldown: defaultReconcileCooldown,
Checkpoint: true,
FrameworkName: defaultFrameworkName,
HA: false,
mux: http.NewServeMux(),
KubeletCadvisorPort: 4194, // copied from github.com/GoogleCloudPlatform/kubernetes/blob/release-0.14/cmd/kubelet/app/server.go
KubeletSyncFrequency: 10 * time.Second,
MesosAuthProvider: sasl.ProviderName,
MesosCgroupPrefix: minioncfg.DefaultCgroupPrefix,
MesosMaster: defaultMesosMaster,
MesosUser: defaultMesosUser,
ReconcileInterval: defaultReconcileInterval,
ReconcileCooldown: defaultReconcileCooldown,
Checkpoint: true,
FrameworkName: defaultFrameworkName,
HA: false,
mux: http.NewServeMux(),
KubeletCadvisorPort: 4194, // copied from github.com/GoogleCloudPlatform/kubernetes/blob/release-0.14/cmd/kubelet/app/server.go
KubeletSyncFrequency: 10 * time.Second,
ContainPodResources: true,
AccountForPodResources: true,
}
// cache this for later use. also useful in case the original binary gets deleted, e.g.
// during upgrades, development deployments, etc.
@ -231,6 +236,8 @@ func (s *SchedulerServer) addCoreFlags(fs *pflag.FlagSet) {
fs.IPVar(&s.ServiceAddress, "service-address", s.ServiceAddress, "The service portal IP address that the scheduler should register with (if unset, chooses randomly)")
fs.Var(&s.DefaultContainerCPULimit, "default-container-cpu-limit", "Containers without a CPU resource limit are admitted this much CPU shares")
fs.Var(&s.DefaultContainerMemLimit, "default-container-mem-limit", "Containers without a memory resource limit are admitted this much amount of memory in MB")
fs.BoolVar(&s.ContainPodResources, "contain-pod-resources", s.ContainPodResources, "Reparent pod containers into mesos cgroups; disable if you're having strange mesos/docker/systemd interactions.")
fs.BoolVar(&s.AccountForPodResources, "account-for-pod-resources", s.AccountForPodResources, "Allocate pod CPU and memory resources from offers (Default: true)")
fs.IntVar(&s.ExecutorLogV, "executor-logv", s.ExecutorLogV, "Logging verbosity of spawned minion and executor processes.")
fs.BoolVar(&s.ExecutorBindall, "executor-bindall", s.ExecutorBindall, "When true will set -address of the executor to 0.0.0.0.")
@ -367,6 +374,7 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E
ci.Arguments = append(ci.Arguments, fmt.Sprintf("--mesos-cgroup-prefix=%v", s.MesosCgroupPrefix))
ci.Arguments = append(ci.Arguments, fmt.Sprintf("--cadvisor-port=%v", s.KubeletCadvisorPort))
ci.Arguments = append(ci.Arguments, fmt.Sprintf("--sync-frequency=%v", s.KubeletSyncFrequency))
ci.Arguments = append(ci.Arguments, fmt.Sprintf("--contain-pod-resources=%t", s.ContainPodResources))
if s.AuthPath != "" {
//TODO(jdef) should probably support non-local files, e.g. hdfs:///some/config/file
@ -651,17 +659,27 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
log.Fatalf("misconfigured etcd: %v", err)
}
as := scheduler.NewAllocationStrategy(
podtask.DefaultPredicate,
podtask.NewDefaultProcurement(s.DefaultContainerCPULimit, s.DefaultContainerMemLimit))
// downgrade allocation strategy if user disables "account-for-pod-resources"
if !s.AccountForPodResources {
as = scheduler.NewAllocationStrategy(
podtask.DefaultMinimalPredicate,
podtask.DefaultMinimalProcurement)
}
fcfs := scheduler.NewFCFSPodScheduler(as)
mesosPodScheduler := scheduler.New(scheduler.Config{
Schedcfg: *sc,
Executor: executor,
ScheduleFunc: scheduler.FCFSScheduleFunc,
Client: client,
EtcdClient: etcdClient,
FailoverTimeout: s.FailoverTimeout,
ReconcileInterval: s.ReconcileInterval,
ReconcileCooldown: s.ReconcileCooldown,
DefaultContainerCPULimit: s.DefaultContainerCPULimit,
DefaultContainerMemLimit: s.DefaultContainerMemLimit,
Schedcfg: *sc,
Executor: executor,
Scheduler: fcfs,
Client: client,
EtcdClient: etcdClient,
FailoverTimeout: s.FailoverTimeout,
ReconcileInterval: s.ReconcileInterval,
ReconcileCooldown: s.ReconcileCooldown,
})
masterUri := s.MesosMaster

View File

@ -23,17 +23,29 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
)
// PodScheduleFunc implements how to schedule pods among slaves.
// We can have different implementation for different scheduling policy.
//
// The Schedule function accepts a group of slaves (each contains offers from
// that slave) and a single pod, which aligns well with the k8s scheduling
// algorithm. It returns an offerId that is acceptable for the pod, otherwise
// nil. The caller is responsible for filling in task state w/ relevant offer
// details.
//
// See the FCFSScheduleFunc for example.
type PodScheduleFunc func(r offers.Registry, slaves SlaveIndex, task *podtask.T) (offers.Perishable, error)
type AllocationStrategy interface {
// FitPredicate returns the selector used to determine pod fitness w/ respect to a given offer
FitPredicate() podtask.FitPredicate
// Procurement returns a func that obtains resources for a task from resource offer
Procurement() podtask.Procurement
}
type PodScheduler interface {
AllocationStrategy
// SchedulePod implements how to schedule pods among slaves.
// We can have different implementations for different scheduling policies.
//
// The function accepts a group of slaves (each contains offers from
// that slave) and a single pod, which aligns well with the k8s scheduling
// algorithm. It returns an offerId that is acceptable for the pod, otherwise
// nil. The caller is responsible for filling in task state w/ relevant offer
// details.
//
// See FCFSPodScheduler for an example.
SchedulePod(r offers.Registry, slaves SlaveIndex, task *podtask.T) (offers.Perishable, error)
}
// A minimal placeholder
type empty struct{}
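For reference, a sketch of how a caller drives this interface, mirroring doSchedule in plugin.go; the offers package import path and the Perishable.Details() accessor are assumed from the rest of this tree:

package example

import (
	"k8s.io/kubernetes/contrib/mesos/pkg/offers"
	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler"
	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
)

// scheduleOne finds an acceptable offer for the task, then procures resources
// from it; error handling of the procurement step is left to the caller here.
func scheduleOne(ps scheduler.PodScheduler, reg offers.Registry, slaves scheduler.SlaveIndex, task *podtask.T) error {
	offer, err := ps.SchedulePod(reg, slaves, task)
	if err != nil {
		return err
	}
	task.Offer = offer
	return ps.Procurement()(task, offer.Details())
}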

View File

@ -1,5 +1,6 @@
accept-hosts
accept-paths
account-for-pod-resources
admission-control
admission-control-config-file
advertise-address
@ -43,6 +44,7 @@ cluster-name
cluster-tag
concurrent-endpoint-syncs
configure-cbr0
contain-pod-resources
container-port
container-runtime
cors-allowed-origins
@ -263,4 +265,4 @@ whitelist-override-label
www-prefix
retry_time
file_content_in_loop
cpu-cfs-quota
cpu-cfs-quota