From d74950cfb9358b76307ce3904fb7d848295885ec Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Wed, 23 Sep 2015 13:07:03 +0200 Subject: [PATCH] Re-use CreateAndInitKubelet --- contrib/mesos/pkg/executor/service/service.go | 240 ++++-------------- 1 file changed, 55 insertions(+), 185 deletions(-) diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index 4841f0db452..9290fa8b5d3 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -37,10 +37,8 @@ import ( "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/kubelet" kconfig "k8s.io/kubernetes/pkg/kubelet/config" - kubeletContainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/dockertools" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" - "k8s.io/kubernetes/pkg/util" ) const ( @@ -141,24 +139,75 @@ func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpda return nil } -func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdate, kubeletFinished chan<- struct{}, +func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdate, kubeletDone chan<- struct{}, staticPodsConfigPath string, apiclient *client.Client) error { kcfg, err := s.UnsecuredKubeletConfig() if err == nil { + // apply Messo specific settings + executorDone := make(chan struct{}) kcfg.Builder = func(kc *app.KubeletConfig) (app.KubeletBootstrap, *kconfig.PodConfig, error) { - return s.createAndInitKubelet(kc, staticPodsConfigPath, execUpdates, kubeletFinished) + k, pc, err := app.CreateAndInitKubelet(kc) + if err != nil { + return k, pc, err + } + + klet := k.(*kubelet.Kubelet) + + s.kletLock.Lock() + s.klet = klet + s.kletLock.Unlock() + + // decorate kubelet such that it shuts down when the executor is + decorated := &executorKubelet{ + Kubelet: klet, + kubeletDone: kubeletDone, + executorDone: executorDone, + } + + return decorated, pc, nil } kcfg.DockerDaemonContainer = "" // don't move the docker daemon into a cgroup kcfg.Hostname = kcfg.HostnameOverride kcfg.KubeClient = apiclient kcfg.NodeName = kcfg.HostnameOverride + kcfg.PodConfig = kconfig.NewPodConfig(kconfig.PodConfigNotificationIncremental, kcfg.Recorder) // override the default pod source kcfg.StandaloneMode = false kcfg.SystemContainer = "" // don't take control over other system processes. + if kcfg.Cloud != nil { + // fail early and hard because having the cloud provider loaded would go unnoticed, + // but break bigger cluster because accessing the state.json from every slave kills the master. + panic("cloud provider must not be set") + } + + // create main pod source + updates := kcfg.PodConfig.Channel(MESOS_CFG_SOURCE) + go func() { + // execUpdates will be closed by the executor on shutdown + defer close(executorDone) + + for u := range execUpdates { + u.Source = MESOS_CFG_SOURCE + updates <- u + } + }() + + // create static-pods directory file source + log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath) + fileSourceUpdates := kcfg.PodConfig.Channel(kubetypes.FileSource) + kconfig.NewSourceFile(staticPodsConfigPath, kcfg.HostnameOverride, kcfg.FileCheckFrequency, fileSourceUpdates) + + // run the kubelet, until execUpdates is closed + // NOTE: because kcfg != nil holds, the upstream Run function will not + // initialize the cloud provider. We explicitly wouldn't want + // that because then every kubelet instance would query the master + // state.json which does not scale. err = s.KubeletServer.Run(kcfg) } if err != nil { - close(kubeletFinished) + // close the channel here. When Run returns without error, the executorKubelet is + // responsible to do this. If it returns with an error, we are responsible here. + close(kubeletDone) } return err } @@ -184,7 +233,7 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { } if err != nil { // required for k8sm since we need to send api.Binding information back to the apiserver - log.Fatalf("No API client: %v", err) + return fmt.Errorf("cannot create API client: %v", err) } // start executor @@ -205,182 +254,3 @@ func defaultBindingAddress() string { return libProcessIP } } - -func (ks *KubeletExecutorServer) createAndInitKubelet( - kc *app.KubeletConfig, - staticPodsConfigPath string, - execUpdates <-chan kubetypes.PodUpdate, - kubeletDone chan<- struct{}, -) (app.KubeletBootstrap, *kconfig.PodConfig, error) { - // TODO(k8s): block until all sources have delivered at least one update to the channel, or break the sync loop - // up into "per source" synchronizations - // TODO(k8s): KubeletConfig.KubeClient should be a client interface, but client interface misses certain methods - // used by kubelet. Since NewMainKubelet expects a client interface, we need to make sure we are not passing - // a nil pointer to it when what we really want is a nil interface. - var kubeClient client.Interface - if kc.KubeClient == nil { - kubeClient = nil - } else { - kubeClient = kc.KubeClient - } - - gcPolicy := kubeletContainer.ContainerGCPolicy{ - MinAge: kc.MinimumGCAge, - MaxPerPodContainer: kc.MaxPerPodContainerCount, - MaxContainers: kc.MaxContainerCount, - } - - // create main pod source - pc := kconfig.NewPodConfig(kconfig.PodConfigNotificationIncremental, kc.Recorder) - updates := pc.Channel(MESOS_CFG_SOURCE) - executorDone := make(chan struct{}) - go func() { - // execUpdates will be closed by the executor on shutdown - defer close(executorDone) - - for u := range execUpdates { - u.Source = MESOS_CFG_SOURCE - updates <- u - } - }() - - // create static-pods directory file source - log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath) - fileSourceUpdates := pc.Channel(kubetypes.FileSource) - kconfig.NewSourceFile(staticPodsConfigPath, kc.Hostname, kc.FileCheckFrequency, fileSourceUpdates) - - klet, err := kubelet.NewMainKubelet( - kc.Hostname, - kc.NodeName, - kc.DockerClient, - kubeClient, - kc.RootDirectory, - kc.PodInfraContainerImage, - kc.SyncFrequency, - float32(kc.RegistryPullQPS), - kc.RegistryBurst, - kc.EventRecordQPS, - kc.EventBurst, - gcPolicy, - pc.SeenAllSources, - kc.RegisterNode, - kc.RegisterSchedulable, - kc.StandaloneMode, - kc.ClusterDomain, - net.IP(kc.ClusterDNS), - kc.MasterServiceNamespace, - kc.VolumePlugins, - kc.NetworkPlugins, - kc.NetworkPluginName, - kc.StreamingConnectionIdleTimeout, - kc.Recorder, - kc.CAdvisorInterface, - kc.ImageGCPolicy, - kc.DiskSpacePolicy, - kc.Cloud, - kc.NodeStatusUpdateFrequency, - kc.ResourceContainer, - kc.OSInterface, - kc.CgroupRoot, - kc.ContainerRuntime, - kc.RktPath, - kc.RktStage1Image, - kc.Mounter, - kc.Writer, - kc.DockerDaemonContainer, - kc.SystemContainer, - kc.ConfigureCBR0, - kc.PodCIDR, - kc.ReconcileCIDR, - kc.MaxPods, - kc.DockerExecHandler, - kc.ResolverConfig, - kc.CPUCFSQuota, - &api.NodeDaemonEndpoints{ - KubeletEndpoint: api.DaemonEndpoint{Port: int(kc.Port)}, - }, - kc.OOMAdjuster, - ) - if err != nil { - return nil, nil, err - } - - ks.kletLock.Lock() - ks.klet = klet - ks.kletLock.Unlock() - - // decorate kubelet such that it shuts down when the executor is - k := &executorKubelet{ - Kubelet: ks.klet, - kubeletDone: kubeletDone, - executorDone: executorDone, - } - - k.BirthCry() - k.StartGarbageCollection() - - return k, pc, nil -} - -// kubelet decorator -type kubeletExecutor struct { - *kubelet.Kubelet - address net.IP - dockerClient dockertools.DockerInterface - kubeletDone chan<- struct{} // closed once kubelet.Run() returns - executorDone <-chan struct{} // closed when executor terminates -} - -func (kl *kubeletExecutor) ListenAndServe(address net.IP, port uint, tlsOptions *kubelet.TLSOptions, auth kubelet.AuthInterface, enableDebuggingHandlers bool) { - log.Infof("Starting kubelet server...") - kubelet.ListenAndServeKubeletServer(kl, address, port, tlsOptions, auth, enableDebuggingHandlers) -} - -// runs the main kubelet loop, closing the kubeletFinished chan when the loop exits. -// never returns. -func (kl *kubeletExecutor) Run(mergedUpdates <-chan kubetypes.PodUpdate) { - defer func() { - close(kl.kubeletDone) - util.HandleCrash() - log.Infoln("kubelet run terminated") //TODO(jdef) turn down verbosity - // important: never return! this is in our contract - select {} - }() - - // push merged updates into another, closable update channel which is closed - // when the executor shuts down. - closableUpdates := make(chan kubetypes.PodUpdate) - go func() { - // closing closableUpdates will cause our patched kubelet's syncLoop() to exit - defer close(closableUpdates) - pipeLoop: - for { - select { - case <-kl.executorDone: - break pipeLoop - default: - select { - case u := <-mergedUpdates: - select { - case closableUpdates <- u: // noop - case <-kl.executorDone: - break pipeLoop - } - case <-kl.executorDone: - break pipeLoop - } - } - } - }() - - // we expect that Run() will complete after closableUpdates is closed and the - // kubelet's syncLoop() has finished processing its backlog, which hopefully - // will not take very long. Peeking into the future (current k8s master) it - // seems that the backlog has grown from 1 to 50 -- this may negatively impact - // us going forward, time will tell. - util.Until(func() { kl.Kubelet.Run(closableUpdates) }, 0, kl.executorDone) - - //TODO(jdef) revisit this if/when executor failover lands - // Force kubelet to delete all pods. - kl.HandlePodDeletions(kl.GetPods()) -}