From fe4600ba17d37dba5743909104748d6730a0186e Mon Sep 17 00:00:00 2001 From: Victor Marmol Date: Tue, 28 Apr 2015 17:51:21 -0700 Subject: [PATCH] Move ComputePodChanges to DockerManager. This logic is specific to the Docker runtime. This move is the first step towards making syncPod() runtime-agnostic. --- pkg/kubelet/dockertools/docker.go | 9 -- pkg/kubelet/dockertools/docker_test.go | 5 +- pkg/kubelet/dockertools/manager.go | 176 +++++++++++++++++++++++- pkg/kubelet/handlers.go | 5 +- pkg/kubelet/kubelet.go | 181 ++----------------------- pkg/kubelet/kubelet_test.go | 3 +- pkg/kubelet/pod_workers_test.go | 3 +- pkg/kubelet/prober/prober.go | 23 ++-- pkg/kubelet/prober/prober_fake.go | 31 +++++ pkg/kubelet/runonce_test.go | 4 +- 10 files changed, 245 insertions(+), 195 deletions(-) create mode 100644 pkg/kubelet/prober/prober_fake.go diff --git a/pkg/kubelet/dockertools/docker.go b/pkg/kubelet/dockertools/docker.go index 739e6758816..0a14d6ceb05 100644 --- a/pkg/kubelet/dockertools/docker.go +++ b/pkg/kubelet/dockertools/docker.go @@ -19,7 +19,6 @@ package dockertools import ( "fmt" "hash/adler32" - "io" "math/rand" "os" "strconv" @@ -27,7 +26,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider" - kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/leaky" kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" @@ -275,13 +273,6 @@ func ConnectToDockerOrDie(dockerEndpoint string) DockerInterface { return client } -// TODO(yifan): Move this to container.Runtime. -type ContainerCommandRunner interface { - RunInContainer(containerID string, cmd []string) ([]byte, error) - ExecInContainer(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error - PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error -} - func milliCPUToShares(milliCPU int64) int64 { if milliCPU == 0 { // zero milliCPU means unset. Use kernel default. diff --git a/pkg/kubelet/dockertools/docker_test.go b/pkg/kubelet/dockertools/docker_test.go index be9d1ca610e..724e2481f27 100644 --- a/pkg/kubelet/dockertools/docker_test.go +++ b/pkg/kubelet/dockertools/docker_test.go @@ -28,6 +28,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider" kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network" + kubeletProber "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" docker "github.com/fsouza/go-dockerclient" @@ -394,7 +395,7 @@ func TestGetRunningContainers(t *testing.T) { fakeDocker := &FakeDockerClient{Errors: make(map[string]error)} fakeRecorder := &record.FakeRecorder{} np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) - containerManager := NewDockerManager(fakeDocker, fakeRecorder, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np) + containerManager := NewDockerManager(fakeDocker, fakeRecorder, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, &kubeletProber.FakeProber{}) tests := []struct { containers map[string]*docker.Container inputIDs []string @@ -660,7 +661,7 @@ func TestFindContainersByPod(t *testing.T) { } fakeClient := &FakeDockerClient{} np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) - containerManager := NewDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np) + containerManager := NewDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, &kubeletProber.FakeProber{}) for i, test := range tests { fakeClient.ContainerList = test.containerList fakeClient.ExitedContainerList = test.exitedContainerList diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index 46e96630181..7fc469467ca 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -35,7 +35,9 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober" kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types" + "github.com/GoogleCloudPlatform/kubernetes/pkg/probe" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/fsouza/go-dockerclient" @@ -86,6 +88,11 @@ type DockerManager struct { // Network plugin. networkPlugin network.NetworkPlugin + + // TODO(vmarmol): Make this non-public when we remove the circular dependency + // with prober. + // Health check prober. + Prober prober.Prober } func NewDockerManager( @@ -98,7 +105,8 @@ func NewDockerManager( burst int, containerLogsDir string, osInterface kubecontainer.OSInterface, - networkPlugin network.NetworkPlugin) *DockerManager { + networkPlugin network.NetworkPlugin, + prober prober.Prober) *DockerManager { // Work out the location of the Docker runtime, defaulting to /var/lib/docker // if there are any problems. dockerRoot := "/var/lib/docker" @@ -142,6 +150,7 @@ func NewDockerManager( dockerRoot: dockerRoot, containerLogsDir: containerLogsDir, networkPlugin: networkPlugin, + Prober: prober, } } @@ -692,8 +701,8 @@ func (dm *DockerManager) IsImagePresent(image string) (bool, error) { return dm.Puller.IsImagePresent(image) } -// PodInfraContainer returns true if the pod infra container has changed. -func (dm *DockerManager) PodInfraContainerChanged(pod *api.Pod, podInfraContainer *kubecontainer.Container) (bool, error) { +// podInfraContainerChanged returns true if the pod infra container has changed. +func (dm *DockerManager) podInfraContainerChanged(pod *api.Pod, podInfraContainer *kubecontainer.Container) (bool, error) { networkMode := "" var ports []api.ContainerPort @@ -1112,3 +1121,164 @@ func (dm *DockerManager) CreatePodInfraContainer(pod *api.Pod, generator kubecon } return id, util.ApplyOomScoreAdj(containerInfo.State.Pid, podOomScoreAdj) } + +// TODO(vmarmol): This will soon be made non-public when its only use is internal. +// Structure keeping information on changes that need to happen for a pod. The semantics is as follows: +// - startInfraContainer is true if new Infra Containers have to be started and old one (if running) killed. +// Additionally if it is true then containersToKeep have to be empty +// - infraContainerId have to be set iff startInfraContainer is false. It stores dockerID of running Infra Container +// - containersToStart keeps indices of Specs of containers that have to be started. +// - containersToKeep stores mapping from dockerIDs of running containers to indices of their Specs for containers that +// should be kept running. If startInfraContainer is false then it contains an entry for infraContainerId (mapped to -1). +// It shouldn't be the case where containersToStart is empty and containersToKeep contains only infraContainerId. In such case +// Infra Container should be killed, hence it's removed from this map. +// - all running containers which are NOT contained in containersToKeep should be killed. +type empty struct{} +type PodContainerChangesSpec struct { + StartInfraContainer bool + InfraContainerId kubeletTypes.DockerID + ContainersToStart map[int]empty + ContainersToKeep map[kubeletTypes.DockerID]int +} + +// TODO(vmarmol): This will soon be made non-public when its only use is internal. +func (dm *DockerManager) ComputePodContainerChanges(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus) (PodContainerChangesSpec, error) { + podFullName := kubecontainer.GetPodFullName(pod) + uid := pod.UID + glog.V(4).Infof("Syncing Pod %+v, podFullName: %q, uid: %q", pod, podFullName, uid) + + containersToStart := make(map[int]empty) + containersToKeep := make(map[kubeletTypes.DockerID]int) + createPodInfraContainer := false + + var err error + var podInfraContainerID kubeletTypes.DockerID + var changed bool + podInfraContainer := runningPod.FindContainerByName(PodInfraContainerName) + if podInfraContainer != nil { + glog.V(4).Infof("Found pod infra container for %q", podFullName) + changed, err = dm.podInfraContainerChanged(pod, podInfraContainer) + if err != nil { + return PodContainerChangesSpec{}, err + } + } + + createPodInfraContainer = true + if podInfraContainer == nil { + glog.V(2).Infof("Need to restart pod infra container for %q because it is not found", podFullName) + } else if changed { + glog.V(2).Infof("Need to restart pod infra container for %q because it is changed", podFullName) + } else { + glog.V(4).Infof("Pod infra container looks good, keep it %q", podFullName) + createPodInfraContainer = false + podInfraContainerID = kubeletTypes.DockerID(podInfraContainer.ID) + containersToKeep[podInfraContainerID] = -1 + } + + for index, container := range pod.Spec.Containers { + expectedHash := HashContainer(&container) + + c := runningPod.FindContainerByName(container.Name) + if c == nil { + if shouldContainerBeRestarted(&container, pod, &podStatus, dm.readinessManager) { + // If we are here it means that the container is dead and should be restarted, or never existed and should + // be created. We may be inserting this ID again if the container has changed and it has + // RestartPolicy::Always, but it's not a big deal. + glog.V(3).Infof("Container %+v is dead, but RestartPolicy says that we should restart it.", container) + containersToStart[index] = empty{} + } + continue + } + + containerID := kubeletTypes.DockerID(c.ID) + hash := c.Hash + glog.V(3).Infof("pod %q container %q exists as %v", podFullName, container.Name, containerID) + + if createPodInfraContainer { + // createPodInfraContainer == true and Container exists + // If we're creating infra containere everything will be killed anyway + // If RestartPolicy is Always or OnFailure we restart containers that were running before we + // killed them when restarting Infra Container. + if pod.Spec.RestartPolicy != api.RestartPolicyNever { + glog.V(1).Infof("Infra Container is being recreated. %q will be restarted.", container.Name) + containersToStart[index] = empty{} + } + continue + } + + // At this point, the container is running and pod infra container is good. + // We will look for changes and check healthiness for the container. + containerChanged := hash != 0 && hash != expectedHash + if containerChanged { + glog.Infof("pod %q container %q hash changed (%d vs %d), it will be killed and re-created.", podFullName, container.Name, hash, expectedHash) + containersToStart[index] = empty{} + continue + } + + result, err := dm.Prober.Probe(pod, podStatus, container, string(c.ID), c.Created) + if err != nil { + // TODO(vmarmol): examine this logic. + glog.V(2).Infof("probe no-error: %q", container.Name) + containersToKeep[containerID] = index + continue + } + if result == probe.Success { + glog.V(4).Infof("probe success: %q", container.Name) + containersToKeep[containerID] = index + continue + } + glog.Infof("pod %q container %q is unhealthy (probe result: %v), it will be killed and re-created.", podFullName, container.Name, result) + containersToStart[index] = empty{} + } + + // After the loop one of the following should be true: + // - createPodInfraContainer is true and containersToKeep is empty. + // (In fact, when createPodInfraContainer is false, containersToKeep will not be touched). + // - createPodInfraContainer is false and containersToKeep contains at least ID of Infra Container + + // If Infra container is the last running one, we don't want to keep it. + if !createPodInfraContainer && len(containersToStart) == 0 && len(containersToKeep) == 1 { + containersToKeep = make(map[kubeletTypes.DockerID]int) + } + + return PodContainerChangesSpec{ + StartInfraContainer: createPodInfraContainer, + InfraContainerId: podInfraContainerID, + ContainersToStart: containersToStart, + ContainersToKeep: containersToKeep, + }, nil +} + +func shouldContainerBeRestarted(container *api.Container, pod *api.Pod, podStatus *api.PodStatus, readinessManager *kubecontainer.ReadinessManager) bool { + podFullName := kubecontainer.GetPodFullName(pod) + + // Get all dead container status. + var resultStatus []*api.ContainerStatus + for i, containerStatus := range podStatus.ContainerStatuses { + if containerStatus.Name == container.Name && containerStatus.State.Termination != nil { + resultStatus = append(resultStatus, &podStatus.ContainerStatuses[i]) + } + } + + // Set dead containers to unready state. + for _, c := range resultStatus { + readinessManager.RemoveReadiness(kubecontainer.TrimRuntimePrefixFromImage(c.ContainerID)) + } + + // Check RestartPolicy for dead container. + if len(resultStatus) > 0 { + if pod.Spec.RestartPolicy == api.RestartPolicyNever { + glog.V(4).Infof("Already ran container %q of pod %q, do nothing", container.Name, podFullName) + return false + } + if pod.Spec.RestartPolicy == api.RestartPolicyOnFailure { + // Check the exit code of last run. Note: This assumes the result is sorted + // by the created time in reverse order. + if resultStatus[0].State.Termination.ExitCode == 0 { + glog.V(4).Infof("Already successfully ran container %q of pod %q, do nothing", container.Name, podFullName) + return false + } + } + } + return true +} diff --git a/pkg/kubelet/handlers.go b/pkg/kubelet/handlers.go index be8fe96399d..54e5bf6e940 100644 --- a/pkg/kubelet/handlers.go +++ b/pkg/kubelet/handlers.go @@ -24,18 +24,19 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/golang/glog" ) type handlerRunner struct { httpGetter httpGetter - commandRunner dockertools.ContainerCommandRunner + commandRunner prober.ContainerCommandRunner containerManager *dockertools.DockerManager } // TODO(yifan): Merge commandRunner and containerManager once containerManager implements the ContainerCommandRunner interface. -func newHandlerRunner(httpGetter httpGetter, commandRunner dockertools.ContainerCommandRunner, containerManager *dockertools.DockerManager) kubecontainer.HandlerRunner { +func newHandlerRunner(httpGetter httpGetter, commandRunner prober.ContainerCommandRunner, containerManager *dockertools.DockerManager) kubecontainer.HandlerRunner { return &handlerRunner{ httpGetter: httpGetter, commandRunner: commandRunner, diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index e57484a4fd0..8d3041915cb 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -46,7 +46,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober" kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" - "github.com/GoogleCloudPlatform/kubernetes/pkg/probe" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" @@ -251,7 +250,8 @@ func NewMainKubelet( pullBurst, containerLogsDir, osInterface, - klet.networkPlugin) + klet.networkPlugin, + nil) klet.runner = containerManager klet.containerManager = containerManager @@ -259,6 +259,9 @@ func NewMainKubelet( klet.prober = prober.New(klet.runner, klet.readinessManager, klet.containerRefManager, klet.recorder) klet.handlerRunner = newHandlerRunner(klet.httpClient, klet.runner, klet.containerManager) + // TODO(vmarmol): Remove when the circular dependency is removed :( + containerManager.Prober = klet.prober + runtimeCache, err := kubecontainer.NewRuntimeCache(containerManager) if err != nil { return nil, err @@ -318,7 +321,7 @@ type Kubelet struct { // Optional, defaults to /logs/ from /var/log logServer http.Handler // Optional, defaults to simple Docker implementation - runner dockertools.ContainerCommandRunner + runner prober.ContainerCommandRunner // Optional, client for http requests, defaults to empty client httpClient httpGetter @@ -911,164 +914,6 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error { return nil } -func shouldContainerBeRestarted(container *api.Container, pod *api.Pod, podStatus *api.PodStatus, readinessManager *kubecontainer.ReadinessManager) bool { - podFullName := kubecontainer.GetPodFullName(pod) - - // Get all dead container status. - var resultStatus []*api.ContainerStatus - for i, containerStatus := range podStatus.ContainerStatuses { - if containerStatus.Name == container.Name && containerStatus.State.Termination != nil { - resultStatus = append(resultStatus, &podStatus.ContainerStatuses[i]) - } - } - - // Set dead containers to unready state. - for _, c := range resultStatus { - readinessManager.RemoveReadiness(kubecontainer.TrimRuntimePrefixFromImage(c.ContainerID)) - } - - // Check RestartPolicy for dead container. - if len(resultStatus) > 0 { - if pod.Spec.RestartPolicy == api.RestartPolicyNever { - glog.V(4).Infof("Already ran container %q of pod %q, do nothing", container.Name, podFullName) - return false - } - if pod.Spec.RestartPolicy == api.RestartPolicyOnFailure { - // Check the exit code of last run. Note: This assumes the result is sorted - // by the created time in reverse order. - if resultStatus[0].State.Termination.ExitCode == 0 { - glog.V(4).Infof("Already successfully ran container %q of pod %q, do nothing", container.Name, podFullName) - return false - } - } - } - return true -} - -// Structure keeping information on changes that need to happen for a pod. The semantics is as follows: -// - startInfraContainer is true if new Infra Containers have to be started and old one (if running) killed. -// Additionally if it is true then containersToKeep have to be empty -// - infraContainerId have to be set iff startInfraContainer is false. It stores dockerID of running Infra Container -// - containersToStart keeps indices of Specs of containers that have to be started. -// - containersToKeep stores mapping from dockerIDs of running containers to indices of their Specs for containers that -// should be kept running. If startInfraContainer is false then it contains an entry for infraContainerId (mapped to -1). -// It shouldn't be the case where containersToStart is empty and containersToKeep contains only infraContainerId. In such case -// Infra Container should be killed, hence it's removed from this map. -// - all running containers which are NOT contained in containersToKeep should be killed. -type podContainerChangesSpec struct { - startInfraContainer bool - infraContainerId kubeletTypes.DockerID - containersToStart map[int]empty - containersToKeep map[kubeletTypes.DockerID]int -} - -func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus) (podContainerChangesSpec, error) { - podFullName := kubecontainer.GetPodFullName(pod) - uid := pod.UID - glog.V(4).Infof("Syncing Pod %+v, podFullName: %q, uid: %q", pod, podFullName, uid) - - containersToStart := make(map[int]empty) - containersToKeep := make(map[kubeletTypes.DockerID]int) - createPodInfraContainer := false - - var err error - var podInfraContainerID kubeletTypes.DockerID - var changed bool - podInfraContainer := runningPod.FindContainerByName(dockertools.PodInfraContainerName) - if podInfraContainer != nil { - glog.V(4).Infof("Found pod infra container for %q", podFullName) - changed, err = kl.containerManager.PodInfraContainerChanged(pod, podInfraContainer) - if err != nil { - return podContainerChangesSpec{}, err - } - } - - createPodInfraContainer = true - if podInfraContainer == nil { - glog.V(2).Infof("Need to restart pod infra container for %q because it is not found", podFullName) - } else if changed { - glog.V(2).Infof("Need to restart pod infra container for %q because it is changed", podFullName) - } else { - glog.V(4).Infof("Pod infra container looks good, keep it %q", podFullName) - createPodInfraContainer = false - podInfraContainerID = kubeletTypes.DockerID(podInfraContainer.ID) - containersToKeep[podInfraContainerID] = -1 - } - - for index, container := range pod.Spec.Containers { - expectedHash := dockertools.HashContainer(&container) - - c := runningPod.FindContainerByName(container.Name) - if c == nil { - if shouldContainerBeRestarted(&container, pod, &podStatus, kl.readinessManager) { - // If we are here it means that the container is dead and should be restarted, or never existed and should - // be created. We may be inserting this ID again if the container has changed and it has - // RestartPolicy::Always, but it's not a big deal. - glog.V(3).Infof("Container %+v is dead, but RestartPolicy says that we should restart it.", container) - containersToStart[index] = empty{} - } - continue - } - - containerID := kubeletTypes.DockerID(c.ID) - hash := c.Hash - glog.V(3).Infof("pod %q container %q exists as %v", podFullName, container.Name, containerID) - - if createPodInfraContainer { - // createPodInfraContainer == true and Container exists - // If we're creating infra containere everything will be killed anyway - // If RestartPolicy is Always or OnFailure we restart containers that were running before we - // killed them when restarting Infra Container. - if pod.Spec.RestartPolicy != api.RestartPolicyNever { - glog.V(1).Infof("Infra Container is being recreated. %q will be restarted.", container.Name) - containersToStart[index] = empty{} - } - continue - } - - // At this point, the container is running and pod infra container is good. - // We will look for changes and check healthiness for the container. - containerChanged := hash != 0 && hash != expectedHash - if containerChanged { - glog.Infof("pod %q container %q hash changed (%d vs %d), it will be killed and re-created.", podFullName, container.Name, hash, expectedHash) - containersToStart[index] = empty{} - continue - } - - result, err := kl.prober.Probe(pod, podStatus, container, string(c.ID), c.Created) - if err != nil { - // TODO(vmarmol): examine this logic. - glog.V(2).Infof("probe no-error: %q", container.Name) - containersToKeep[containerID] = index - continue - } - if result == probe.Success { - glog.V(4).Infof("probe success: %q", container.Name) - containersToKeep[containerID] = index - continue - } - glog.Infof("pod %q container %q is unhealthy (probe result: %v), it will be killed and re-created.", podFullName, container.Name, result) - containersToStart[index] = empty{} - } - - // After the loop one of the following should be true: - // - createPodInfraContainer is true and containersToKeep is empty. - // (In fact, when createPodInfraContainer is false, containersToKeep will not be touched). - // - createPodInfraContainer is false and containersToKeep contains at least ID of Infra Container - - // If Infra container is the last running one, we don't want to keep it. - if !createPodInfraContainer && len(containersToStart) == 0 && len(containersToKeep) == 1 { - containersToKeep = make(map[kubeletTypes.DockerID]int) - } - - return podContainerChangesSpec{ - startInfraContainer: createPodInfraContainer, - infraContainerId: podInfraContainerID, - containersToStart: containersToStart, - containersToKeep: containersToKeep, - }, nil -} - func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod) error { podFullName := kubecontainer.GetPodFullName(pod) uid := pod.UID @@ -1110,14 +955,14 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont return err } - containerChanges, err := kl.computePodContainerChanges(pod, runningPod, podStatus) + containerChanges, err := kl.containerManager.ComputePodContainerChanges(pod, runningPod, podStatus) glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges) if err != nil { return err } - if containerChanges.startInfraContainer || (len(containerChanges.containersToKeep) == 0 && len(containerChanges.containersToStart) == 0) { - if len(containerChanges.containersToKeep) == 0 && len(containerChanges.containersToStart) == 0 { + if containerChanges.StartInfraContainer || (len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0) { + if len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0 { glog.V(4).Infof("Killing Infra Container for %q because all other containers are dead.", podFullName) } else { glog.V(4).Infof("Killing Infra Container for %q, will start new one", podFullName) @@ -1131,7 +976,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont } else { // Otherwise kill any containers in this pod which are not specified as ones to keep. for _, container := range runningPod.Containers { - _, keep := containerChanges.containersToKeep[kubeletTypes.DockerID(container.ID)] + _, keep := containerChanges.ContainersToKeep[kubeletTypes.DockerID(container.ID)] if !keep { glog.V(3).Infof("Killing unwanted container %+v", container) err = kl.containerManager.KillContainer(container.ID) @@ -1160,8 +1005,8 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont kl.volumeManager.SetVolumes(pod.UID, podVolumes) // If we should create infra container then we do it first. - podInfraContainerID := containerChanges.infraContainerId - if containerChanges.startInfraContainer && (len(containerChanges.containersToStart) > 0) { + podInfraContainerID := containerChanges.InfraContainerId + if containerChanges.StartInfraContainer && (len(containerChanges.ContainersToStart) > 0) { glog.V(4).Infof("Creating pod infra container for %q", podFullName) podInfraContainerID, err = kl.containerManager.CreatePodInfraContainer(pod, kl, kl.handlerRunner) @@ -1176,7 +1021,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont } // Start everything - for container := range containerChanges.containersToStart { + for container := range containerChanges.ContainersToStart { glog.V(4).Infof("Creating container %+v", pod.Spec.Containers[container]) containerSpec := &pod.Spec.Containers[container] if err := kl.pullImage(pod, containerSpec); err != nil { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 8e0487871d8..a04981cf796 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -108,7 +108,7 @@ func newTestKubelet(t *testing.T) *TestKubelet { podManager, fakeMirrorClient := newFakePodManager() kubelet.podManager = podManager kubelet.containerRefManager = kubecontainer.NewRefManager() - kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, kubelet.readinessManager, kubelet.containerRefManager, dockertools.PodInfraContainerImage, 0, 0, "", kubelet.os, kubelet.networkPlugin) + kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, kubelet.readinessManager, kubelet.containerRefManager, dockertools.PodInfraContainerImage, 0, 0, "", kubelet.os, kubelet.networkPlugin, nil) kubelet.runtimeCache = kubecontainer.NewFakeRuntimeCache(kubelet.containerManager) kubelet.podWorkers = newPodWorkers( kubelet.runtimeCache, @@ -120,6 +120,7 @@ func newTestKubelet(t *testing.T) *TestKubelet { fakeRecorder) kubelet.containerManager.Puller = &dockertools.FakeDockerPuller{} kubelet.prober = prober.New(nil, kubelet.readinessManager, kubelet.containerRefManager, kubelet.recorder) + kubelet.containerManager.Prober = kubelet.prober kubelet.handlerRunner = newHandlerRunner(&fakeHTTP{}, &fakeContainerCommandRunner{}, kubelet.containerManager) kubelet.volumeManager = newVolumeManager() return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient} diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go index d540ff38394..a2e2831a5c4 100644 --- a/pkg/kubelet/pod_workers_test.go +++ b/pkg/kubelet/pod_workers_test.go @@ -26,6 +26,7 @@ import ( kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network" + kubeletProber "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" ) @@ -42,7 +43,7 @@ func createPodWorkers() (*podWorkers, map[types.UID][]string) { fakeDocker := &dockertools.FakeDockerClient{} fakeRecorder := &record.FakeRecorder{} np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) - dockerManager := dockertools.NewDockerManager(fakeDocker, fakeRecorder, nil, nil, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np) + dockerManager := dockertools.NewDockerManager(fakeDocker, fakeRecorder, nil, nil, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, &kubeletProber.FakeProber{}) fakeRuntimeCache := kubecontainer.NewFakeRuntimeCache(dockerManager) lock := sync.Mutex{} diff --git a/pkg/kubelet/prober/prober.go b/pkg/kubelet/prober/prober.go index ddbd79fc87c..6652c35c9d6 100644 --- a/pkg/kubelet/prober/prober.go +++ b/pkg/kubelet/prober/prober.go @@ -18,13 +18,13 @@ package prober import ( "fmt" + "io" "strconv" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" - "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/probe" execprobe "github.com/GoogleCloudPlatform/kubernetes/pkg/probe/exec" httprobe "github.com/GoogleCloudPlatform/kubernetes/pkg/probe/http" @@ -42,12 +42,19 @@ type Prober interface { Probe(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) } +type ContainerCommandRunner interface { + RunInContainer(containerID string, cmd []string) ([]byte, error) + ExecInContainer(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error + PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error +} + // Prober helps to check the liveness/readiness of a container. type prober struct { - exec execprobe.ExecProber - http httprobe.HTTPProber - tcp tcprobe.TCPProber - runner dockertools.ContainerCommandRunner + exec execprobe.ExecProber + http httprobe.HTTPProber + tcp tcprobe.TCPProber + // TODO(vmarmol): Remove when we remove the circular dependency to DockerManager. + Runner ContainerCommandRunner readinessManager *kubecontainer.ReadinessManager refManager *kubecontainer.RefManager @@ -57,7 +64,7 @@ type prober struct { // NewProber creates a Prober, it takes a command runner and // several container info managers. func New( - runner dockertools.ContainerCommandRunner, + runner ContainerCommandRunner, readinessManager *kubecontainer.ReadinessManager, refManager *kubecontainer.RefManager, recorder record.EventRecorder) Prober { @@ -66,7 +73,7 @@ func New( exec: execprobe.New(), http: httprobe.New(), tcp: tcprobe.New(), - runner: runner, + Runner: runner, readinessManager: readinessManager, refManager: refManager, @@ -249,7 +256,7 @@ type execInContainer struct { func (p *prober) newExecInContainer(pod *api.Pod, container api.Container, containerID string) exec.Cmd { return execInContainer{func() ([]byte, error) { - return p.runner.RunInContainer(containerID, container.LivenessProbe.Exec.Command) + return p.Runner.RunInContainer(containerID, container.LivenessProbe.Exec.Command) }} } diff --git a/pkg/kubelet/prober/prober_fake.go b/pkg/kubelet/prober/prober_fake.go new file mode 100644 index 00000000000..b26f8a92ed6 --- /dev/null +++ b/pkg/kubelet/prober/prober_fake.go @@ -0,0 +1,31 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package prober + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/probe" +) + +var _ Prober = &FakeProber{} + +type FakeProber struct { +} + +func (fp *FakeProber) Probe(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) { + return probe.Success, nil +} diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go index 665156aae9c..6bcb39bb80d 100644 --- a/pkg/kubelet/runonce_test.go +++ b/pkg/kubelet/runonce_test.go @@ -28,6 +28,7 @@ import ( kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network" + kubeletProber "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober" docker "github.com/fsouza/go-dockerclient" cadvisorApi "github.com/google/cadvisor/info/v1" ) @@ -158,7 +159,8 @@ func TestRunOnce(t *testing.T) { 0, "", kubecontainer.FakeOS{}, - kb.networkPlugin) + kb.networkPlugin, + &kubeletProber.FakeProber{}) kb.containerManager.Puller = &dockertools.FakeDockerPuller{} pods := []*api.Pod{