Pass Mesos cpu and mem values to cadvisor

This commit is contained in:
Dr. Stefan Schimanski 2015-09-11 15:45:26 +02:00
parent 31ab4f1222
commit ae7830b4be
4 changed files with 143 additions and 12 deletions

View File

@ -92,6 +92,11 @@ type kuberTask struct {
type podStatusFunc func() (*api.PodStatus, error)
type NodeInfo struct {
Cores int
Mem int64 // in bytes
}
// KubernetesExecutor is an mesos executor that runs pods
// in a minion machine.
type KubernetesExecutor struct {
@ -113,6 +118,7 @@ type KubernetesExecutor struct {
staticPodsConfigPath string
podController *framework.Controller
launchGracePeriod time.Duration
nodeInfos chan<- NodeInfo
}
type Config struct {
@ -127,6 +133,7 @@ type Config struct {
StaticPodsConfigPath string
PodLW cache.ListerWatcher // mandatory, otherwise initialiation will panic
LaunchGracePeriod time.Duration
NodeInfos chan<- NodeInfo
}
func (k *KubernetesExecutor) isConnected() bool {
@ -152,6 +159,7 @@ func New(config Config) *KubernetesExecutor {
podStatusFunc: config.PodStatusFunc,
staticPodsConfigPath: config.StaticPodsConfigPath,
launchGracePeriod: config.LaunchGracePeriod,
nodeInfos: config.NodeInfos,
}
// watch pods from the given pod ListWatch
@ -236,6 +244,10 @@ func (k *KubernetesExecutor) Registered(driver bindings.ExecutorDriver,
Pods: []*api.Pod{},
Op: kubetypes.SET,
})
if slaveInfo != nil && k.nodeInfos != nil {
k.nodeInfos <- nodeInfo(slaveInfo, executorInfo) // leave it behind the upper lock to avoid panics
}
}
// Reregistered is called when the executor is successfully re-registered with the slave.
@ -255,6 +267,16 @@ func (k *KubernetesExecutor) Reregistered(driver bindings.ExecutorDriver, slaveI
log.Errorf("cannot update node labels: %v", err)
}
}
if slaveInfo != nil && k.nodeInfos != nil {
// make sure nodeInfos is not nil and send new NodeInfo
k.lock.Lock()
defer k.lock.Unlock()
if k.isDone() {
return
}
k.nodeInfos <- nodeInfo(slaveInfo, nil)
}
}
// initializeStaticPodsSource unzips the data slice into the static-pods directory
@ -796,6 +818,7 @@ func (k *KubernetesExecutor) doShutdown(driver bindings.ExecutorDriver) {
// signal to all listeners that this KubeletExecutor is done!
close(k.terminate)
close(k.updateChan)
close(k.nodeInfos)
if k.shutdownAlert != nil {
func() {
@ -926,3 +949,40 @@ func differentTime(a, b *unversionedapi.Time) bool {
func differentPeriod(a, b *int64) bool {
return (a == nil) != (b == nil) || (a != nil && b != nil && *a != *b)
}
func nodeInfo(si *mesos.SlaveInfo, ei *mesos.ExecutorInfo) NodeInfo {
var executorCPU, executorMem float64
// get executor resources
if ei != nil {
for _, r := range ei.GetResources() {
if r == nil || r.GetType() != mesos.Value_SCALAR {
continue
}
switch r.GetName() {
case "cpus":
executorCPU = r.GetScalar().GetValue()
case "mem":
executorMem = r.GetScalar().GetValue()
}
}
}
// get resource capacity of the node
ni := NodeInfo{}
for _, r := range si.GetResources() {
if r == nil || r.GetType() != mesos.Value_SCALAR {
continue
}
switch r.GetName() {
case "cpus":
// We intentionally take the floor of executorCPU because cores are integers
// and we would loose a complete cpu here if the value is <1.
ni.Cores = int(r.GetScalar().GetValue() - float64(int(executorCPU)))
case "mem":
ni.Mem = int64(r.GetScalar().GetValue()-executorMem) * 1024 * 1024
}
}
return ni
}

View File

@ -135,8 +135,9 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
mockDriver := &MockExecutorDriver{}
updates := make(chan kubetypes.PodUpdate, 1024)
config := Config{
Docker: dockertools.ConnectToDockerOrDie("fake://"),
Updates: updates,
Docker: dockertools.ConnectToDockerOrDie("fake://"),
Updates: updates,
NodeInfos: make(chan NodeInfo, 1),
APIClient: client.NewOrDie(&client.Config{
Host: testApiServer.server.URL,
Version: testapi.Default.Version(),
@ -297,8 +298,9 @@ func TestExecutorStaticPods(t *testing.T) {
mockDriver := &MockExecutorDriver{}
config := Config{
Docker: dockertools.ConnectToDockerOrDie("fake://"),
Updates: make(chan kubetypes.PodUpdate, 1), // allow kube-executor source to proceed past init
Docker: dockertools.ConnectToDockerOrDie("fake://"),
Updates: make(chan kubetypes.PodUpdate, 1), // allow kube-executor source to proceed past init
NodeInfos: make(chan NodeInfo, 1),
APIClient: client.NewOrDie(&client.Config{
Host: testApiServer.server.URL,
Version: testapi.Default.Version(),
@ -379,8 +381,9 @@ func TestExecutorFrameworkMessage(t *testing.T) {
mockDriver := &MockExecutorDriver{}
kubeletFinished := make(chan struct{})
config := Config{
Docker: dockertools.ConnectToDockerOrDie("fake://"),
Updates: make(chan kubetypes.PodUpdate, 1024),
Docker: dockertools.ConnectToDockerOrDie("fake://"),
Updates: make(chan kubetypes.PodUpdate, 1024),
NodeInfos: make(chan NodeInfo, 1),
APIClient: client.NewOrDie(&client.Config{
Host: testApiServer.server.URL,
Version: testapi.Default.Version(),
@ -558,8 +561,9 @@ func TestExecutorShutdown(t *testing.T) {
var exitCalled int32 = 0
updates := make(chan kubetypes.PodUpdate, 1024)
config := Config{
Docker: dockertools.ConnectToDockerOrDie("fake://"),
Updates: updates,
Docker: dockertools.ConnectToDockerOrDie("fake://"),
Updates: updates,
NodeInfos: make(chan NodeInfo, 1),
ShutdownAlert: func() {
close(kubeletFinished)
},

View File

@ -0,0 +1,51 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package service
import (
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
cadvisorApi "github.com/google/cadvisor/info/v1"
)
type MesosCadvisor struct {
cadvisor.Interface
cores int
mem int64
}
func NewMesosCadvisor(cores int, mem int64, port uint) (*MesosCadvisor, error) {
c, err := cadvisor.New(port)
if err != nil {
return nil, err
}
return &MesosCadvisor{c, cores, mem}, nil
}
func (mc *MesosCadvisor) MachineInfo() (*cadvisorApi.MachineInfo, error) {
mi, err := mc.Interface.MachineInfo()
if err != nil {
return nil, err
}
// set Mesos provided values
mesosMi := *mi
mesosMi.NumCores = mc.cores
mesosMi.MemoryCapacity = mc.mem
return &mesosMi, nil
}

View File

@ -78,7 +78,7 @@ func (s *KubeletExecutorServer) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&s.LaunchGracePeriod, "mesos-launch-grace-period", s.LaunchGracePeriod, "Launch grace period after which launching tasks will be cancelled. Zero disables launch cancellation.")
}
func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpdate, kubeletFinished <-chan struct{},
func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpdate, nodeInfos chan<- executor.NodeInfo, kubeletFinished <-chan struct{},
staticPodsConfigPath string, apiclient *client.Client) error {
exec := executor.New(executor.Config{
Updates: execUpdates,
@ -113,6 +113,7 @@ func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpda
PodLW: cache.NewListWatchFromClient(apiclient, "pods", api.NamespaceAll,
fields.OneTermEqualSelector(client.PodHost, s.HostnameOverride),
),
NodeInfos: nodeInfos,
})
// initialize driver and initialize the executor with it
@ -139,7 +140,7 @@ func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpda
return nil
}
func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdate, kubeletDone chan<- struct{},
func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdate, nodeInfos <-chan executor.NodeInfo, kubeletDone chan<- struct{},
staticPodsConfigPath string, apiclient *client.Client) error {
kcfg, err := s.UnsecuredKubeletConfig()
if err == nil {
@ -179,6 +180,20 @@ func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdat
panic("cloud provider must not be set")
}
// create custom cAdvisor interface which return the resource values that Mesos reports
ni := <-nodeInfos
cAdvisorInterface, err := NewMesosCadvisor(ni.Cores, ni.Mem, s.CAdvisorPort)
if err != nil {
return err
}
kcfg.CAdvisorInterface = cAdvisorInterface
go func() {
for ni := range nodeInfos {
// TODO(sttts): implement with MachineAllocable mechanism when https://github.com/kubernetes/kubernetes/issues/13984 is finished
log.V(3).Infof("ignoring updated node resources: %v", ni)
}
}()
// create main pod source
updates := kcfg.PodConfig.Channel(MESOS_CFG_SOURCE)
go func() {
@ -217,6 +232,7 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
// create shared channels
kubeletFinished := make(chan struct{})
execUpdates := make(chan kubetypes.PodUpdate, 1)
nodeInfos := make(chan executor.NodeInfo, 1)
// create static pods directory
staticPodsConfigPath := filepath.Join(s.RootDirectory, "static-pods")
@ -237,13 +253,13 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
}
// start executor
err = s.runExecutor(execUpdates, kubeletFinished, staticPodsConfigPath, apiclient)
err = s.runExecutor(execUpdates, nodeInfos, kubeletFinished, staticPodsConfigPath, apiclient)
if err != nil {
return err
}
// start kubelet, blocking
return s.runKubelet(execUpdates, kubeletFinished, staticPodsConfigPath, apiclient)
return s.runKubelet(execUpdates, nodeInfos, kubeletFinished, staticPodsConfigPath, apiclient)
}
func defaultBindingAddress() string {