From d194af6b4f1721cd4fe719ba8ea81b520386ccae Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Thu, 26 Mar 2015 12:31:54 +0000 Subject: [PATCH] allow for more easily customized kubelet creation and initialization --- cmd/integration/integration.go | 4 ++-- cmd/kubelet/app/server.go | 42 ++++++++++++++++++++++------------ cmd/kubernetes/kubernetes.go | 2 +- pkg/kubelet/kubelet.go | 8 +++++++ 4 files changed, 39 insertions(+), 17 deletions(-) diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index e5cfea0851d..5214ab73bea 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -233,14 +233,14 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st configFilePath := makeTempDirOrDie("config", testRootDir) glog.Infof("Using %s as root dir for kubelet #1", testRootDir) kcfg := kubeletapp.SimpleKubelet(cl, &fakeDocker1, machineList[0], testRootDir, firstManifestURL, "127.0.0.1", 10250, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, configFilePath, nil) - kubeletapp.RunKubelet(kcfg) + kubeletapp.RunKubelet(kcfg, nil) // Kubelet (machine) // Create a second kubelet so that the guestbook example's two redis slaves both // have a place they can schedule. testRootDir = makeTempDirOrDie("kubelet_integ_2.", "") glog.Infof("Using %s as root dir for kubelet #2", testRootDir) kcfg = kubeletapp.SimpleKubelet(cl, &fakeDocker2, machineList[1], testRootDir, secondManifestURL, "127.0.0.1", 10251, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, "", nil) - kubeletapp.RunKubelet(kcfg) + kubeletapp.RunKubelet(kcfg, nil) return apiServer.URL, configFilePath } diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 3d136bcc32c..9bc4399632c 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -96,6 +96,19 @@ type KubeletServer struct { CertDirectory string } +// bootstrapping interface for kubelet, targets the initialization protocol +type KubeletBootstrap interface { + BirthCry() + StartGarbageCollection() + ListenAndServe(net.IP, uint, *kubelet.TLSOptions, bool) + ListenAndServeReadOnly(net.IP, uint) + Run(<-chan kubelet.PodUpdate) + RunOnce(<-chan kubelet.PodUpdate) ([]kubelet.RunPodResult, error) +} + +// create and initialize a Kubelet instance +type KubeletBuilder func(kc *KubeletConfig) (KubeletBootstrap, *config.PodConfig, error) + // NewKubeletServer will create a new KubeletServer with default values. func NewKubeletServer() *KubeletServer { return &KubeletServer{ @@ -267,7 +280,7 @@ func (s *KubeletServer) Run(_ []string) error { Cloud: cloud, } - RunKubelet(&kcfg) + RunKubelet(&kcfg, nil) if s.HealthzPort > 0 { healthz.DefaultHealthz() @@ -377,7 +390,7 @@ func SimpleKubelet(client *client.Client, // 2 Kubelet binary // 3 Standalone 'kubernetes' binary // Eventually, #2 will be replaced with instances of #3 -func RunKubelet(kcfg *KubeletConfig) { +func RunKubelet(kcfg *KubeletConfig, builder KubeletBuilder) { kcfg.Hostname = util.GetHostname(kcfg.HostnameOverride) eventBroadcaster := record.NewBroadcaster() kcfg.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "kubelet", Host: kcfg.Hostname}) @@ -392,8 +405,10 @@ func RunKubelet(kcfg *KubeletConfig) { credentialprovider.SetPreferredDockercfgPath(kcfg.RootDirectory) - podCfg := makePodSourceConfig(kcfg) - k, err := createAndInitKubelet(kcfg, podCfg) + if builder == nil { + builder = createAndInitKubelet + } + k, podCfg, err := builder(kcfg) if err != nil { glog.Errorf("Failed to create kubelet: %s", err) return @@ -408,19 +423,19 @@ func RunKubelet(kcfg *KubeletConfig) { } } -func startKubelet(k *kubelet.Kubelet, podCfg *config.PodConfig, kc *KubeletConfig) { +func startKubelet(k KubeletBootstrap, podCfg *config.PodConfig, kc *KubeletConfig) { // start the kubelet go util.Forever(func() { k.Run(podCfg.Updates()) }, 0) // start the kubelet server if kc.EnableServer { go util.Forever(func() { - kubelet.ListenAndServeKubeletServer(k, net.IP(kc.Address), kc.Port, kc.TLSOptions, kc.EnableDebuggingHandlers) + k.ListenAndServe(net.IP(kc.Address), kc.Port, kc.TLSOptions, kc.EnableDebuggingHandlers) }, 0) } if kc.ReadOnlyPort > 0 { go util.Forever(func() { - kubelet.ListenAndServeKubeletReadOnlyServer(k, net.IP(kc.Address), kc.ReadOnlyPort) + k.ListenAndServeReadOnly(net.IP(kc.Address), kc.ReadOnlyPort) }, 0) } } @@ -488,16 +503,14 @@ type KubeletConfig struct { Cloud cloudprovider.Interface } -func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kubelet, error) { +func createAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.PodConfig, err error) { // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop // up into "per source" synchronizations // TODO: KubeletConfig.KubeClient should be a client interface, but client interface misses certain methods // used by kubelet. Since NewMainKubelet expects a client interface, we need to make sure we are not passing // a nil pointer to it when what we really want is a nil interface. var kubeClient client.Interface - if kc.KubeClient == nil { - kubeClient = nil - } else { + if kc.KubeClient != nil { kubeClient = kc.KubeClient } @@ -507,7 +520,8 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub MaxContainers: kc.MaxContainerCount, } - k, err := kubelet.NewMainKubelet( + pc = makePodSourceConfig(kc) + k, err = kubelet.NewMainKubelet( kc.Hostname, kc.DockerClient, kubeClient, @@ -531,12 +545,12 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub kc.Cloud) if err != nil { - return nil, err + return nil, nil, err } k.BirthCry() k.StartGarbageCollection() - return k, nil + return k, pc, nil } diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go index 6c0b169aba7..d61be79979a 100644 --- a/cmd/kubernetes/kubernetes.go +++ b/cmd/kubernetes/kubernetes.go @@ -154,7 +154,7 @@ func startComponents(etcdClient tools.EtcdClient, cl *client.Client, addr net.IP glog.Fatalf("Failed to create cAdvisor: %v", err) } kcfg := kubeletapp.SimpleKubelet(cl, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace, kubeletapp.ProbeVolumePlugins(), nil, cadvisorInterface, "", nil) - kubeletapp.RunKubelet(kcfg) + kubeletapp.RunKubelet(kcfg, nil) } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 244c798cd56..0cf189bab1f 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -2094,3 +2094,11 @@ func (kl *Kubelet) GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error) { } return kl.machineInfo, nil } + +func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *TLSOptions, enableDebuggingHandlers bool) { + ListenAndServeKubeletServer(kl, address, port, tlsOptions, enableDebuggingHandlers) +} + +func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) { + ListenAndServeKubeletReadOnlyServer(kl, address, port) +}