From ded67ead1ee73d48ce761e3aeac870ea48a9d407 Mon Sep 17 00:00:00 2001 From: Eric Tune Date: Tue, 22 Jul 2014 14:40:59 -0700 Subject: [PATCH] Make Kubelet type members private and provide New functions. --- cmd/integration/integration.go | 12 +---- cmd/kubelet/kubelet.go | 18 ++++--- pkg/kubelet/kubelet.go | 93 ++++++++++++++++++++-------------- pkg/kubelet/kubelet_test.go | 24 ++++----- 4 files changed, 79 insertions(+), 68 deletions(-) diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index b9baba78e3b..2181d1e13d9 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -104,11 +104,7 @@ func startComponents(manifestURL string) (apiServerURL string) { cfg1 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates) config.NewSourceEtcd(config.EtcdKeyForHost(machineList[0]), etcdClient, 30*time.Second, cfg1.Channel("etcd")) config.NewSourceURL(manifestURL, 5*time.Second, cfg1.Channel("url")) - myKubelet := &kubelet.Kubelet{ - Hostname: machineList[0], - DockerClient: &fakeDocker1, - DockerPuller: &kubelet.FakeDockerPuller{}, - } + myKubelet := kubelet.NewIntegrationTestKubelet(machineList[0], &fakeDocker1) go util.Forever(func() { myKubelet.Run(cfg1.Updates()) }, 0) go util.Forever(cfg1.Sync, 3*time.Second) go util.Forever(func() { @@ -120,11 +116,7 @@ func startComponents(manifestURL string) (apiServerURL string) { // have a place they can schedule. cfg2 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates) config.NewSourceEtcd(config.EtcdKeyForHost(machineList[1]), etcdClient, 30*time.Second, cfg2.Channel("etcd")) - otherKubelet := &kubelet.Kubelet{ - Hostname: machineList[1], - DockerClient: &fakeDocker2, - DockerPuller: &kubelet.FakeDockerPuller{}, - } + otherKubelet := kubelet.NewIntegrationTestKubelet(machineList[1], &fakeDocker2) go util.Forever(func() { otherKubelet.Run(cfg2.Updates()) }, 0) go util.Forever(cfg2.Sync, 3*time.Second) go util.Forever(func() { diff --git a/cmd/kubelet/kubelet.go b/cmd/kubelet/kubelet.go index 45c918ebc33..086c30771a0 100644 --- a/cmd/kubelet/kubelet.go +++ b/cmd/kubelet/kubelet.go @@ -32,6 +32,7 @@ import ( _ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" kconfig "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/coreos/go-etcd/etcd" "github.com/fsouza/go-dockerclient" @@ -104,12 +105,6 @@ func main() { hostname := getHostname() - k := &kubelet.Kubelet{ - Hostname: hostname, - DockerClient: dockerClient, - CadvisorClient: cadvisorClient, - } - // source of all configuration cfg := kconfig.NewPodConfig(kconfig.PodConfigNotificationSnapshotAndUpdates) @@ -124,15 +119,22 @@ func main() { } // define etcd config source and initialize etcd client + var etcdClient tools.EtcdClient if len(etcdServerList) > 0 { glog.Infof("Watching for etcd configs at %v", etcdServerList) - k.EtcdClient = etcd.NewClient(etcdServerList) - kconfig.NewSourceEtcd(kconfig.EtcdKeyForHost(hostname), k.EtcdClient, 30*time.Second, cfg.Channel("etcd")) + etcdClient = etcd.NewClient(etcdServerList) + kconfig.NewSourceEtcd(kconfig.EtcdKeyForHost(hostname), etcdClient, 30*time.Second, cfg.Channel("etcd")) } // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop // up into "per source" synchronizations + k := kubelet.NewMainKubelet( + getHostname(), + dockerClient, + cadvisorClient, + etcdClient) + // start the kubelet go util.Forever(func() { k.Run(cfg.Updates()) }, 0) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index a332c4194f9..70818eb5ed8 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -57,47 +57,64 @@ type SyncHandler interface { type volumeMap map[string]volume.Interface -// New creates a new Kubelet. -// TODO: currently it is only called by test code. -// Need cleanup. -func New() *Kubelet { - return &Kubelet{} +// New creates a new Kubelet for use in main +func NewMainKubelet( + hn string, + dc DockerInterface, + cc CadvisorInterface, + ec tools.EtcdClient) *Kubelet { + return &Kubelet{ + hostname: hn, + dockerClient: dc, + cadvisorClient: cc, + etcdClient: ec, + } +} + +// NewIntegrationTestKubelet creates a new Kubelet for use in integration tests. +// TODO: add more integration tests, and expand parameter list as needed. +func NewIntegrationTestKubelet(hn string, dc DockerInterface) *Kubelet { + return &Kubelet{ + hostname: hn, + dockerClient: dc, + dockerPuller: &FakeDockerPuller{}, + } } // Kubelet is the main kubelet implementation. type Kubelet struct { - Hostname string - DockerClient DockerInterface + hostname string + dockerClient DockerInterface // Optional, no events will be sent without it - EtcdClient tools.EtcdClient + etcdClient tools.EtcdClient // Optional, no statistics will be available if omitted - CadvisorClient CadvisorInterface + cadvisorClient CadvisorInterface // Optional, defaults to simple implementaiton - HealthChecker health.HealthChecker + healthChecker health.HealthChecker // Optional, defaults to simple Docker implementation - DockerPuller DockerPuller + dockerPuller DockerPuller // Optional, defaults to /logs/ from /var/log - LogServer http.Handler + logServer http.Handler } // Run starts the kubelet reacting to config updates func (kl *Kubelet) Run(updates <-chan PodUpdate) { - if kl.LogServer == nil { - kl.LogServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/"))) + if kl.logServer == nil { + kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/"))) } - if kl.DockerPuller == nil { - kl.DockerPuller = NewDockerPuller(kl.DockerClient) + if kl.dockerPuller == nil { + kl.dockerPuller = NewDockerPuller(kl.dockerClient) } - if kl.HealthChecker == nil { - kl.HealthChecker = health.NewHealthChecker() + if kl.healthChecker == nil { + kl.healthChecker = health.NewHealthChecker() } kl.syncLoop(updates, kl) } // LogEvent logs an event to the etcd backend. func (kl *Kubelet) LogEvent(event *api.Event) error { - if kl.EtcdClient == nil { + if kl.etcdClient == nil { return fmt.Errorf("no etcd client connection") } event.Timestamp = time.Now().Unix() @@ -107,7 +124,7 @@ func (kl *Kubelet) LogEvent(event *api.Event) error { } var response *etcd.Response - response, err = kl.EtcdClient.AddChild(fmt.Sprintf("/events/%s", event.Container.Name), string(data), 60*60*48 /* 2 days */) + response, err = kl.etcdClient.AddChild(fmt.Sprintf("/events/%s", event.Container.Name), string(data), 60*60*48 /* 2 days */) // TODO(bburns) : examine response here. if err != nil { glog.Errorf("Error writing event: %s\n", err) @@ -228,11 +245,11 @@ func (kl *Kubelet) runContainer(pod *Pod, container *api.Container, podVolumes v WorkingDir: container.WorkingDir, }, } - dockerContainer, err := kl.DockerClient.CreateContainer(opts) + dockerContainer, err := kl.dockerClient.CreateContainer(opts) if err != nil { return "", err } - err = kl.DockerClient.StartContainer(dockerContainer.ID, &docker.HostConfig{ + err = kl.dockerClient.StartContainer(dockerContainer.ID, &docker.HostConfig{ PortBindings: portBindings, Binds: binds, NetworkMode: netMode, @@ -242,7 +259,7 @@ func (kl *Kubelet) runContainer(pod *Pod, container *api.Container, podVolumes v // Kill a docker container func (kl *Kubelet) killContainer(dockerContainer docker.APIContainers) error { - err := kl.DockerClient.StopContainer(dockerContainer.ID, 10) + err := kl.dockerClient.StopContainer(dockerContainer.ID, 10) podFullName, containerName := parseDockerName(dockerContainer.Names[0]) kl.LogEvent(&api.Event{ Event: "STOP", @@ -276,7 +293,7 @@ func (kl *Kubelet) createNetworkContainer(pod *Pod) (DockerID, error) { Image: networkContainerImage, Ports: ports, } - kl.DockerPuller.Pull(networkContainerImage) + kl.dockerPuller.Pull(networkContainerImage) return kl.runContainer(pod, container, nil, "") } @@ -327,7 +344,7 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers, keepChan } glog.Infof("Container doesn't exist, creating %#v", container) - if err := kl.DockerPuller.Pull(container.Image); err != nil { + if err := kl.dockerPuller.Pull(container.Image); err != nil { glog.Errorf("Failed to pull image: %v skipping pod %s container %s.", err, podFullName, container.Name) continue } @@ -346,13 +363,13 @@ type empty struct{} // SyncPods synchronizes the configured list of pods (desired state) with the host current state. func (kl *Kubelet) SyncPods(pods []Pod) error { - glog.Infof("Desired [%s]: %+v", kl.Hostname, pods) + glog.Infof("Desired [%s]: %+v", kl.hostname, pods) var err error dockerIdsToKeep := map[DockerID]empty{} keepChannel := make(chan DockerID, defaultChanSize) waitGroup := sync.WaitGroup{} - dockerContainers, err := getKubeletDockerContainers(kl.DockerClient) + dockerContainers, err := getKubeletDockerContainers(kl.dockerClient) if err != nil { glog.Errorf("Error listing containers %#v", dockerContainers) return err @@ -386,7 +403,7 @@ func (kl *Kubelet) SyncPods(pods []Pod) error { <-ch // Kill any containers we don't need - existingContainers, err := getKubeletDockerContainers(kl.DockerClient) + existingContainers, err := getKubeletDockerContainers(kl.dockerClient) if err != nil { glog.Errorf("Error listing containers: %v", err) return err @@ -432,12 +449,12 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) { case u := <-updates: switch u.Op { case SET: - glog.Infof("Containers changed [%s]", kl.Hostname) + glog.Infof("Containers changed [%s]", kl.hostname) pods = u.Pods case UPDATE: //TODO: implement updates of containers - glog.Infof("Containers updated, not implemented [%s]", kl.Hostname) + glog.Infof("Containers updated, not implemented [%s]", kl.hostname) continue default: @@ -468,7 +485,7 @@ func getCadvisorContainerInfoRequest(req *info.ContainerInfoRequest) *info.Conta // cgroup file system. e.g. The root container, which represents the whole // machine, has path "/"; all docker containers have path "/docker/" func (kl *Kubelet) statsFromContainerPath(containerPath string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) { - cinfo, err := kl.CadvisorClient.ContainerInfo(containerPath, getCadvisorContainerInfoRequest(req)) + cinfo, err := kl.cadvisorClient.ContainerInfo(containerPath, getCadvisorContainerInfoRequest(req)) if err != nil { return nil, err } @@ -477,15 +494,15 @@ func (kl *Kubelet) statsFromContainerPath(containerPath string, req *info.Contai // GetPodInfo returns information from Docker about the containers in a pod func (kl *Kubelet) GetPodInfo(podFullName string) (api.PodInfo, error) { - return getDockerPodInfo(kl.DockerClient, podFullName) + return getDockerPodInfo(kl.dockerClient, podFullName) } // GetContainerInfo returns stats (from Cadvisor) for a container. func (kl *Kubelet) GetContainerInfo(podFullName, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) { - if kl.CadvisorClient == nil { + if kl.cadvisorClient == nil { return nil, nil } - dockerContainers, err := getKubeletDockerContainers(kl.DockerClient) + dockerContainers, err := getKubeletDockerContainers(kl.dockerClient) if err != nil { return nil, err } @@ -502,7 +519,7 @@ func (kl *Kubelet) GetRootInfo(req *info.ContainerInfoRequest) (*info.ContainerI } func (kl *Kubelet) GetMachineInfo() (*info.MachineInfo, error) { - return kl.CadvisorClient.MachineInfo() + return kl.cadvisorClient.MachineInfo() } func (kl *Kubelet) healthy(container api.Container, dockerContainer *docker.APIContainers) (health.Status, error) { @@ -513,14 +530,14 @@ func (kl *Kubelet) healthy(container api.Container, dockerContainer *docker.APIC if time.Now().Unix()-dockerContainer.Created < container.LivenessProbe.InitialDelaySeconds { return health.Healthy, nil } - if kl.HealthChecker == nil { + if kl.healthChecker == nil { return health.Healthy, nil } - return kl.HealthChecker.HealthCheck(container) + return kl.healthChecker.HealthCheck(container) } // Returns logs of current machine. func (kl *Kubelet) ServeLogs(w http.ResponseWriter, req *http.Request) { // TODO: whitelist logs we are willing to serve - kl.LogServer.ServeHTTP(w, req) + kl.logServer.ServeHTTP(w, req) } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 75a2c739d6a..7388f57a945 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -57,10 +57,10 @@ func makeTestKubelet(t *testing.T) (*Kubelet, *tools.FakeEtcdClient, *FakeDocker err: nil, } - kubelet := New() - kubelet.DockerClient = fakeDocker - kubelet.DockerPuller = &FakeDockerPuller{} - kubelet.EtcdClient = fakeEtcdClient + kubelet := &Kubelet{} + kubelet.dockerClient = fakeDocker + kubelet.dockerPuller = &FakeDockerPuller{} + kubelet.etcdClient = fakeEtcdClient return kubelet, fakeEtcdClient, fakeDocker } @@ -160,7 +160,7 @@ func TestKillContainerWithError(t *testing.T) { }, } kubelet, _, _ := makeTestKubelet(t) - kubelet.DockerClient = fakeDocker + kubelet.dockerClient = fakeDocker err := kubelet.killContainer(fakeDocker.containerList[0]) verifyError(t, err) verifyCalls(t, fakeDocker, []string{"stop"}) @@ -289,7 +289,7 @@ func (f *FalseHealthChecker) HealthCheck(container api.Container) (health.Status func TestSyncPodsUnhealthy(t *testing.T) { kubelet, _, fakeDocker := makeTestKubelet(t) - kubelet.HealthChecker = &FalseHealthChecker{} + kubelet.healthChecker = &FalseHealthChecker{} fakeDocker.containerList = []docker.APIContainers{ { // the k8s prefix is required for the kubelet to manage the container @@ -639,7 +639,7 @@ func TestGetContainerInfo(t *testing.T) { mockCadvisor.On("ContainerInfo", containerPath, cadvisorReq).Return(containerInfo, nil) kubelet, _, fakeDocker := makeTestKubelet(t) - kubelet.CadvisorClient = mockCadvisor + kubelet.cadvisorClient = mockCadvisor fakeDocker.containerList = []docker.APIContainers{ { ID: containerID, @@ -689,9 +689,9 @@ func TestGetRooInfo(t *testing.T) { mockCadvisor.On("ContainerInfo", containerPath, cadvisorReq).Return(containerInfo, nil) kubelet := Kubelet{ - DockerClient: &fakeDocker, - DockerPuller: &FakeDockerPuller{}, - CadvisorClient: mockCadvisor, + dockerClient: &fakeDocker, + dockerPuller: &FakeDockerPuller{}, + cadvisorClient: mockCadvisor, } // If the container name is an empty string, then it means the root container. @@ -746,7 +746,7 @@ func TestGetContainerInfoWhenCadvisorFailed(t *testing.T) { mockCadvisor.On("ContainerInfo", containerPath, cadvisorReq).Return(containerInfo, expectedErr) kubelet, _, fakeDocker := makeTestKubelet(t) - kubelet.CadvisorClient = mockCadvisor + kubelet.cadvisorClient = mockCadvisor fakeDocker.containerList = []docker.APIContainers{ { ID: containerID, @@ -774,7 +774,7 @@ func TestGetContainerInfoOnNonExistContainer(t *testing.T) { mockCadvisor := &mockCadvisorClient{} kubelet, _, fakeDocker := makeTestKubelet(t) - kubelet.CadvisorClient = mockCadvisor + kubelet.cadvisorClient = mockCadvisor fakeDocker.containerList = []docker.APIContainers{} stats, _ := kubelet.GetContainerInfo("qux", "foo", nil)