diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index c4acaaa6d28..fc673027bc6 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -23,7 +23,6 @@ import ( "net" "net/http" "net/http/httptest" - "os" "reflect" "runtime" "strconv" @@ -40,12 +39,11 @@ import ( minionControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller" replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/health" - "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" - "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/master" "github.com/GoogleCloudPlatform/kubernetes/pkg/service" + "github.com/GoogleCloudPlatform/kubernetes/pkg/standalone" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait" "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler" @@ -56,6 +54,7 @@ import ( ) const testRootDir = "/tmp/kubelet" +const testRootDir2 = "/tmp/kubelet2" var ( fakeDocker1, fakeDocker2 dockertools.FakeDockerClient @@ -187,26 +186,11 @@ func startComponents(manifestURL string) (apiServerURL string) { minionController.Run(10 * time.Second) // Kubelet (localhost) - os.MkdirAll(testRootDir, 0750) - cfg1 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates) - config.NewSourceEtcd(config.EtcdKeyForHost(machineList[0]), etcdClient, cfg1.Channel("etcd")) - config.NewSourceURL(manifestURL, 5*time.Second, cfg1.Channel("url")) - myKubelet := kubelet.NewIntegrationTestKubelet(machineList[0], testRootDir, &fakeDocker1) - go util.Forever(func() { myKubelet.Run(cfg1.Updates()) }, 0) - go util.Forever(func() { - kubelet.ListenAndServeKubeletServer(myKubelet, cfg1.Channel("http"), net.ParseIP("127.0.0.1"), 10250, true) - }, 0) - + standalone.SimpleRunKubelet(etcdClient, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250) // Kubelet (machine) // Create a second kubelet so that the guestbook example's two redis slaves both // have a place they can schedule. - cfg2 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates) - config.NewSourceEtcd(config.EtcdKeyForHost(machineList[1]), etcdClient, cfg2.Channel("etcd")) - otherKubelet := kubelet.NewIntegrationTestKubelet(machineList[1], testRootDir, &fakeDocker2) - go util.Forever(func() { otherKubelet.Run(cfg2.Updates()) }, 0) - go util.Forever(func() { - kubelet.ListenAndServeKubeletServer(otherKubelet, cfg2.Channel("http"), net.ParseIP("127.0.0.1"), 10251, true) - }, 0) + standalone.SimpleRunKubelet(etcdClient, &fakeDocker2, machineList[1], testRootDir2, "", "127.0.0.1", 10251) return apiServer.URL } diff --git a/cmd/kubelet/kubelet.go b/cmd/kubelet/kubelet.go index de966ac771a..68423621aa7 100644 --- a/cmd/kubelet/kubelet.go +++ b/cmd/kubelet/kubelet.go @@ -24,28 +24,15 @@ import ( "flag" "math/rand" "net" - "net/http" - "os" - "os/exec" - "path" - "strings" "time" - "github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities" - "github.com/GoogleCloudPlatform/kubernetes/pkg/client" - "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" - "github.com/GoogleCloudPlatform/kubernetes/pkg/clientauth" - "github.com/GoogleCloudPlatform/kubernetes/pkg/health" _ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" - kconfig "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config" "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" + "github.com/GoogleCloudPlatform/kubernetes/pkg/standalone" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag" - "github.com/coreos/go-etcd/etcd" - "github.com/fsouza/go-dockerclient" "github.com/golang/glog" - cadvisor "github.com/google/cadvisor/client" ) const defaultRootDir = "/var/lib/kubelet" @@ -82,52 +69,15 @@ func init() { flag.Var(&apiServerList, "api_servers", "List of Kubernetes API servers to publish events to. (ip:port), comma separated.") } -func getDockerEndpoint() string { - var endpoint string - if len(*dockerEndpoint) > 0 { - endpoint = *dockerEndpoint - } else if len(os.Getenv("DOCKER_HOST")) > 0 { - endpoint = os.Getenv("DOCKER_HOST") - } else { - endpoint = "unix:///var/run/docker.sock" - } - glog.Infof("Connecting to docker on %s", endpoint) - - return endpoint -} - -func getHostname() string { - hostname := []byte(*hostnameOverride) - if string(hostname) == "" { - // Note: We use exec here instead of os.Hostname() because we - // want the FQDN, and this is the easiest way to get it. - fqdn, err := exec.Command("hostname", "-f").Output() - if err != nil { - glog.Fatalf("Couldn't determine hostname: %v", err) +func setupRunOnce() { + if *runonce { + if len(etcdServerList) > 0 { + glog.Fatalf("invalid option: --runonce and --etcd_servers are mutually exclusive") + } + if *enableServer { + glog.Infof("--runonce is set, disabling server") + *enableServer = false } - hostname = fqdn - } - return strings.TrimSpace(string(hostname)) -} - -func getApiserverClient() (*client.Client, error) { - authInfo, err := clientauth.LoadFromFile(*authPath) - if err != nil { - return nil, err - } - clientConfig, err := authInfo.MergeWithConfig(client.Config{}) - if err != nil { - return nil, err - } - // TODO: adapt Kube client to support LB over several servers - if len(apiServerList) > 1 { - glog.Infof("Mulitple api servers specified. Picking first one") - } - clientConfig.Host = apiServerList[0] - if c, err := client.New(&clientConfig); err != nil { - return nil, err - } else { - return c, nil } } @@ -139,147 +89,34 @@ func main() { verflag.PrintAndExitIfRequested() - if *runonce { - exclusiveFlag := "invalid option: --runonce and %s are mutually exclusive" - if len(etcdServerList) > 0 { - glog.Fatalf(exclusiveFlag, "--etcd_servers") - } - if *enableServer { - glog.Infof("--runonce is set, disabling server") - *enableServer = false - } - } - - etcd.SetLogger(util.NewLogger("etcd ")) - - // Make an API client if possible. - if len(apiServerList) < 1 { - glog.Info("No api servers specified.") - } else { - if apiClient, err := getApiserverClient(); err != nil { - glog.Errorf("Unable to make apiserver client: %v", err) - } else { - // Send events to APIserver if there is a client. - glog.Infof("Sending events to APIserver.") - record.StartRecording(apiClient.Events(""), "kubelet") - } - } - - // Log the events locally too. - record.StartLogging(glog.Infof) - - capabilities.Initialize(capabilities.Capabilities{ - AllowPrivileged: *allowPrivileged, - }) - - dockerClient, err := docker.NewClient(getDockerEndpoint()) - if err != nil { - glog.Fatal("Couldn't connect to docker.") - } - - hostname := getHostname() - - if *rootDirectory == "" { - glog.Fatal("Invalid root directory path.") - } - *rootDirectory = path.Clean(*rootDirectory) - if err := os.MkdirAll(*rootDirectory, 0750); err != nil { - glog.Fatalf("Error creating root directory: %v", err) - } - - // source of all configuration - cfg := kconfig.NewPodConfig(kconfig.PodConfigNotificationSnapshotAndUpdates) - - // define file config source - if *config != "" { - kconfig.NewSourceFile(*config, *fileCheckFrequency, cfg.Channel("file")) - } - - // define url config source - if *manifestURL != "" { - kconfig.NewSourceURL(*manifestURL, *httpCheckFrequency, cfg.Channel("http")) - } - - // define etcd config source and initialize etcd client - var etcdClient *etcd.Client - if len(etcdServerList) > 0 { - etcdClient = etcd.NewClient(etcdServerList) - } else if *etcdConfigFile != "" { - var err error - etcdClient, err = etcd.NewClientFromFile(*etcdConfigFile) - if err != nil { - glog.Fatalf("Error with etcd config file: %v", err) - } - } - - if etcdClient != nil { - glog.Infof("Watching for etcd configs at %v", etcdClient.GetCluster()) - kconfig.NewSourceEtcd(kconfig.EtcdKeyForHost(hostname), etcdClient, cfg.Channel("etcd")) - } - - // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop - // up into "per source" synchronizations - - k := kubelet.NewMainKubelet( - getHostname(), - dockerClient, - etcdClient, - *rootDirectory, - *networkContainerImage, - *syncFrequency, - float32(*registryPullQPS), - *registryBurst, - *minimumGCAge, - *maxContainerCount) - - k.BirthCry() - - go func() { - util.Forever(func() { - err := k.GarbageCollectContainers() - if err != nil { - glog.Errorf("Garbage collect failed: %v", err) - } - }, time.Minute*1) - }() - - go func() { - defer util.HandleCrash() - // TODO: Monitor this connection, reconnect if needed? - glog.V(1).Infof("Trying to create cadvisor client.") - cadvisorClient, err := cadvisor.NewClient("http://127.0.0.1:4194") - if err != nil { - glog.Errorf("Error on creating cadvisor client: %v", err) - return - } - glog.V(1).Infof("Successfully created cadvisor client.") - k.SetCadvisorClient(cadvisorClient) - }() - - // TODO: These should probably become more plugin-ish: register a factory func - // in each checker's init(), iterate those here. - health.AddHealthChecker(health.NewExecHealthChecker(k)) - health.AddHealthChecker(health.NewHTTPHealthChecker(&http.Client{})) - health.AddHealthChecker(&health.TCPHealthChecker{}) - - // process pods and exit. - if *runonce { - if _, err := k.RunOnce(cfg.Updates()); err != nil { - glog.Fatalf("--runonce failed: %v", err) - } - return - } - - // start the kubelet - go util.Forever(func() { k.Run(cfg.Updates()) }, 0) - - // start the kubelet server - if *enableServer { - go util.Forever(func() { - kubelet.ListenAndServeKubeletServer(k, cfg.Channel("http"), net.IP(address), *port, *enableDebuggingHandlers) - }, 0) + setupRunOnce() + + kcfg := standalone.KubeletConfig{ + Address: address, + AuthPath: *authPath, + ApiServerList: apiServerList, + AllowPrivileged: *allowPrivileged, + HostnameOverride: *hostnameOverride, + RootDirectory: *rootDirectory, + ConfigFile: *config, + ManifestURL: *manifestURL, + FileCheckFrequency: *fileCheckFrequency, + HttpCheckFrequency: *httpCheckFrequency, + NetworkContainerImage: *networkContainerImage, + SyncFrequency: *syncFrequency, + RegistryPullQPS: *registryPullQPS, + RegistryBurst: *registryBurst, + MinimumGCAge: *minimumGCAge, + MaxContainerCount: *maxContainerCount, + Runonce: *runonce, + Port: *port, + EnableServer: *enableServer, + EnableDebuggingHandlers: *enableDebuggingHandlers, + DockerClient: kubelet.ConnectToDockerOrDie(*dockerEndpoint), + EtcdClient: kubelet.EtcdClientOrDie(etcdServerList, *etcdConfigFile), } + standalone.RunKubelet(&kcfg) // runs forever select {} } diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go index 0e7c83f3a97..bd7400b4ef7 100644 --- a/cmd/kubernetes/kubernetes.go +++ b/cmd/kubernetes/kubernetes.go @@ -27,6 +27,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/standalone" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -50,7 +51,9 @@ func startComponents(etcdClient tools.EtcdClient, cl *client.Client, addr string standalone.RunApiServer(cl, etcdClient, addr, port) standalone.RunScheduler(cl) standalone.RunControllerManager(machineList, cl, *nodeMilliCPU, *nodeMemory) - standalone.RunKubelet(etcdClient, machineList[0], *dockerEndpoint) + + dockerClient := kubelet.ConnectToDockerOrDie(*dockerEndpoint) + standalone.SimpleRunKubelet(etcdClient, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250) } func newApiClient(addr string, port int) *client.Client { diff --git a/pkg/kubelet/util.go b/pkg/kubelet/util.go new file mode 100644 index 00000000000..2cfe9bab640 --- /dev/null +++ b/pkg/kubelet/util.go @@ -0,0 +1,187 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "net/http" + "os" + "os/exec" + "path" + "strings" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" + "github.com/GoogleCloudPlatform/kubernetes/pkg/clientauth" + "github.com/GoogleCloudPlatform/kubernetes/pkg/health" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/coreos/go-etcd/etcd" + "github.com/fsouza/go-dockerclient" + "github.com/golang/glog" + cadvisor "github.com/google/cadvisor/client" +) + +// TODO: move this into a pkg/util +func GetHostname(hostnameOverride string) string { + hostname := []byte(hostnameOverride) + if string(hostname) == "" { + // Note: We use exec here instead of os.Hostname() because we + // want the FQDN, and this is the easiest way to get it. + fqdn, err := exec.Command("hostname", "-f").Output() + if err != nil { + glog.Fatalf("Couldn't determine hostname: %v", err) + } + hostname = fqdn + } + return strings.TrimSpace(string(hostname)) +} + +// TODO: move this into a pkg/util +func GetDockerEndpoint(dockerEndpoint string) string { + var endpoint string + if len(dockerEndpoint) > 0 { + endpoint = dockerEndpoint + } else if len(os.Getenv("DOCKER_HOST")) > 0 { + endpoint = os.Getenv("DOCKER_HOST") + } else { + endpoint = "unix:///var/run/docker.sock" + } + glog.Infof("Connecting to docker on %s", endpoint) + + return endpoint +} + +// TODO: move this into pkg/util +func ConnectToDockerOrDie(dockerEndpoint string) *docker.Client { + client, err := docker.NewClient(GetDockerEndpoint(dockerEndpoint)) + if err != nil { + glog.Fatal("Couldn't connect to docker.") + } + return client +} + +// TODO: move this into the kubelet itself +func GarbageCollectLoop(k *Kubelet) { + func() { + util.Forever(func() { + err := k.GarbageCollectContainers() + if err != nil { + glog.Errorf("Garbage collect failed: %v", err) + } + }, time.Minute*1) + }() +} + +// TODO: move this into the kubelet itself +func MonitorCAdvisor(k *Kubelet) { + defer util.HandleCrash() + // TODO: Monitor this connection, reconnect if needed? + glog.V(1).Infof("Trying to create cadvisor client.") + cadvisorClient, err := cadvisor.NewClient("http://127.0.0.1:4194") + if err != nil { + glog.Errorf("Error on creating cadvisor client: %v", err) + return + } + glog.V(1).Infof("Successfully created cadvisor client.") + k.SetCadvisorClient(cadvisorClient) +} + +// TODO: move this into the kubelet itself +func InitHealthChecking(k *Kubelet) { + // TODO: These should probably become more plugin-ish: register a factory func + // in each checker's init(), iterate those here. + health.AddHealthChecker(health.NewExecHealthChecker(k)) + health.AddHealthChecker(health.NewHTTPHealthChecker(&http.Client{})) + health.AddHealthChecker(&health.TCPHealthChecker{}) +} + +// TODO: move this into a pkg/tools/etcd_tools +func EtcdClientOrDie(etcdServerList util.StringList, etcdConfigFile string) *etcd.Client { + if len(etcdServerList) > 0 { + return etcd.NewClient(etcdServerList) + } else if etcdConfigFile != "" { + etcdClient, err := etcd.NewClientFromFile(etcdConfigFile) + if err != nil { + glog.Fatalf("Error with etcd config file: %v", err) + } + return etcdClient + } + return nil +} + +// TODO: move this into pkg/util +func SetupRootDirectoryOrDie(rootDirectory string) { + if rootDirectory == "" { + glog.Fatal("Invalid root directory path.") + } + rootDirectory = path.Clean(rootDirectory) + if err := os.MkdirAll(rootDirectory, 0750); err != nil { + glog.Fatalf("Error creating root directory: %v", err) + } +} + +// TODO: move this into pkg/capabilities +func SetupCapabilities(allowPrivileged bool) { + capabilities.Initialize(capabilities.Capabilities{ + AllowPrivileged: allowPrivileged, + }) +} + +// TODO: Split this up? +func SetupLogging() { + etcd.SetLogger(util.NewLogger("etcd ")) + // Log the events locally too. + record.StartLogging(glog.Infof) +} + +// TODO: move this into pkg/client +func getApiserverClient(authPath string, apiServerList util.StringList) (*client.Client, error) { + authInfo, err := clientauth.LoadFromFile(authPath) + if err != nil { + return nil, err + } + clientConfig, err := authInfo.MergeWithConfig(client.Config{}) + if err != nil { + return nil, err + } + // TODO: adapt Kube client to support LB over several servers + if len(apiServerList) > 1 { + glog.Infof("Mulitple api servers specified. Picking first one") + } + clientConfig.Host = apiServerList[0] + if c, err := client.New(&clientConfig); err != nil { + return nil, err + } else { + return c, nil + } +} + +func SetupEventSending(authPath string, apiServerList util.StringList) { + // Make an API client if possible. + if len(apiServerList) < 1 { + glog.Info("No api servers specified.") + } else { + if apiClient, err := getApiserverClient(authPath, apiServerList); err != nil { + glog.Errorf("Unable to make apiserver client: %v", err) + } else { + // Send events to APIserver if there is a client. + glog.Infof("Sending events to APIserver.") + record.StartRecording(apiClient.Events(""), "kubelet") + } + } +} diff --git a/pkg/standalone/standalone.go b/pkg/standalone/standalone.go index 7eb28d10ec5..7c3b09c9369 100644 --- a/pkg/standalone/standalone.go +++ b/pkg/standalone/standalone.go @@ -31,6 +31,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/master" "github.com/GoogleCloudPlatform/kubernetes/pkg/resources" "github.com/GoogleCloudPlatform/kubernetes/pkg/service" @@ -39,12 +40,9 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler" "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory" - docker "github.com/fsouza/go-dockerclient" "github.com/golang/glog" ) -const testRootDir = "/tmp/kubelet" - type delegateHandler struct { delegate http.Handler } @@ -141,20 +139,131 @@ func RunControllerManager(machineList []string, cl *client.Client, nodeMilliCPU, controllerManager.Run(10 * time.Second) } -// RunKubelet starts a Kubelet talking to dockerEndpoint -func RunKubelet(etcdClient tools.EtcdClient, hostname, dockerEndpoint string) { - dockerClient, err := docker.NewClient(GetDockerEndpoint(dockerEndpoint)) - if err != nil { - glog.Fatal("Couldn't connect to docker.") +// SimpleRunKubelet is a simple way to start a Kubelet talking to dockerEndpoint, using an etcdClient. +// Under the hood it calls RunKubelet (below) +func SimpleRunKubelet(etcdClient tools.EtcdClient, dockerClient dockertools.DockerInterface, hostname, rootDir, manifestURL, address string, port uint) { + kcfg := KubeletConfig{ + EtcdClient: etcdClient, + DockerClient: dockerClient, + HostnameOverride: hostname, + RootDirectory: rootDir, + ManifestURL: manifestURL, + NetworkContainerImage: kubelet.NetworkContainerImage, + Port: port, + Address: util.IP(net.ParseIP(address)), + EnableServer: true, + EnableDebuggingHandlers: true, + } + RunKubelet(&kcfg) +} + +// RunKubelet is responsible for setting up and running a kubelet. It is used in three different applications: +// 1 Integration tests +// 2 Kubelet binary +// 3 Standalone 'kubernetes' binary +// Eventually, #2 will be replaced with instances of #3 +func RunKubelet(kcfg *KubeletConfig) { + kubelet.SetupEventSending(kcfg.AuthPath, kcfg.ApiServerList) + kubelet.SetupLogging() + kubelet.SetupCapabilities(kcfg.AllowPrivileged) + + kcfg.Hostname = kubelet.GetHostname(kcfg.HostnameOverride) + if len(kcfg.RootDirectory) > 0 { + kubelet.SetupRootDirectoryOrDie(kcfg.RootDirectory) } - // Kubelet (localhost) - os.MkdirAll(testRootDir, 0750) - cfg1 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates) - config.NewSourceEtcd(config.EtcdKeyForHost(hostname), etcdClient, cfg1.Channel("etcd")) - myKubelet := kubelet.NewIntegrationTestKubelet(hostname, testRootDir, dockerClient) - go util.Forever(func() { myKubelet.Run(cfg1.Updates()) }, 0) - go util.Forever(func() { - kubelet.ListenAndServeKubeletServer(myKubelet, cfg1.Channel("http"), net.ParseIP("127.0.0.1"), 10250, true) - }, 0) + cfg := makePodSourceConfig(kcfg) + k := createAndInitKubelet(kcfg) + // process pods and exit. + if kcfg.Runonce { + if _, err := k.RunOnce(cfg.Updates()); err != nil { + glog.Errorf("--runonce failed: %v", err) + } + } else { + startKubelet(k, cfg, kcfg) + } +} + +func startKubelet(k *kubelet.Kubelet, cfg *config.PodConfig, kc *KubeletConfig) { + // start the kubelet + go util.Forever(func() { k.Run(cfg.Updates()) }, 0) + + // start the kubelet server + if kc.EnableServer { + go util.Forever(func() { + kubelet.ListenAndServeKubeletServer(k, cfg.Channel("http"), net.IP(kc.Address), kc.Port, kc.EnableDebuggingHandlers) + }, 0) + } +} + +func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig { + // source of all configuration + cfg := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates) + + // define file config source + if kc.ConfigFile != "" { + config.NewSourceFile(kc.ConfigFile, kc.FileCheckFrequency, cfg.Channel("file")) + } + + // define url config source + if kc.ManifestURL != "" { + config.NewSourceURL(kc.ManifestURL, kc.HttpCheckFrequency, cfg.Channel("http")) + } + + if kc.EtcdClient != nil { + glog.Infof("Watching for etcd configs at %v", kc.EtcdClient.GetCluster()) + config.NewSourceEtcd(config.EtcdKeyForHost(kc.Hostname), kc.EtcdClient, cfg.Channel("etcd")) + } + return cfg +} + +type KubeletConfig struct { + EtcdClient tools.EtcdClient + DockerClient dockertools.DockerInterface + Address util.IP + AuthPath string + ApiServerList util.StringList + AllowPrivileged bool + HostnameOverride string + RootDirectory string + ConfigFile string + ManifestURL string + FileCheckFrequency time.Duration + HttpCheckFrequency time.Duration + Hostname string + NetworkContainerImage string + SyncFrequency time.Duration + RegistryPullQPS float64 + RegistryBurst int + MinimumGCAge time.Duration + MaxContainerCount int + EnableServer bool + EnableDebuggingHandlers bool + Port uint + Runonce bool +} + +func createAndInitKubelet(kc *KubeletConfig) *kubelet.Kubelet { + // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop + // up into "per source" synchronizations + + k := kubelet.NewMainKubelet( + kc.Hostname, + kc.DockerClient, + kc.EtcdClient, + kc.RootDirectory, + kc.NetworkContainerImage, + kc.SyncFrequency, + float32(kc.RegistryPullQPS), + kc.RegistryBurst, + kc.MinimumGCAge, + kc.MaxContainerCount) + + k.BirthCry() + + go kubelet.GarbageCollectLoop(k) + go kubelet.MonitorCAdvisor(k) + kubelet.InitHealthChecking(k) + + return k }