Make Kubelet type members private and provide New functions.

This commit is contained in:
Eric Tune 2014-07-22 14:40:59 -07:00
parent 2a2fdb4773
commit ded67ead1e
4 changed files with 79 additions and 68 deletions

View File

@ -104,11 +104,7 @@ func startComponents(manifestURL string) (apiServerURL string) {
cfg1 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates) cfg1 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates)
config.NewSourceEtcd(config.EtcdKeyForHost(machineList[0]), etcdClient, 30*time.Second, cfg1.Channel("etcd")) config.NewSourceEtcd(config.EtcdKeyForHost(machineList[0]), etcdClient, 30*time.Second, cfg1.Channel("etcd"))
config.NewSourceURL(manifestURL, 5*time.Second, cfg1.Channel("url")) config.NewSourceURL(manifestURL, 5*time.Second, cfg1.Channel("url"))
myKubelet := &kubelet.Kubelet{ myKubelet := kubelet.NewIntegrationTestKubelet(machineList[0], &fakeDocker1)
Hostname: machineList[0],
DockerClient: &fakeDocker1,
DockerPuller: &kubelet.FakeDockerPuller{},
}
go util.Forever(func() { myKubelet.Run(cfg1.Updates()) }, 0) go util.Forever(func() { myKubelet.Run(cfg1.Updates()) }, 0)
go util.Forever(cfg1.Sync, 3*time.Second) go util.Forever(cfg1.Sync, 3*time.Second)
go util.Forever(func() { go util.Forever(func() {
@ -120,11 +116,7 @@ func startComponents(manifestURL string) (apiServerURL string) {
// have a place they can schedule. // have a place they can schedule.
cfg2 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates) cfg2 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates)
config.NewSourceEtcd(config.EtcdKeyForHost(machineList[1]), etcdClient, 30*time.Second, cfg2.Channel("etcd")) config.NewSourceEtcd(config.EtcdKeyForHost(machineList[1]), etcdClient, 30*time.Second, cfg2.Channel("etcd"))
otherKubelet := &kubelet.Kubelet{ otherKubelet := kubelet.NewIntegrationTestKubelet(machineList[1], &fakeDocker2)
Hostname: machineList[1],
DockerClient: &fakeDocker2,
DockerPuller: &kubelet.FakeDockerPuller{},
}
go util.Forever(func() { otherKubelet.Run(cfg2.Updates()) }, 0) go util.Forever(func() { otherKubelet.Run(cfg2.Updates()) }, 0)
go util.Forever(cfg2.Sync, 3*time.Second) go util.Forever(cfg2.Sync, 3*time.Second)
go util.Forever(func() { go util.Forever(func() {

View File

@ -32,6 +32,7 @@ import (
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" _ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
kconfig "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config" kconfig "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd" "github.com/coreos/go-etcd/etcd"
"github.com/fsouza/go-dockerclient" "github.com/fsouza/go-dockerclient"
@ -104,12 +105,6 @@ func main() {
hostname := getHostname() hostname := getHostname()
k := &kubelet.Kubelet{
Hostname: hostname,
DockerClient: dockerClient,
CadvisorClient: cadvisorClient,
}
// source of all configuration // source of all configuration
cfg := kconfig.NewPodConfig(kconfig.PodConfigNotificationSnapshotAndUpdates) cfg := kconfig.NewPodConfig(kconfig.PodConfigNotificationSnapshotAndUpdates)
@ -124,15 +119,22 @@ func main() {
} }
// define etcd config source and initialize etcd client // define etcd config source and initialize etcd client
var etcdClient tools.EtcdClient
if len(etcdServerList) > 0 { if len(etcdServerList) > 0 {
glog.Infof("Watching for etcd configs at %v", etcdServerList) glog.Infof("Watching for etcd configs at %v", etcdServerList)
k.EtcdClient = etcd.NewClient(etcdServerList) etcdClient = etcd.NewClient(etcdServerList)
kconfig.NewSourceEtcd(kconfig.EtcdKeyForHost(hostname), k.EtcdClient, 30*time.Second, cfg.Channel("etcd")) kconfig.NewSourceEtcd(kconfig.EtcdKeyForHost(hostname), etcdClient, 30*time.Second, cfg.Channel("etcd"))
} }
// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
// up into "per source" synchronizations // up into "per source" synchronizations
k := kubelet.NewMainKubelet(
getHostname(),
dockerClient,
cadvisorClient,
etcdClient)
// start the kubelet // start the kubelet
go util.Forever(func() { k.Run(cfg.Updates()) }, 0) go util.Forever(func() { k.Run(cfg.Updates()) }, 0)

View File

@ -57,47 +57,64 @@ type SyncHandler interface {
type volumeMap map[string]volume.Interface type volumeMap map[string]volume.Interface
// New creates a new Kubelet. // New creates a new Kubelet for use in main
// TODO: currently it is only called by test code. func NewMainKubelet(
// Need cleanup. hn string,
func New() *Kubelet { dc DockerInterface,
return &Kubelet{} cc CadvisorInterface,
ec tools.EtcdClient) *Kubelet {
return &Kubelet{
hostname: hn,
dockerClient: dc,
cadvisorClient: cc,
etcdClient: ec,
}
}
// NewIntegrationTestKubelet creates a new Kubelet for use in integration tests.
// TODO: add more integration tests, and expand parameter list as needed.
func NewIntegrationTestKubelet(hn string, dc DockerInterface) *Kubelet {
return &Kubelet{
hostname: hn,
dockerClient: dc,
dockerPuller: &FakeDockerPuller{},
}
} }
// Kubelet is the main kubelet implementation. // Kubelet is the main kubelet implementation.
type Kubelet struct { type Kubelet struct {
Hostname string hostname string
DockerClient DockerInterface dockerClient DockerInterface
// Optional, no events will be sent without it // Optional, no events will be sent without it
EtcdClient tools.EtcdClient etcdClient tools.EtcdClient
// Optional, no statistics will be available if omitted // Optional, no statistics will be available if omitted
CadvisorClient CadvisorInterface cadvisorClient CadvisorInterface
// Optional, defaults to simple implementaiton // Optional, defaults to simple implementaiton
HealthChecker health.HealthChecker healthChecker health.HealthChecker
// Optional, defaults to simple Docker implementation // Optional, defaults to simple Docker implementation
DockerPuller DockerPuller dockerPuller DockerPuller
// Optional, defaults to /logs/ from /var/log // Optional, defaults to /logs/ from /var/log
LogServer http.Handler logServer http.Handler
} }
// Run starts the kubelet reacting to config updates // Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan PodUpdate) { func (kl *Kubelet) Run(updates <-chan PodUpdate) {
if kl.LogServer == nil { if kl.logServer == nil {
kl.LogServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/"))) kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
} }
if kl.DockerPuller == nil { if kl.dockerPuller == nil {
kl.DockerPuller = NewDockerPuller(kl.DockerClient) kl.dockerPuller = NewDockerPuller(kl.dockerClient)
} }
if kl.HealthChecker == nil { if kl.healthChecker == nil {
kl.HealthChecker = health.NewHealthChecker() kl.healthChecker = health.NewHealthChecker()
} }
kl.syncLoop(updates, kl) kl.syncLoop(updates, kl)
} }
// LogEvent logs an event to the etcd backend. // LogEvent logs an event to the etcd backend.
func (kl *Kubelet) LogEvent(event *api.Event) error { func (kl *Kubelet) LogEvent(event *api.Event) error {
if kl.EtcdClient == nil { if kl.etcdClient == nil {
return fmt.Errorf("no etcd client connection") return fmt.Errorf("no etcd client connection")
} }
event.Timestamp = time.Now().Unix() event.Timestamp = time.Now().Unix()
@ -107,7 +124,7 @@ func (kl *Kubelet) LogEvent(event *api.Event) error {
} }
var response *etcd.Response var response *etcd.Response
response, err = kl.EtcdClient.AddChild(fmt.Sprintf("/events/%s", event.Container.Name), string(data), 60*60*48 /* 2 days */) response, err = kl.etcdClient.AddChild(fmt.Sprintf("/events/%s", event.Container.Name), string(data), 60*60*48 /* 2 days */)
// TODO(bburns) : examine response here. // TODO(bburns) : examine response here.
if err != nil { if err != nil {
glog.Errorf("Error writing event: %s\n", err) glog.Errorf("Error writing event: %s\n", err)
@ -228,11 +245,11 @@ func (kl *Kubelet) runContainer(pod *Pod, container *api.Container, podVolumes v
WorkingDir: container.WorkingDir, WorkingDir: container.WorkingDir,
}, },
} }
dockerContainer, err := kl.DockerClient.CreateContainer(opts) dockerContainer, err := kl.dockerClient.CreateContainer(opts)
if err != nil { if err != nil {
return "", err return "", err
} }
err = kl.DockerClient.StartContainer(dockerContainer.ID, &docker.HostConfig{ err = kl.dockerClient.StartContainer(dockerContainer.ID, &docker.HostConfig{
PortBindings: portBindings, PortBindings: portBindings,
Binds: binds, Binds: binds,
NetworkMode: netMode, NetworkMode: netMode,
@ -242,7 +259,7 @@ func (kl *Kubelet) runContainer(pod *Pod, container *api.Container, podVolumes v
// Kill a docker container // Kill a docker container
func (kl *Kubelet) killContainer(dockerContainer docker.APIContainers) error { func (kl *Kubelet) killContainer(dockerContainer docker.APIContainers) error {
err := kl.DockerClient.StopContainer(dockerContainer.ID, 10) err := kl.dockerClient.StopContainer(dockerContainer.ID, 10)
podFullName, containerName := parseDockerName(dockerContainer.Names[0]) podFullName, containerName := parseDockerName(dockerContainer.Names[0])
kl.LogEvent(&api.Event{ kl.LogEvent(&api.Event{
Event: "STOP", Event: "STOP",
@ -276,7 +293,7 @@ func (kl *Kubelet) createNetworkContainer(pod *Pod) (DockerID, error) {
Image: networkContainerImage, Image: networkContainerImage,
Ports: ports, Ports: ports,
} }
kl.DockerPuller.Pull(networkContainerImage) kl.dockerPuller.Pull(networkContainerImage)
return kl.runContainer(pod, container, nil, "") return kl.runContainer(pod, container, nil, "")
} }
@ -327,7 +344,7 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers, keepChan
} }
glog.Infof("Container doesn't exist, creating %#v", container) glog.Infof("Container doesn't exist, creating %#v", container)
if err := kl.DockerPuller.Pull(container.Image); err != nil { if err := kl.dockerPuller.Pull(container.Image); err != nil {
glog.Errorf("Failed to pull image: %v skipping pod %s container %s.", err, podFullName, container.Name) glog.Errorf("Failed to pull image: %v skipping pod %s container %s.", err, podFullName, container.Name)
continue continue
} }
@ -346,13 +363,13 @@ type empty struct{}
// SyncPods synchronizes the configured list of pods (desired state) with the host current state. // SyncPods synchronizes the configured list of pods (desired state) with the host current state.
func (kl *Kubelet) SyncPods(pods []Pod) error { func (kl *Kubelet) SyncPods(pods []Pod) error {
glog.Infof("Desired [%s]: %+v", kl.Hostname, pods) glog.Infof("Desired [%s]: %+v", kl.hostname, pods)
var err error var err error
dockerIdsToKeep := map[DockerID]empty{} dockerIdsToKeep := map[DockerID]empty{}
keepChannel := make(chan DockerID, defaultChanSize) keepChannel := make(chan DockerID, defaultChanSize)
waitGroup := sync.WaitGroup{} waitGroup := sync.WaitGroup{}
dockerContainers, err := getKubeletDockerContainers(kl.DockerClient) dockerContainers, err := getKubeletDockerContainers(kl.dockerClient)
if err != nil { if err != nil {
glog.Errorf("Error listing containers %#v", dockerContainers) glog.Errorf("Error listing containers %#v", dockerContainers)
return err return err
@ -386,7 +403,7 @@ func (kl *Kubelet) SyncPods(pods []Pod) error {
<-ch <-ch
// Kill any containers we don't need // Kill any containers we don't need
existingContainers, err := getKubeletDockerContainers(kl.DockerClient) existingContainers, err := getKubeletDockerContainers(kl.dockerClient)
if err != nil { if err != nil {
glog.Errorf("Error listing containers: %v", err) glog.Errorf("Error listing containers: %v", err)
return err return err
@ -432,12 +449,12 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
case u := <-updates: case u := <-updates:
switch u.Op { switch u.Op {
case SET: case SET:
glog.Infof("Containers changed [%s]", kl.Hostname) glog.Infof("Containers changed [%s]", kl.hostname)
pods = u.Pods pods = u.Pods
case UPDATE: case UPDATE:
//TODO: implement updates of containers //TODO: implement updates of containers
glog.Infof("Containers updated, not implemented [%s]", kl.Hostname) glog.Infof("Containers updated, not implemented [%s]", kl.hostname)
continue continue
default: default:
@ -468,7 +485,7 @@ func getCadvisorContainerInfoRequest(req *info.ContainerInfoRequest) *info.Conta
// cgroup file system. e.g. The root container, which represents the whole // cgroup file system. e.g. The root container, which represents the whole
// machine, has path "/"; all docker containers have path "/docker/<docker id>" // machine, has path "/"; all docker containers have path "/docker/<docker id>"
func (kl *Kubelet) statsFromContainerPath(containerPath string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) { func (kl *Kubelet) statsFromContainerPath(containerPath string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
cinfo, err := kl.CadvisorClient.ContainerInfo(containerPath, getCadvisorContainerInfoRequest(req)) cinfo, err := kl.cadvisorClient.ContainerInfo(containerPath, getCadvisorContainerInfoRequest(req))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -477,15 +494,15 @@ func (kl *Kubelet) statsFromContainerPath(containerPath string, req *info.Contai
// GetPodInfo returns information from Docker about the containers in a pod // GetPodInfo returns information from Docker about the containers in a pod
func (kl *Kubelet) GetPodInfo(podFullName string) (api.PodInfo, error) { func (kl *Kubelet) GetPodInfo(podFullName string) (api.PodInfo, error) {
return getDockerPodInfo(kl.DockerClient, podFullName) return getDockerPodInfo(kl.dockerClient, podFullName)
} }
// GetContainerInfo returns stats (from Cadvisor) for a container. // GetContainerInfo returns stats (from Cadvisor) for a container.
func (kl *Kubelet) GetContainerInfo(podFullName, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) { func (kl *Kubelet) GetContainerInfo(podFullName, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
if kl.CadvisorClient == nil { if kl.cadvisorClient == nil {
return nil, nil return nil, nil
} }
dockerContainers, err := getKubeletDockerContainers(kl.DockerClient) dockerContainers, err := getKubeletDockerContainers(kl.dockerClient)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -502,7 +519,7 @@ func (kl *Kubelet) GetRootInfo(req *info.ContainerInfoRequest) (*info.ContainerI
} }
func (kl *Kubelet) GetMachineInfo() (*info.MachineInfo, error) { func (kl *Kubelet) GetMachineInfo() (*info.MachineInfo, error) {
return kl.CadvisorClient.MachineInfo() return kl.cadvisorClient.MachineInfo()
} }
func (kl *Kubelet) healthy(container api.Container, dockerContainer *docker.APIContainers) (health.Status, error) { func (kl *Kubelet) healthy(container api.Container, dockerContainer *docker.APIContainers) (health.Status, error) {
@ -513,14 +530,14 @@ func (kl *Kubelet) healthy(container api.Container, dockerContainer *docker.APIC
if time.Now().Unix()-dockerContainer.Created < container.LivenessProbe.InitialDelaySeconds { if time.Now().Unix()-dockerContainer.Created < container.LivenessProbe.InitialDelaySeconds {
return health.Healthy, nil return health.Healthy, nil
} }
if kl.HealthChecker == nil { if kl.healthChecker == nil {
return health.Healthy, nil return health.Healthy, nil
} }
return kl.HealthChecker.HealthCheck(container) return kl.healthChecker.HealthCheck(container)
} }
// Returns logs of current machine. // Returns logs of current machine.
func (kl *Kubelet) ServeLogs(w http.ResponseWriter, req *http.Request) { func (kl *Kubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
// TODO: whitelist logs we are willing to serve // TODO: whitelist logs we are willing to serve
kl.LogServer.ServeHTTP(w, req) kl.logServer.ServeHTTP(w, req)
} }

View File

@ -57,10 +57,10 @@ func makeTestKubelet(t *testing.T) (*Kubelet, *tools.FakeEtcdClient, *FakeDocker
err: nil, err: nil,
} }
kubelet := New() kubelet := &Kubelet{}
kubelet.DockerClient = fakeDocker kubelet.dockerClient = fakeDocker
kubelet.DockerPuller = &FakeDockerPuller{} kubelet.dockerPuller = &FakeDockerPuller{}
kubelet.EtcdClient = fakeEtcdClient kubelet.etcdClient = fakeEtcdClient
return kubelet, fakeEtcdClient, fakeDocker return kubelet, fakeEtcdClient, fakeDocker
} }
@ -160,7 +160,7 @@ func TestKillContainerWithError(t *testing.T) {
}, },
} }
kubelet, _, _ := makeTestKubelet(t) kubelet, _, _ := makeTestKubelet(t)
kubelet.DockerClient = fakeDocker kubelet.dockerClient = fakeDocker
err := kubelet.killContainer(fakeDocker.containerList[0]) err := kubelet.killContainer(fakeDocker.containerList[0])
verifyError(t, err) verifyError(t, err)
verifyCalls(t, fakeDocker, []string{"stop"}) verifyCalls(t, fakeDocker, []string{"stop"})
@ -289,7 +289,7 @@ func (f *FalseHealthChecker) HealthCheck(container api.Container) (health.Status
func TestSyncPodsUnhealthy(t *testing.T) { func TestSyncPodsUnhealthy(t *testing.T) {
kubelet, _, fakeDocker := makeTestKubelet(t) kubelet, _, fakeDocker := makeTestKubelet(t)
kubelet.HealthChecker = &FalseHealthChecker{} kubelet.healthChecker = &FalseHealthChecker{}
fakeDocker.containerList = []docker.APIContainers{ fakeDocker.containerList = []docker.APIContainers{
{ {
// the k8s prefix is required for the kubelet to manage the container // the k8s prefix is required for the kubelet to manage the container
@ -639,7 +639,7 @@ func TestGetContainerInfo(t *testing.T) {
mockCadvisor.On("ContainerInfo", containerPath, cadvisorReq).Return(containerInfo, nil) mockCadvisor.On("ContainerInfo", containerPath, cadvisorReq).Return(containerInfo, nil)
kubelet, _, fakeDocker := makeTestKubelet(t) kubelet, _, fakeDocker := makeTestKubelet(t)
kubelet.CadvisorClient = mockCadvisor kubelet.cadvisorClient = mockCadvisor
fakeDocker.containerList = []docker.APIContainers{ fakeDocker.containerList = []docker.APIContainers{
{ {
ID: containerID, ID: containerID,
@ -689,9 +689,9 @@ func TestGetRooInfo(t *testing.T) {
mockCadvisor.On("ContainerInfo", containerPath, cadvisorReq).Return(containerInfo, nil) mockCadvisor.On("ContainerInfo", containerPath, cadvisorReq).Return(containerInfo, nil)
kubelet := Kubelet{ kubelet := Kubelet{
DockerClient: &fakeDocker, dockerClient: &fakeDocker,
DockerPuller: &FakeDockerPuller{}, dockerPuller: &FakeDockerPuller{},
CadvisorClient: mockCadvisor, cadvisorClient: mockCadvisor,
} }
// If the container name is an empty string, then it means the root container. // If the container name is an empty string, then it means the root container.
@ -746,7 +746,7 @@ func TestGetContainerInfoWhenCadvisorFailed(t *testing.T) {
mockCadvisor.On("ContainerInfo", containerPath, cadvisorReq).Return(containerInfo, expectedErr) mockCadvisor.On("ContainerInfo", containerPath, cadvisorReq).Return(containerInfo, expectedErr)
kubelet, _, fakeDocker := makeTestKubelet(t) kubelet, _, fakeDocker := makeTestKubelet(t)
kubelet.CadvisorClient = mockCadvisor kubelet.cadvisorClient = mockCadvisor
fakeDocker.containerList = []docker.APIContainers{ fakeDocker.containerList = []docker.APIContainers{
{ {
ID: containerID, ID: containerID,
@ -774,7 +774,7 @@ func TestGetContainerInfoOnNonExistContainer(t *testing.T) {
mockCadvisor := &mockCadvisorClient{} mockCadvisor := &mockCadvisorClient{}
kubelet, _, fakeDocker := makeTestKubelet(t) kubelet, _, fakeDocker := makeTestKubelet(t)
kubelet.CadvisorClient = mockCadvisor kubelet.cadvisorClient = mockCadvisor
fakeDocker.containerList = []docker.APIContainers{} fakeDocker.containerList = []docker.APIContainers{}
stats, _ := kubelet.GetContainerInfo("qux", "foo", nil) stats, _ := kubelet.GetContainerInfo("qux", "foo", nil)