From 0feb1bceb5ea8ddf2727235f73b04f355ab221f3 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Wed, 23 Sep 2015 09:38:00 +0200 Subject: [PATCH] Re-use KubeletServer.KubeletConfig and KubeletServer.Run --- contrib/mesos/pkg/executor/service/service.go | 261 +++--------------- 1 file changed, 39 insertions(+), 222 deletions(-) diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index 8528c187b7f..d9819d55257 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -19,11 +19,8 @@ package service import ( "fmt" "net" - "net/http" "os" "path/filepath" - "strconv" - "strings" "sync" "time" @@ -37,20 +34,13 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" - "k8s.io/kubernetes/pkg/credentialprovider" "k8s.io/kubernetes/pkg/fields" - "k8s.io/kubernetes/pkg/healthz" "k8s.io/kubernetes/pkg/kubelet" - "k8s.io/kubernetes/pkg/kubelet/cadvisor" kconfig "k8s.io/kubernetes/pkg/kubelet/config" - kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + kubeletContainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/dockertools" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/util" - utilio "k8s.io/kubernetes/pkg/util/io" - "k8s.io/kubernetes/pkg/util/mount" - "k8s.io/kubernetes/pkg/util/oom" - "k8s.io/kubernetes/pkg/util/rand" ) const ( @@ -90,18 +80,8 @@ func (s *KubeletExecutorServer) AddFlags(fs *pflag.FlagSet) { fs.DurationVar(&s.LaunchGracePeriod, "mesos-launch-grace-period", s.LaunchGracePeriod, "Launch grace period after which launching tasks will be cancelled. Zero disables launch cancellation.") } -func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpdate, kubeletFinished <-chan struct{}, staticPodsConfigPath string) error { - // create apiserver client - var apiclient *client.Client - clientConfig, err := s.CreateAPIServerClientConfig() - if err == nil { - apiclient, err = client.New(clientConfig) - } - if err != nil { - // required for k8sm since we need to send api.Binding information - // back to the apiserver - log.Fatalf("No API client: %v", err) - } +func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpdate, kubeletFinished <-chan struct{}, + staticPodsConfigPath string, apiclient *client.Client) error { exec := executor.New(executor.Config{ Updates: execUpdates, APIClient: apiclient, @@ -161,19 +141,40 @@ func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpda return nil } -// Run runs the specified KubeletExecutorServer. -func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdate, kubeletFinished chan<- struct{}, staticPodsConfigPath string) error { - // empty string for the docker and system containers (= cgroup paths). This - // stops the kubelet taking any control over other system processes. - s.SystemContainer = "" - s.DockerDaemonContainer = "" - - oomAdjuster := oom.NewOOMAdjuster() - if err := oomAdjuster.ApplyOOMScoreAdj(0, s.OOMScoreAdj); err != nil { - log.Info(err) +func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdate, kubeletFinished chan<- struct{}, + staticPodsConfigPath string, apiclient *client.Client) error { + kcfg, err := s.UnsecuredKubeletConfig() + if err == nil { + kcfg.Builder = func(kc *app.KubeletConfig) (app.KubeletBootstrap, *kconfig.PodConfig, error) { + return s.createAndInitKubelet(kc, staticPodsConfigPath, execUpdates, kubeletFinished) + } + kcfg.DockerDaemonContainer = "" // don't move the docker daemon into a cgroup + kcfg.Hostname = kcfg.HostnameOverride + kcfg.KubeClient = apiclient + kcfg.NodeName = kcfg.HostnameOverride + kcfg.StandaloneMode = false + kcfg.SystemContainer = "" // don't take control over other system processes. + err = s.KubeletServer.Run(kcfg) } - dockerClient := dockertools.ConnectToDockerOrDie(s.DockerEndpoint) + if err != nil { + close(kubeletFinished) + } + return err +} + +// Run runs the specified KubeletExecutorServer. +func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { + // create shared channels + kubeletFinished := make(chan struct{}) + execUpdates := make(chan kubetypes.PodUpdate, 1) + + // create static pods directory + staticPodsConfigPath := filepath.Join(s.RootDirectory, "static-pods") + err := os.Mkdir(staticPodsConfigPath, 0755) + if err != nil { + return err + } // create apiserver client var apiclient *client.Client @@ -186,194 +187,14 @@ func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdat log.Fatalf("No API client: %v", err) } - log.Infof("Using root directory: %v", s.RootDirectory) - credentialprovider.SetPreferredDockercfgPath(s.RootDirectory) - - cAdvisorInterface, err := cadvisor.New(s.CAdvisorPort) - if err != nil { - return err - } - - imageGCPolicy := kubelet.ImageGCPolicy{ - HighThresholdPercent: s.ImageGCHighThresholdPercent, - LowThresholdPercent: s.ImageGCLowThresholdPercent, - } - - diskSpacePolicy := kubelet.DiskSpacePolicy{ - DockerFreeDiskMB: s.LowDiskSpaceThresholdMB, - RootFreeDiskMB: s.LowDiskSpaceThresholdMB, - } - - manifestURLHeader := make(http.Header) - if s.ManifestURLHeader != "" { - pieces := strings.Split(s.ManifestURLHeader, ":") - if len(pieces) != 2 { - return fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", s.ManifestURLHeader) - } - manifestURLHeader.Set(pieces[0], pieces[1]) - } - - //TODO(jdef) intentionally NOT initializing a cloud provider here since: - //(a) the kubelet doesn't actually use it - //(b) we don't need to create N-kubelet connections to zookeeper for no good reason - //cloud := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile) - //log.Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile) - - hostNetworkSources, err := kubetypes.GetValidatedSources(strings.Split(s.HostNetworkSources, ",")) - if err != nil { - return err - } - - hostPIDSources, err := kubetypes.GetValidatedSources(strings.Split(s.HostPIDSources, ",")) - if err != nil { - return err - } - - hostIPCSources, err := kubetypes.GetValidatedSources(strings.Split(s.HostIPCSources, ",")) - if err != nil { - return err - } - - tlsOptions, err := s.InitializeTLS() - if err != nil { - return err - } - mounter := mount.New() - if s.Containerized { - log.V(2).Info("Running kubelet in containerized mode (experimental)") - mounter = &mount.NsenterMounter{} - } - - var writer utilio.Writer = &utilio.StdWriter{} - var dockerExecHandler dockertools.ExecHandler - switch s.DockerExecHandlerName { - case "native": - dockerExecHandler = &dockertools.NativeExecHandler{} - case "nsenter": - writer = &utilio.NsenterWriter{} - dockerExecHandler = &dockertools.NsenterExecHandler{} - default: - log.Warningf("Unknown Docker exec handler %q; defaulting to native", s.DockerExecHandlerName) - dockerExecHandler = &dockertools.NativeExecHandler{} - } - - // prepare kubelet - kcfg := app.KubeletConfig{ - Address: s.Address, - AllowPrivileged: s.AllowPrivileged, - CAdvisorInterface: cAdvisorInterface, - CgroupRoot: s.CgroupRoot, - Cloud: nil, // TODO(jdef) Cloud, specifying null here because we don't want all kubelets polling mesos-master; need to account for this in the cloudprovider impl - ClusterDNS: s.ClusterDNS, - ClusterDomain: s.ClusterDomain, - // ConfigFile: "" - ConfigureCBR0: s.ConfigureCBR0, - ContainerRuntime: s.ContainerRuntime, - CPUCFSQuota: s.CPUCFSQuota, - DiskSpacePolicy: diskSpacePolicy, - DockerClient: dockerClient, - DockerDaemonContainer: s.DockerDaemonContainer, - DockerExecHandler: dockerExecHandler, - EnableDebuggingHandlers: s.EnableDebuggingHandlers, - EnableServer: s.EnableServer, - EventBurst: s.EventBurst, - EventRecordQPS: s.EventRecordQPS, - FileCheckFrequency: s.FileCheckFrequency, - HostnameOverride: s.HostnameOverride, - HostNetworkSources: hostNetworkSources, - HostPIDSources: hostPIDSources, - HostIPCSources: hostIPCSources, - // HTTPCheckFrequency - ImageGCPolicy: imageGCPolicy, - KubeClient: apiclient, - // ManifestURL: "" - ManifestURLHeader: manifestURLHeader, - MasterServiceNamespace: s.MasterServiceNamespace, - MaxContainerCount: s.MaxContainerCount, - MaxOpenFiles: s.MaxOpenFiles, - MaxPerPodContainerCount: s.MaxPerPodContainerCount, - MaxPods: s.MaxPods, - MinimumGCAge: s.MinimumGCAge, - Mounter: mounter, - NetworkPluginName: s.NetworkPluginName, - NetworkPlugins: app.ProbeNetworkPlugins(s.NetworkPluginDir), - NodeStatusUpdateFrequency: s.NodeStatusUpdateFrequency, - OOMAdjuster: oomAdjuster, - OSInterface: kubecontainer.RealOS{}, - PodCIDR: s.PodCIDR, - PodInfraContainerImage: s.PodInfraContainerImage, - Port: s.Port, - ReadOnlyPort: s.ReadOnlyPort, - RegisterNode: s.RegisterNode, - RegistryBurst: s.RegistryBurst, - RegistryPullQPS: s.RegistryPullQPS, - ResolverConfig: s.ResolverConfig, - ResourceContainer: s.ResourceContainer, - RootDirectory: s.RootDirectory, - Runonce: s.RunOnce, - // StandaloneMode: false - StreamingConnectionIdleTimeout: s.StreamingConnectionIdleTimeout, - SyncFrequency: s.SyncFrequency, - SystemContainer: s.SystemContainer, - TLSOptions: tlsOptions, - VolumePlugins: app.ProbeVolumePlugins(), - Writer: writer, - } - - kcfg.NodeName = kcfg.Hostname - - kcfg.Builder = app.KubeletBuilder(func(kc *app.KubeletConfig) (app.KubeletBootstrap, *kconfig.PodConfig, error) { - return s.createAndInitKubelet(kc, clientConfig, staticPodsConfigPath, execUpdates, kubeletFinished) - }) - err = app.RunKubelet(&kcfg) - if err != nil { - return err - } - - // start health check server - if s.HealthzPort > 0 { - healthz.DefaultHealthz() - go util.Until(func() { - err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress.String(), strconv.Itoa(s.HealthzPort)), nil) - if err != nil { - log.Errorf("Starting health server failed: %v", err) - } - }, 5*time.Second, util.NeverStop) - } - - return nil -} - -// Run runs the specified KubeletExecutorServer. -func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { - rand.Seed(time.Now().UTC().UnixNano()) - - // create shared channels - kubeletFinished := make(chan struct{}) - execUpdates := make(chan kubetypes.PodUpdate, 1) - - // create static pods directory - staticPodsConfigPath := filepath.Join(s.RootDirectory, "static-pods") - err := os.Mkdir(staticPodsConfigPath, 0755) - if err != nil { - return err - } - // start executor - err = s.runExecutor(execUpdates, kubeletFinished, staticPodsConfigPath) + err = s.runExecutor(execUpdates, kubeletFinished, staticPodsConfigPath, apiclient) if err != nil { return err } - // start kubelet - err = s.runKubelet(execUpdates, kubeletFinished, staticPodsConfigPath) - if err != nil { - close(kubeletFinished) // tell executor - return err - } - - // block until executor is shut down or commits shutdown - select {} + // start kubelet, blocking + return s.runKubelet(execUpdates, kubeletFinished, staticPodsConfigPath, apiclient) } func defaultBindingAddress() string { @@ -387,12 +208,10 @@ func defaultBindingAddress() string { func (ks *KubeletExecutorServer) createAndInitKubelet( kc *app.KubeletConfig, - clientConfig *client.Config, staticPodsConfigPath string, execUpdates <-chan kubetypes.PodUpdate, kubeletDone chan<- struct{}, ) (app.KubeletBootstrap, *kconfig.PodConfig, error) { - // TODO(k8s): block until all sources have delivered at least one update to the channel, or break the sync loop // up into "per source" synchronizations // TODO(k8s): KubeletConfig.KubeClient should be a client interface, but client interface misses certain methods @@ -405,7 +224,7 @@ func (ks *KubeletExecutorServer) createAndInitKubelet( kubeClient = kc.KubeClient } - gcPolicy := kubecontainer.ContainerGCPolicy{ + gcPolicy := kubeletContainer.ContainerGCPolicy{ MinAge: kc.MinimumGCAge, MaxPerPodContainer: kc.MaxPerPodContainerCount, MaxContainers: kc.MaxContainerCount, @@ -495,7 +314,6 @@ func (ks *KubeletExecutorServer) createAndInitKubelet( address: ks.Address, dockerClient: kc.DockerClient, kubeletDone: kubeletDone, - clientConfig: clientConfig, executorDone: executorDone, } @@ -512,7 +330,6 @@ type kubeletExecutor struct { dockerClient dockertools.DockerInterface kubeletDone chan<- struct{} // closed once kubelet.Run() returns executorDone <-chan struct{} // closed when executor terminates - clientConfig *client.Config } func (kl *kubeletExecutor) ListenAndServe(address net.IP, port uint, tlsOptions *kubelet.TLSOptions, auth kubelet.AuthInterface, enableDebuggingHandlers bool) {