allow for more easily customized kubelet creation and initialization

This commit is contained in:
James DeFelice 2015-03-26 12:31:54 +00:00
parent 9e61d743ec
commit d194af6b4f
4 changed files with 39 additions and 17 deletions

View File

@ -233,14 +233,14 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
configFilePath := makeTempDirOrDie("config", testRootDir)
glog.Infof("Using %s as root dir for kubelet #1", testRootDir)
kcfg := kubeletapp.SimpleKubelet(cl, &fakeDocker1, machineList[0], testRootDir, firstManifestURL, "127.0.0.1", 10250, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, configFilePath, nil)
kubeletapp.RunKubelet(kcfg)
kubeletapp.RunKubelet(kcfg, nil)
// Kubelet (machine)
// Create a second kubelet so that the guestbook example's two redis slaves both
// have a place they can schedule.
testRootDir = makeTempDirOrDie("kubelet_integ_2.", "")
glog.Infof("Using %s as root dir for kubelet #2", testRootDir)
kcfg = kubeletapp.SimpleKubelet(cl, &fakeDocker2, machineList[1], testRootDir, secondManifestURL, "127.0.0.1", 10251, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, "", nil)
kubeletapp.RunKubelet(kcfg)
kubeletapp.RunKubelet(kcfg, nil)
return apiServer.URL, configFilePath
}

View File

@ -96,6 +96,19 @@ type KubeletServer struct {
CertDirectory string
}
// bootstrapping interface for kubelet, targets the initialization protocol
type KubeletBootstrap interface {
BirthCry()
StartGarbageCollection()
ListenAndServe(net.IP, uint, *kubelet.TLSOptions, bool)
ListenAndServeReadOnly(net.IP, uint)
Run(<-chan kubelet.PodUpdate)
RunOnce(<-chan kubelet.PodUpdate) ([]kubelet.RunPodResult, error)
}
// create and initialize a Kubelet instance
type KubeletBuilder func(kc *KubeletConfig) (KubeletBootstrap, *config.PodConfig, error)
// NewKubeletServer will create a new KubeletServer with default values.
func NewKubeletServer() *KubeletServer {
return &KubeletServer{
@ -267,7 +280,7 @@ func (s *KubeletServer) Run(_ []string) error {
Cloud: cloud,
}
RunKubelet(&kcfg)
RunKubelet(&kcfg, nil)
if s.HealthzPort > 0 {
healthz.DefaultHealthz()
@ -377,7 +390,7 @@ func SimpleKubelet(client *client.Client,
// 2 Kubelet binary
// 3 Standalone 'kubernetes' binary
// Eventually, #2 will be replaced with instances of #3
func RunKubelet(kcfg *KubeletConfig) {
func RunKubelet(kcfg *KubeletConfig, builder KubeletBuilder) {
kcfg.Hostname = util.GetHostname(kcfg.HostnameOverride)
eventBroadcaster := record.NewBroadcaster()
kcfg.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "kubelet", Host: kcfg.Hostname})
@ -392,8 +405,10 @@ func RunKubelet(kcfg *KubeletConfig) {
credentialprovider.SetPreferredDockercfgPath(kcfg.RootDirectory)
podCfg := makePodSourceConfig(kcfg)
k, err := createAndInitKubelet(kcfg, podCfg)
if builder == nil {
builder = createAndInitKubelet
}
k, podCfg, err := builder(kcfg)
if err != nil {
glog.Errorf("Failed to create kubelet: %s", err)
return
@ -408,19 +423,19 @@ func RunKubelet(kcfg *KubeletConfig) {
}
}
func startKubelet(k *kubelet.Kubelet, podCfg *config.PodConfig, kc *KubeletConfig) {
func startKubelet(k KubeletBootstrap, podCfg *config.PodConfig, kc *KubeletConfig) {
// start the kubelet
go util.Forever(func() { k.Run(podCfg.Updates()) }, 0)
// start the kubelet server
if kc.EnableServer {
go util.Forever(func() {
kubelet.ListenAndServeKubeletServer(k, net.IP(kc.Address), kc.Port, kc.TLSOptions, kc.EnableDebuggingHandlers)
k.ListenAndServe(net.IP(kc.Address), kc.Port, kc.TLSOptions, kc.EnableDebuggingHandlers)
}, 0)
}
if kc.ReadOnlyPort > 0 {
go util.Forever(func() {
kubelet.ListenAndServeKubeletReadOnlyServer(k, net.IP(kc.Address), kc.ReadOnlyPort)
k.ListenAndServeReadOnly(net.IP(kc.Address), kc.ReadOnlyPort)
}, 0)
}
}
@ -488,16 +503,14 @@ type KubeletConfig struct {
Cloud cloudprovider.Interface
}
func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kubelet, error) {
func createAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.PodConfig, err error) {
// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
// up into "per source" synchronizations
// TODO: KubeletConfig.KubeClient should be a client interface, but client interface misses certain methods
// used by kubelet. Since NewMainKubelet expects a client interface, we need to make sure we are not passing
// a nil pointer to it when what we really want is a nil interface.
var kubeClient client.Interface
if kc.KubeClient == nil {
kubeClient = nil
} else {
if kc.KubeClient != nil {
kubeClient = kc.KubeClient
}
@ -507,7 +520,8 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub
MaxContainers: kc.MaxContainerCount,
}
k, err := kubelet.NewMainKubelet(
pc = makePodSourceConfig(kc)
k, err = kubelet.NewMainKubelet(
kc.Hostname,
kc.DockerClient,
kubeClient,
@ -531,12 +545,12 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub
kc.Cloud)
if err != nil {
return nil, err
return nil, nil, err
}
k.BirthCry()
k.StartGarbageCollection()
return k, nil
return k, pc, nil
}

View File

@ -154,7 +154,7 @@ func startComponents(etcdClient tools.EtcdClient, cl *client.Client, addr net.IP
glog.Fatalf("Failed to create cAdvisor: %v", err)
}
kcfg := kubeletapp.SimpleKubelet(cl, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace, kubeletapp.ProbeVolumePlugins(), nil, cadvisorInterface, "", nil)
kubeletapp.RunKubelet(kcfg)
kubeletapp.RunKubelet(kcfg, nil)
}

View File

@ -2094,3 +2094,11 @@ func (kl *Kubelet) GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error) {
}
return kl.machineInfo, nil
}
func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *TLSOptions, enableDebuggingHandlers bool) {
ListenAndServeKubeletServer(kl, address, port, tlsOptions, enableDebuggingHandlers)
}
func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) {
ListenAndServeKubeletReadOnlyServer(kl, address, port)
}