From 9176e93ba833c727bfae5b98cbf403a27270c348 Mon Sep 17 00:00:00 2001 From: Sergiusz Urbaniak Date: Wed, 9 Dec 2015 15:39:55 +0100 Subject: [PATCH] service: initialize EventClient Kubelet events are not being tracked. This commit fixes it by initializing the event client. fixes https://github.com/mesosphere/kubernetes-mesos/issues/680 --- contrib/mesos/pkg/executor/service/service.go | 169 ++++++++++-------- 1 file changed, 95 insertions(+), 74 deletions(-) diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index 030644b8255..be20d7fc2a7 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -147,83 +147,104 @@ func (s *KubeletExecutorServer) runKubelet( staticPodsConfigPath string, apiclient *client.Client, podLW *cache.ListWatch, -) error { +) (err error) { + defer func() { + if err != nil { + // close the channel here. When Run returns without error, the executorKubelet is + // responsible to do this. If it returns with an error, we are responsible here. + close(kubeletDone) + } + }() + kcfg, err := s.UnsecuredKubeletConfig() - if err == nil { - // apply Mesos specific settings - executorDone := make(chan struct{}) - kcfg.Builder = func(kc *kubeletapp.KubeletConfig) (kubeletapp.KubeletBootstrap, *kconfig.PodConfig, error) { - k, pc, err := kubeletapp.CreateAndInitKubelet(kc) - if err != nil { - return k, pc, err - } - - s.klet = k.(*kubelet.Kubelet) - close(s.kletReady) // intentionally crash if this is called more than once - - // decorate kubelet such that it shuts down when the executor is - decorated := &executorKubelet{ - Kubelet: s.klet, - kubeletDone: kubeletDone, - executorDone: executorDone, - } - - return decorated, pc, nil - } - kcfg.DockerDaemonContainer = "" // don't move the docker daemon into a cgroup - kcfg.Hostname = kcfg.HostnameOverride - kcfg.KubeClient = apiclient - kcfg.NodeName = kcfg.HostnameOverride - kcfg.PodConfig = kconfig.NewPodConfig(kconfig.PodConfigNotificationIncremental, kcfg.Recorder) // override the default pod source - kcfg.StandaloneMode = false - kcfg.SystemContainer = "" // don't take control over other system processes. - if kcfg.Cloud != nil { - // fail early and hard because having the cloud provider loaded would go unnoticed, - // but break bigger cluster because accessing the state.json from every slave kills the master. - panic("cloud provider must not be set") - } - - // create custom cAdvisor interface which return the resource values that Mesos reports - ni := <-nodeInfos - cAdvisorInterface, err := NewMesosCadvisor(ni.Cores, ni.Mem, s.CAdvisorPort) - if err != nil { - return err - } - kcfg.CAdvisorInterface = cAdvisorInterface - kcfg.ContainerManager, err = cm.NewContainerManager(kcfg.Mounter, cAdvisorInterface) - if err != nil { - return err - } - - go func() { - for ni := range nodeInfos { - // TODO(sttts): implement with MachineAllocable mechanism when https://github.com/kubernetes/kubernetes/issues/13984 is finished - log.V(3).Infof("ignoring updated node resources: %v", ni) - } - }() - - // create main pod source, it will close executorDone when the executor updates stop flowing - newSourceMesos(executorDone, execUpdates, kcfg.PodConfig.Channel(mesosSource), podLW) - - // create static-pods directory file source - log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath) - fileSourceUpdates := kcfg.PodConfig.Channel(kubetypes.FileSource) - kconfig.NewSourceFile(staticPodsConfigPath, kcfg.HostnameOverride, kcfg.FileCheckFrequency, fileSourceUpdates) - - // run the kubelet, until execUpdates is closed - // NOTE: because kcfg != nil holds, the upstream Run function will not - // initialize the cloud provider. We explicitly wouldn't want - // that because then every kubelet instance would query the master - // state.json which does not scale. - err = s.KubeletServer.Run(kcfg) - } - if err != nil { - // close the channel here. When Run returns without error, the executorKubelet is - // responsible to do this. If it returns with an error, we are responsible here. - close(kubeletDone) + return err } - return err + + // apply Mesos specific settings + executorDone := make(chan struct{}) + kcfg.Builder = func(kc *kubeletapp.KubeletConfig) (kubeletapp.KubeletBootstrap, *kconfig.PodConfig, error) { + k, pc, err := kubeletapp.CreateAndInitKubelet(kc) + if err != nil { + return k, pc, err + } + + s.klet = k.(*kubelet.Kubelet) + close(s.kletReady) // intentionally crash if this is called more than once + + // decorate kubelet such that it shuts down when the executor is + decorated := &executorKubelet{ + Kubelet: s.klet, + kubeletDone: kubeletDone, + executorDone: executorDone, + } + + return decorated, pc, nil + } + kcfg.DockerDaemonContainer = "" // don't move the docker daemon into a cgroup + kcfg.Hostname = kcfg.HostnameOverride + kcfg.KubeClient = apiclient + + // taken from KubeletServer#Run(*KubeletConfig) + eventClientConfig, err := s.CreateAPIServerClientConfig() + if err != nil { + return err + } + + // make a separate client for events + eventClientConfig.QPS = s.EventRecordQPS + eventClientConfig.Burst = s.EventBurst + kcfg.EventClient, err = client.New(eventClientConfig) + if err != nil { + return err + } + + kcfg.NodeName = kcfg.HostnameOverride + kcfg.PodConfig = kconfig.NewPodConfig(kconfig.PodConfigNotificationIncremental, kcfg.Recorder) // override the default pod source + kcfg.StandaloneMode = false + kcfg.SystemContainer = "" // don't take control over other system processes. + if kcfg.Cloud != nil { + // fail early and hard because having the cloud provider loaded would go unnoticed, + // but break bigger cluster because accessing the state.json from every slave kills the master. + panic("cloud provider must not be set") + } + + // create custom cAdvisor interface which return the resource values that Mesos reports + ni := <-nodeInfos + cAdvisorInterface, err := NewMesosCadvisor(ni.Cores, ni.Mem, s.CAdvisorPort) + if err != nil { + return err + } + + kcfg.CAdvisorInterface = cAdvisorInterface + kcfg.ContainerManager, err = cm.NewContainerManager(kcfg.Mounter, cAdvisorInterface) + if err != nil { + return err + } + + go func() { + for ni := range nodeInfos { + // TODO(sttts): implement with MachineAllocable mechanism when https://github.com/kubernetes/kubernetes/issues/13984 is finished + log.V(3).Infof("ignoring updated node resources: %v", ni) + } + }() + + // create main pod source, it will close executorDone when the executor updates stop flowing + newSourceMesos(executorDone, execUpdates, kcfg.PodConfig.Channel(mesosSource), podLW) + + // create static-pods directory file source + log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath) + fileSourceUpdates := kcfg.PodConfig.Channel(kubetypes.FileSource) + kconfig.NewSourceFile(staticPodsConfigPath, kcfg.HostnameOverride, kcfg.FileCheckFrequency, fileSourceUpdates) + + // run the kubelet, until execUpdates is closed + // NOTE: because kcfg != nil holds, the upstream Run function will not + // initialize the cloud provider. We explicitly wouldn't want + // that because then every kubelet instance would query the master + // state.json which does not scale. + err = s.KubeletServer.Run(kcfg) + + return } // Run runs the specified KubeletExecutorServer.