diff --git a/contrib/mesos/pkg/executor/executor.go b/contrib/mesos/pkg/executor/executor.go index 14ba8e3433e..6e02d715718 100644 --- a/contrib/mesos/pkg/executor/executor.go +++ b/contrib/mesos/pkg/executor/executor.go @@ -92,6 +92,11 @@ type kuberTask struct { type podStatusFunc func() (*api.PodStatus, error) +type NodeInfo struct { + Cores int + Mem int64 // in bytes +} + // KubernetesExecutor is an mesos executor that runs pods // in a minion machine. type KubernetesExecutor struct { @@ -113,6 +118,7 @@ type KubernetesExecutor struct { staticPodsConfigPath string podController *framework.Controller launchGracePeriod time.Duration + nodeInfos chan<- NodeInfo } type Config struct { @@ -127,6 +133,7 @@ type Config struct { StaticPodsConfigPath string PodLW cache.ListerWatcher // mandatory, otherwise initialiation will panic LaunchGracePeriod time.Duration + NodeInfos chan<- NodeInfo } func (k *KubernetesExecutor) isConnected() bool { @@ -152,6 +159,7 @@ func New(config Config) *KubernetesExecutor { podStatusFunc: config.PodStatusFunc, staticPodsConfigPath: config.StaticPodsConfigPath, launchGracePeriod: config.LaunchGracePeriod, + nodeInfos: config.NodeInfos, } // watch pods from the given pod ListWatch @@ -236,6 +244,10 @@ func (k *KubernetesExecutor) Registered(driver bindings.ExecutorDriver, Pods: []*api.Pod{}, Op: kubetypes.SET, }) + + if slaveInfo != nil && k.nodeInfos != nil { + k.nodeInfos <- nodeInfo(slaveInfo, executorInfo) // leave it behind the upper lock to avoid panics + } } // Reregistered is called when the executor is successfully re-registered with the slave. @@ -255,6 +267,16 @@ func (k *KubernetesExecutor) Reregistered(driver bindings.ExecutorDriver, slaveI log.Errorf("cannot update node labels: %v", err) } } + + if slaveInfo != nil && k.nodeInfos != nil { + // make sure nodeInfos is not nil and send new NodeInfo + k.lock.Lock() + defer k.lock.Unlock() + if k.isDone() { + return + } + k.nodeInfos <- nodeInfo(slaveInfo, nil) + } } // initializeStaticPodsSource unzips the data slice into the static-pods directory @@ -796,6 +818,7 @@ func (k *KubernetesExecutor) doShutdown(driver bindings.ExecutorDriver) { // signal to all listeners that this KubeletExecutor is done! close(k.terminate) close(k.updateChan) + close(k.nodeInfos) if k.shutdownAlert != nil { func() { @@ -926,3 +949,40 @@ func differentTime(a, b *unversionedapi.Time) bool { func differentPeriod(a, b *int64) bool { return (a == nil) != (b == nil) || (a != nil && b != nil && *a != *b) } + +func nodeInfo(si *mesos.SlaveInfo, ei *mesos.ExecutorInfo) NodeInfo { + var executorCPU, executorMem float64 + + // get executor resources + if ei != nil { + for _, r := range ei.GetResources() { + if r == nil || r.GetType() != mesos.Value_SCALAR { + continue + } + switch r.GetName() { + case "cpus": + executorCPU = r.GetScalar().GetValue() + case "mem": + executorMem = r.GetScalar().GetValue() + } + } + } + + // get resource capacity of the node + ni := NodeInfo{} + for _, r := range si.GetResources() { + if r == nil || r.GetType() != mesos.Value_SCALAR { + continue + } + + switch r.GetName() { + case "cpus": + // We intentionally take the floor of executorCPU because cores are integers + // and we would loose a complete cpu here if the value is <1. + ni.Cores = int(r.GetScalar().GetValue() - float64(int(executorCPU))) + case "mem": + ni.Mem = int64(r.GetScalar().GetValue()-executorMem) * 1024 * 1024 + } + } + return ni +} diff --git a/contrib/mesos/pkg/executor/executor_test.go b/contrib/mesos/pkg/executor/executor_test.go index 029e64076eb..54547ff2053 100644 --- a/contrib/mesos/pkg/executor/executor_test.go +++ b/contrib/mesos/pkg/executor/executor_test.go @@ -135,8 +135,9 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { mockDriver := &MockExecutorDriver{} updates := make(chan kubetypes.PodUpdate, 1024) config := Config{ - Docker: dockertools.ConnectToDockerOrDie("fake://"), - Updates: updates, + Docker: dockertools.ConnectToDockerOrDie("fake://"), + Updates: updates, + NodeInfos: make(chan NodeInfo, 1), APIClient: client.NewOrDie(&client.Config{ Host: testApiServer.server.URL, Version: testapi.Default.Version(), @@ -297,8 +298,9 @@ func TestExecutorStaticPods(t *testing.T) { mockDriver := &MockExecutorDriver{} config := Config{ - Docker: dockertools.ConnectToDockerOrDie("fake://"), - Updates: make(chan kubetypes.PodUpdate, 1), // allow kube-executor source to proceed past init + Docker: dockertools.ConnectToDockerOrDie("fake://"), + Updates: make(chan kubetypes.PodUpdate, 1), // allow kube-executor source to proceed past init + NodeInfos: make(chan NodeInfo, 1), APIClient: client.NewOrDie(&client.Config{ Host: testApiServer.server.URL, Version: testapi.Default.Version(), @@ -379,8 +381,9 @@ func TestExecutorFrameworkMessage(t *testing.T) { mockDriver := &MockExecutorDriver{} kubeletFinished := make(chan struct{}) config := Config{ - Docker: dockertools.ConnectToDockerOrDie("fake://"), - Updates: make(chan kubetypes.PodUpdate, 1024), + Docker: dockertools.ConnectToDockerOrDie("fake://"), + Updates: make(chan kubetypes.PodUpdate, 1024), + NodeInfos: make(chan NodeInfo, 1), APIClient: client.NewOrDie(&client.Config{ Host: testApiServer.server.URL, Version: testapi.Default.Version(), @@ -558,8 +561,9 @@ func TestExecutorShutdown(t *testing.T) { var exitCalled int32 = 0 updates := make(chan kubetypes.PodUpdate, 1024) config := Config{ - Docker: dockertools.ConnectToDockerOrDie("fake://"), - Updates: updates, + Docker: dockertools.ConnectToDockerOrDie("fake://"), + Updates: updates, + NodeInfos: make(chan NodeInfo, 1), ShutdownAlert: func() { close(kubeletFinished) }, diff --git a/contrib/mesos/pkg/executor/service/cadvisor.go b/contrib/mesos/pkg/executor/service/cadvisor.go new file mode 100644 index 00000000000..d8d3605b83d --- /dev/null +++ b/contrib/mesos/pkg/executor/service/cadvisor.go @@ -0,0 +1,51 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package service + +import ( + "k8s.io/kubernetes/pkg/kubelet/cadvisor" + + cadvisorApi "github.com/google/cadvisor/info/v1" +) + +type MesosCadvisor struct { + cadvisor.Interface + cores int + mem int64 +} + +func NewMesosCadvisor(cores int, mem int64, port uint) (*MesosCadvisor, error) { + c, err := cadvisor.New(port) + if err != nil { + return nil, err + } + return &MesosCadvisor{c, cores, mem}, nil +} + +func (mc *MesosCadvisor) MachineInfo() (*cadvisorApi.MachineInfo, error) { + mi, err := mc.Interface.MachineInfo() + if err != nil { + return nil, err + } + + // set Mesos provided values + mesosMi := *mi + mesosMi.NumCores = mc.cores + mesosMi.MemoryCapacity = mc.mem + + return &mesosMi, nil +} diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index a6454938927..5bdbc485eb4 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -78,7 +78,7 @@ func (s *KubeletExecutorServer) AddFlags(fs *pflag.FlagSet) { fs.DurationVar(&s.LaunchGracePeriod, "mesos-launch-grace-period", s.LaunchGracePeriod, "Launch grace period after which launching tasks will be cancelled. Zero disables launch cancellation.") } -func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpdate, kubeletFinished <-chan struct{}, +func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpdate, nodeInfos chan<- executor.NodeInfo, kubeletFinished <-chan struct{}, staticPodsConfigPath string, apiclient *client.Client) error { exec := executor.New(executor.Config{ Updates: execUpdates, @@ -113,6 +113,7 @@ func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpda PodLW: cache.NewListWatchFromClient(apiclient, "pods", api.NamespaceAll, fields.OneTermEqualSelector(client.PodHost, s.HostnameOverride), ), + NodeInfos: nodeInfos, }) // initialize driver and initialize the executor with it @@ -139,7 +140,7 @@ func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpda return nil } -func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdate, kubeletDone chan<- struct{}, +func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdate, nodeInfos <-chan executor.NodeInfo, kubeletDone chan<- struct{}, staticPodsConfigPath string, apiclient *client.Client) error { kcfg, err := s.UnsecuredKubeletConfig() if err == nil { @@ -179,6 +180,20 @@ func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdat panic("cloud provider must not be set") } + // create custom cAdvisor interface which return the resource values that Mesos reports + ni := <-nodeInfos + cAdvisorInterface, err := NewMesosCadvisor(ni.Cores, ni.Mem, s.CAdvisorPort) + if err != nil { + return err + } + kcfg.CAdvisorInterface = cAdvisorInterface + go func() { + for ni := range nodeInfos { + // TODO(sttts): implement with MachineAllocable mechanism when https://github.com/kubernetes/kubernetes/issues/13984 is finished + log.V(3).Infof("ignoring updated node resources: %v", ni) + } + }() + // create main pod source updates := kcfg.PodConfig.Channel(MESOS_CFG_SOURCE) go func() { @@ -217,6 +232,7 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { // create shared channels kubeletFinished := make(chan struct{}) execUpdates := make(chan kubetypes.PodUpdate, 1) + nodeInfos := make(chan executor.NodeInfo, 1) // create static pods directory staticPodsConfigPath := filepath.Join(s.RootDirectory, "static-pods") @@ -237,13 +253,13 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { } // start executor - err = s.runExecutor(execUpdates, kubeletFinished, staticPodsConfigPath, apiclient) + err = s.runExecutor(execUpdates, nodeInfos, kubeletFinished, staticPodsConfigPath, apiclient) if err != nil { return err } // start kubelet, blocking - return s.runKubelet(execUpdates, kubeletFinished, staticPodsConfigPath, apiclient) + return s.runKubelet(execUpdates, nodeInfos, kubeletFinished, staticPodsConfigPath, apiclient) } func defaultBindingAddress() string {