Merge pull request #7480 from vmarmol/runtime-syncpod

Move ComputePodChanges to the Docker runtime
This commit is contained in:
Yu-Ju Hong 2015-04-29 16:05:20 -07:00
commit ba1140a54f
10 changed files with 245 additions and 195 deletions

View File

@ -19,7 +19,6 @@ package dockertools
import ( import (
"fmt" "fmt"
"hash/adler32" "hash/adler32"
"io"
"math/rand" "math/rand"
"os" "os"
"strconv" "strconv"
@ -27,7 +26,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/leaky" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/leaky"
kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types" kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
@ -275,13 +273,6 @@ func ConnectToDockerOrDie(dockerEndpoint string) DockerInterface {
return client return client
} }
// TODO(yifan): Move this to container.Runtime.
type ContainerCommandRunner interface {
RunInContainer(containerID string, cmd []string) ([]byte, error)
ExecInContainer(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error
PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error
}
func milliCPUToShares(milliCPU int64) int64 { func milliCPUToShares(milliCPU int64) int64 {
if milliCPU == 0 { if milliCPU == 0 {
// zero milliCPU means unset. Use kernel default. // zero milliCPU means unset. Use kernel default.

View File

@ -28,6 +28,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
kubeletProber "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
docker "github.com/fsouza/go-dockerclient" docker "github.com/fsouza/go-dockerclient"
@ -394,7 +395,7 @@ func TestGetRunningContainers(t *testing.T) {
fakeDocker := &FakeDockerClient{Errors: make(map[string]error)} fakeDocker := &FakeDockerClient{Errors: make(map[string]error)}
fakeRecorder := &record.FakeRecorder{} fakeRecorder := &record.FakeRecorder{}
np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
containerManager := NewDockerManager(fakeDocker, fakeRecorder, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np) containerManager := NewDockerManager(fakeDocker, fakeRecorder, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, &kubeletProber.FakeProber{})
tests := []struct { tests := []struct {
containers map[string]*docker.Container containers map[string]*docker.Container
inputIDs []string inputIDs []string
@ -660,7 +661,7 @@ func TestFindContainersByPod(t *testing.T) {
} }
fakeClient := &FakeDockerClient{} fakeClient := &FakeDockerClient{}
np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
containerManager := NewDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np) containerManager := NewDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, &kubeletProber.FakeProber{})
for i, test := range tests { for i, test := range tests {
fakeClient.ContainerList = test.containerList fakeClient.ContainerList = test.containerList
fakeClient.ExitedContainerList = test.exitedContainerList fakeClient.ExitedContainerList = test.exitedContainerList

View File

@ -35,7 +35,9 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober"
kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types" kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
docker "github.com/fsouza/go-dockerclient" docker "github.com/fsouza/go-dockerclient"
@ -86,6 +88,11 @@ type DockerManager struct {
// Network plugin. // Network plugin.
networkPlugin network.NetworkPlugin networkPlugin network.NetworkPlugin
// TODO(vmarmol): Make this non-public when we remove the circular dependency
// with prober.
// Health check prober.
Prober prober.Prober
} }
func NewDockerManager( func NewDockerManager(
@ -98,7 +105,8 @@ func NewDockerManager(
burst int, burst int,
containerLogsDir string, containerLogsDir string,
osInterface kubecontainer.OSInterface, osInterface kubecontainer.OSInterface,
networkPlugin network.NetworkPlugin) *DockerManager { networkPlugin network.NetworkPlugin,
prober prober.Prober) *DockerManager {
// Work out the location of the Docker runtime, defaulting to /var/lib/docker // Work out the location of the Docker runtime, defaulting to /var/lib/docker
// if there are any problems. // if there are any problems.
dockerRoot := "/var/lib/docker" dockerRoot := "/var/lib/docker"
@ -142,6 +150,7 @@ func NewDockerManager(
dockerRoot: dockerRoot, dockerRoot: dockerRoot,
containerLogsDir: containerLogsDir, containerLogsDir: containerLogsDir,
networkPlugin: networkPlugin, networkPlugin: networkPlugin,
Prober: prober,
} }
} }
@ -692,8 +701,8 @@ func (dm *DockerManager) IsImagePresent(image string) (bool, error) {
return dm.Puller.IsImagePresent(image) return dm.Puller.IsImagePresent(image)
} }
// PodInfraContainer returns true if the pod infra container has changed. // podInfraContainerChanged returns true if the pod infra container has changed.
func (dm *DockerManager) PodInfraContainerChanged(pod *api.Pod, podInfraContainer *kubecontainer.Container) (bool, error) { func (dm *DockerManager) podInfraContainerChanged(pod *api.Pod, podInfraContainer *kubecontainer.Container) (bool, error) {
networkMode := "" networkMode := ""
var ports []api.ContainerPort var ports []api.ContainerPort
@ -1112,3 +1121,164 @@ func (dm *DockerManager) CreatePodInfraContainer(pod *api.Pod, generator kubecon
} }
return id, util.ApplyOomScoreAdj(containerInfo.State.Pid, podOomScoreAdj) return id, util.ApplyOomScoreAdj(containerInfo.State.Pid, podOomScoreAdj)
} }
// TODO(vmarmol): This will soon be made non-public when its only use is internal.
// Structure keeping information on changes that need to happen for a pod. The semantics is as follows:
// - startInfraContainer is true if new Infra Containers have to be started and old one (if running) killed.
// Additionally if it is true then containersToKeep have to be empty
// - infraContainerId have to be set iff startInfraContainer is false. It stores dockerID of running Infra Container
// - containersToStart keeps indices of Specs of containers that have to be started.
// - containersToKeep stores mapping from dockerIDs of running containers to indices of their Specs for containers that
// should be kept running. If startInfraContainer is false then it contains an entry for infraContainerId (mapped to -1).
// It shouldn't be the case where containersToStart is empty and containersToKeep contains only infraContainerId. In such case
// Infra Container should be killed, hence it's removed from this map.
// - all running containers which are NOT contained in containersToKeep should be killed.
type empty struct{}
type PodContainerChangesSpec struct {
StartInfraContainer bool
InfraContainerId kubeletTypes.DockerID
ContainersToStart map[int]empty
ContainersToKeep map[kubeletTypes.DockerID]int
}
// TODO(vmarmol): This will soon be made non-public when its only use is internal.
func (dm *DockerManager) ComputePodContainerChanges(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus) (PodContainerChangesSpec, error) {
podFullName := kubecontainer.GetPodFullName(pod)
uid := pod.UID
glog.V(4).Infof("Syncing Pod %+v, podFullName: %q, uid: %q", pod, podFullName, uid)
containersToStart := make(map[int]empty)
containersToKeep := make(map[kubeletTypes.DockerID]int)
createPodInfraContainer := false
var err error
var podInfraContainerID kubeletTypes.DockerID
var changed bool
podInfraContainer := runningPod.FindContainerByName(PodInfraContainerName)
if podInfraContainer != nil {
glog.V(4).Infof("Found pod infra container for %q", podFullName)
changed, err = dm.podInfraContainerChanged(pod, podInfraContainer)
if err != nil {
return PodContainerChangesSpec{}, err
}
}
createPodInfraContainer = true
if podInfraContainer == nil {
glog.V(2).Infof("Need to restart pod infra container for %q because it is not found", podFullName)
} else if changed {
glog.V(2).Infof("Need to restart pod infra container for %q because it is changed", podFullName)
} else {
glog.V(4).Infof("Pod infra container looks good, keep it %q", podFullName)
createPodInfraContainer = false
podInfraContainerID = kubeletTypes.DockerID(podInfraContainer.ID)
containersToKeep[podInfraContainerID] = -1
}
for index, container := range pod.Spec.Containers {
expectedHash := HashContainer(&container)
c := runningPod.FindContainerByName(container.Name)
if c == nil {
if shouldContainerBeRestarted(&container, pod, &podStatus, dm.readinessManager) {
// If we are here it means that the container is dead and should be restarted, or never existed and should
// be created. We may be inserting this ID again if the container has changed and it has
// RestartPolicy::Always, but it's not a big deal.
glog.V(3).Infof("Container %+v is dead, but RestartPolicy says that we should restart it.", container)
containersToStart[index] = empty{}
}
continue
}
containerID := kubeletTypes.DockerID(c.ID)
hash := c.Hash
glog.V(3).Infof("pod %q container %q exists as %v", podFullName, container.Name, containerID)
if createPodInfraContainer {
// createPodInfraContainer == true and Container exists
// If we're creating infra containere everything will be killed anyway
// If RestartPolicy is Always or OnFailure we restart containers that were running before we
// killed them when restarting Infra Container.
if pod.Spec.RestartPolicy != api.RestartPolicyNever {
glog.V(1).Infof("Infra Container is being recreated. %q will be restarted.", container.Name)
containersToStart[index] = empty{}
}
continue
}
// At this point, the container is running and pod infra container is good.
// We will look for changes and check healthiness for the container.
containerChanged := hash != 0 && hash != expectedHash
if containerChanged {
glog.Infof("pod %q container %q hash changed (%d vs %d), it will be killed and re-created.", podFullName, container.Name, hash, expectedHash)
containersToStart[index] = empty{}
continue
}
result, err := dm.Prober.Probe(pod, podStatus, container, string(c.ID), c.Created)
if err != nil {
// TODO(vmarmol): examine this logic.
glog.V(2).Infof("probe no-error: %q", container.Name)
containersToKeep[containerID] = index
continue
}
if result == probe.Success {
glog.V(4).Infof("probe success: %q", container.Name)
containersToKeep[containerID] = index
continue
}
glog.Infof("pod %q container %q is unhealthy (probe result: %v), it will be killed and re-created.", podFullName, container.Name, result)
containersToStart[index] = empty{}
}
// After the loop one of the following should be true:
// - createPodInfraContainer is true and containersToKeep is empty.
// (In fact, when createPodInfraContainer is false, containersToKeep will not be touched).
// - createPodInfraContainer is false and containersToKeep contains at least ID of Infra Container
// If Infra container is the last running one, we don't want to keep it.
if !createPodInfraContainer && len(containersToStart) == 0 && len(containersToKeep) == 1 {
containersToKeep = make(map[kubeletTypes.DockerID]int)
}
return PodContainerChangesSpec{
StartInfraContainer: createPodInfraContainer,
InfraContainerId: podInfraContainerID,
ContainersToStart: containersToStart,
ContainersToKeep: containersToKeep,
}, nil
}
func shouldContainerBeRestarted(container *api.Container, pod *api.Pod, podStatus *api.PodStatus, readinessManager *kubecontainer.ReadinessManager) bool {
podFullName := kubecontainer.GetPodFullName(pod)
// Get all dead container status.
var resultStatus []*api.ContainerStatus
for i, containerStatus := range podStatus.ContainerStatuses {
if containerStatus.Name == container.Name && containerStatus.State.Termination != nil {
resultStatus = append(resultStatus, &podStatus.ContainerStatuses[i])
}
}
// Set dead containers to unready state.
for _, c := range resultStatus {
readinessManager.RemoveReadiness(kubecontainer.TrimRuntimePrefixFromImage(c.ContainerID))
}
// Check RestartPolicy for dead container.
if len(resultStatus) > 0 {
if pod.Spec.RestartPolicy == api.RestartPolicyNever {
glog.V(4).Infof("Already ran container %q of pod %q, do nothing", container.Name, podFullName)
return false
}
if pod.Spec.RestartPolicy == api.RestartPolicyOnFailure {
// Check the exit code of last run. Note: This assumes the result is sorted
// by the created time in reverse order.
if resultStatus[0].State.Termination.ExitCode == 0 {
glog.V(4).Infof("Already successfully ran container %q of pod %q, do nothing", container.Name, podFullName)
return false
}
}
}
return true
}

View File

@ -24,18 +24,19 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog" "github.com/golang/glog"
) )
type handlerRunner struct { type handlerRunner struct {
httpGetter httpGetter httpGetter httpGetter
commandRunner dockertools.ContainerCommandRunner commandRunner prober.ContainerCommandRunner
containerManager *dockertools.DockerManager containerManager *dockertools.DockerManager
} }
// TODO(yifan): Merge commandRunner and containerManager once containerManager implements the ContainerCommandRunner interface. // TODO(yifan): Merge commandRunner and containerManager once containerManager implements the ContainerCommandRunner interface.
func newHandlerRunner(httpGetter httpGetter, commandRunner dockertools.ContainerCommandRunner, containerManager *dockertools.DockerManager) kubecontainer.HandlerRunner { func newHandlerRunner(httpGetter httpGetter, commandRunner prober.ContainerCommandRunner, containerManager *dockertools.DockerManager) kubecontainer.HandlerRunner {
return &handlerRunner{ return &handlerRunner{
httpGetter: httpGetter, httpGetter: httpGetter,
commandRunner: commandRunner, commandRunner: commandRunner,

View File

@ -46,7 +46,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober"
kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types" kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
@ -254,7 +253,8 @@ func NewMainKubelet(
pullBurst, pullBurst,
containerLogsDir, containerLogsDir,
osInterface, osInterface,
klet.networkPlugin) klet.networkPlugin,
nil)
klet.runner = containerManager klet.runner = containerManager
klet.containerManager = containerManager klet.containerManager = containerManager
@ -262,6 +262,9 @@ func NewMainKubelet(
klet.prober = prober.New(klet.runner, klet.readinessManager, klet.containerRefManager, klet.recorder) klet.prober = prober.New(klet.runner, klet.readinessManager, klet.containerRefManager, klet.recorder)
klet.handlerRunner = newHandlerRunner(klet.httpClient, klet.runner, klet.containerManager) klet.handlerRunner = newHandlerRunner(klet.httpClient, klet.runner, klet.containerManager)
// TODO(vmarmol): Remove when the circular dependency is removed :(
containerManager.Prober = klet.prober
runtimeCache, err := kubecontainer.NewRuntimeCache(containerManager) runtimeCache, err := kubecontainer.NewRuntimeCache(containerManager)
if err != nil { if err != nil {
return nil, err return nil, err
@ -321,7 +324,7 @@ type Kubelet struct {
// Optional, defaults to /logs/ from /var/log // Optional, defaults to /logs/ from /var/log
logServer http.Handler logServer http.Handler
// Optional, defaults to simple Docker implementation // Optional, defaults to simple Docker implementation
runner dockertools.ContainerCommandRunner runner prober.ContainerCommandRunner
// Optional, client for http requests, defaults to empty client // Optional, client for http requests, defaults to empty client
httpClient httpGetter httpClient httpGetter
@ -924,164 +927,6 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error {
return nil return nil
} }
func shouldContainerBeRestarted(container *api.Container, pod *api.Pod, podStatus *api.PodStatus, readinessManager *kubecontainer.ReadinessManager) bool {
podFullName := kubecontainer.GetPodFullName(pod)
// Get all dead container status.
var resultStatus []*api.ContainerStatus
for i, containerStatus := range podStatus.ContainerStatuses {
if containerStatus.Name == container.Name && containerStatus.State.Termination != nil {
resultStatus = append(resultStatus, &podStatus.ContainerStatuses[i])
}
}
// Set dead containers to unready state.
for _, c := range resultStatus {
readinessManager.RemoveReadiness(kubecontainer.TrimRuntimePrefixFromImage(c.ContainerID))
}
// Check RestartPolicy for dead container.
if len(resultStatus) > 0 {
if pod.Spec.RestartPolicy == api.RestartPolicyNever {
glog.V(4).Infof("Already ran container %q of pod %q, do nothing", container.Name, podFullName)
return false
}
if pod.Spec.RestartPolicy == api.RestartPolicyOnFailure {
// Check the exit code of last run. Note: This assumes the result is sorted
// by the created time in reverse order.
if resultStatus[0].State.Termination.ExitCode == 0 {
glog.V(4).Infof("Already successfully ran container %q of pod %q, do nothing", container.Name, podFullName)
return false
}
}
}
return true
}
// Structure keeping information on changes that need to happen for a pod. The semantics is as follows:
// - startInfraContainer is true if new Infra Containers have to be started and old one (if running) killed.
// Additionally if it is true then containersToKeep have to be empty
// - infraContainerId have to be set iff startInfraContainer is false. It stores dockerID of running Infra Container
// - containersToStart keeps indices of Specs of containers that have to be started.
// - containersToKeep stores mapping from dockerIDs of running containers to indices of their Specs for containers that
// should be kept running. If startInfraContainer is false then it contains an entry for infraContainerId (mapped to -1).
// It shouldn't be the case where containersToStart is empty and containersToKeep contains only infraContainerId. In such case
// Infra Container should be killed, hence it's removed from this map.
// - all running containers which are NOT contained in containersToKeep should be killed.
type podContainerChangesSpec struct {
startInfraContainer bool
infraContainerId kubeletTypes.DockerID
containersToStart map[int]empty
containersToKeep map[kubeletTypes.DockerID]int
}
func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus) (podContainerChangesSpec, error) {
podFullName := kubecontainer.GetPodFullName(pod)
uid := pod.UID
glog.V(4).Infof("Syncing Pod %+v, podFullName: %q, uid: %q", pod, podFullName, uid)
containersToStart := make(map[int]empty)
containersToKeep := make(map[kubeletTypes.DockerID]int)
createPodInfraContainer := false
var err error
var podInfraContainerID kubeletTypes.DockerID
var changed bool
podInfraContainer := runningPod.FindContainerByName(dockertools.PodInfraContainerName)
if podInfraContainer != nil {
glog.V(4).Infof("Found pod infra container for %q", podFullName)
changed, err = kl.containerManager.PodInfraContainerChanged(pod, podInfraContainer)
if err != nil {
return podContainerChangesSpec{}, err
}
}
createPodInfraContainer = true
if podInfraContainer == nil {
glog.V(2).Infof("Need to restart pod infra container for %q because it is not found", podFullName)
} else if changed {
glog.V(2).Infof("Need to restart pod infra container for %q because it is changed", podFullName)
} else {
glog.V(4).Infof("Pod infra container looks good, keep it %q", podFullName)
createPodInfraContainer = false
podInfraContainerID = kubeletTypes.DockerID(podInfraContainer.ID)
containersToKeep[podInfraContainerID] = -1
}
for index, container := range pod.Spec.Containers {
expectedHash := dockertools.HashContainer(&container)
c := runningPod.FindContainerByName(container.Name)
if c == nil {
if shouldContainerBeRestarted(&container, pod, &podStatus, kl.readinessManager) {
// If we are here it means that the container is dead and should be restarted, or never existed and should
// be created. We may be inserting this ID again if the container has changed and it has
// RestartPolicy::Always, but it's not a big deal.
glog.V(3).Infof("Container %+v is dead, but RestartPolicy says that we should restart it.", container)
containersToStart[index] = empty{}
}
continue
}
containerID := kubeletTypes.DockerID(c.ID)
hash := c.Hash
glog.V(3).Infof("pod %q container %q exists as %v", podFullName, container.Name, containerID)
if createPodInfraContainer {
// createPodInfraContainer == true and Container exists
// If we're creating infra containere everything will be killed anyway
// If RestartPolicy is Always or OnFailure we restart containers that were running before we
// killed them when restarting Infra Container.
if pod.Spec.RestartPolicy != api.RestartPolicyNever {
glog.V(1).Infof("Infra Container is being recreated. %q will be restarted.", container.Name)
containersToStart[index] = empty{}
}
continue
}
// At this point, the container is running and pod infra container is good.
// We will look for changes and check healthiness for the container.
containerChanged := hash != 0 && hash != expectedHash
if containerChanged {
glog.Infof("pod %q container %q hash changed (%d vs %d), it will be killed and re-created.", podFullName, container.Name, hash, expectedHash)
containersToStart[index] = empty{}
continue
}
result, err := kl.prober.Probe(pod, podStatus, container, string(c.ID), c.Created)
if err != nil {
// TODO(vmarmol): examine this logic.
glog.V(2).Infof("probe no-error: %q", container.Name)
containersToKeep[containerID] = index
continue
}
if result == probe.Success {
glog.V(4).Infof("probe success: %q", container.Name)
containersToKeep[containerID] = index
continue
}
glog.Infof("pod %q container %q is unhealthy (probe result: %v), it will be killed and re-created.", podFullName, container.Name, result)
containersToStart[index] = empty{}
}
// After the loop one of the following should be true:
// - createPodInfraContainer is true and containersToKeep is empty.
// (In fact, when createPodInfraContainer is false, containersToKeep will not be touched).
// - createPodInfraContainer is false and containersToKeep contains at least ID of Infra Container
// If Infra container is the last running one, we don't want to keep it.
if !createPodInfraContainer && len(containersToStart) == 0 && len(containersToKeep) == 1 {
containersToKeep = make(map[kubeletTypes.DockerID]int)
}
return podContainerChangesSpec{
startInfraContainer: createPodInfraContainer,
infraContainerId: podInfraContainerID,
containersToStart: containersToStart,
containersToKeep: containersToKeep,
}, nil
}
func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod) error { func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod) error {
podFullName := kubecontainer.GetPodFullName(pod) podFullName := kubecontainer.GetPodFullName(pod)
uid := pod.UID uid := pod.UID
@ -1123,14 +968,14 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
return err return err
} }
containerChanges, err := kl.computePodContainerChanges(pod, runningPod, podStatus) containerChanges, err := kl.containerManager.ComputePodContainerChanges(pod, runningPod, podStatus)
glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges) glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges)
if err != nil { if err != nil {
return err return err
} }
if containerChanges.startInfraContainer || (len(containerChanges.containersToKeep) == 0 && len(containerChanges.containersToStart) == 0) { if containerChanges.StartInfraContainer || (len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0) {
if len(containerChanges.containersToKeep) == 0 && len(containerChanges.containersToStart) == 0 { if len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0 {
glog.V(4).Infof("Killing Infra Container for %q because all other containers are dead.", podFullName) glog.V(4).Infof("Killing Infra Container for %q because all other containers are dead.", podFullName)
} else { } else {
glog.V(4).Infof("Killing Infra Container for %q, will start new one", podFullName) glog.V(4).Infof("Killing Infra Container for %q, will start new one", podFullName)
@ -1144,7 +989,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
} else { } else {
// Otherwise kill any containers in this pod which are not specified as ones to keep. // Otherwise kill any containers in this pod which are not specified as ones to keep.
for _, container := range runningPod.Containers { for _, container := range runningPod.Containers {
_, keep := containerChanges.containersToKeep[kubeletTypes.DockerID(container.ID)] _, keep := containerChanges.ContainersToKeep[kubeletTypes.DockerID(container.ID)]
if !keep { if !keep {
glog.V(3).Infof("Killing unwanted container %+v", container) glog.V(3).Infof("Killing unwanted container %+v", container)
err = kl.containerManager.KillContainer(container.ID) err = kl.containerManager.KillContainer(container.ID)
@ -1173,8 +1018,8 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
kl.volumeManager.SetVolumes(pod.UID, podVolumes) kl.volumeManager.SetVolumes(pod.UID, podVolumes)
// If we should create infra container then we do it first. // If we should create infra container then we do it first.
podInfraContainerID := containerChanges.infraContainerId podInfraContainerID := containerChanges.InfraContainerId
if containerChanges.startInfraContainer && (len(containerChanges.containersToStart) > 0) { if containerChanges.StartInfraContainer && (len(containerChanges.ContainersToStart) > 0) {
glog.V(4).Infof("Creating pod infra container for %q", podFullName) glog.V(4).Infof("Creating pod infra container for %q", podFullName)
podInfraContainerID, err = kl.containerManager.CreatePodInfraContainer(pod, kl, kl.handlerRunner) podInfraContainerID, err = kl.containerManager.CreatePodInfraContainer(pod, kl, kl.handlerRunner)
@ -1189,7 +1034,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
} }
// Start everything // Start everything
for container := range containerChanges.containersToStart { for container := range containerChanges.ContainersToStart {
glog.V(4).Infof("Creating container %+v", pod.Spec.Containers[container]) glog.V(4).Infof("Creating container %+v", pod.Spec.Containers[container])
containerSpec := &pod.Spec.Containers[container] containerSpec := &pod.Spec.Containers[container]
if err := kl.pullImage(pod, containerSpec); err != nil { if err := kl.pullImage(pod, containerSpec); err != nil {

View File

@ -108,7 +108,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
podManager, fakeMirrorClient := newFakePodManager() podManager, fakeMirrorClient := newFakePodManager()
kubelet.podManager = podManager kubelet.podManager = podManager
kubelet.containerRefManager = kubecontainer.NewRefManager() kubelet.containerRefManager = kubecontainer.NewRefManager()
kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, kubelet.readinessManager, kubelet.containerRefManager, dockertools.PodInfraContainerImage, 0, 0, "", kubelet.os, kubelet.networkPlugin) kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, kubelet.readinessManager, kubelet.containerRefManager, dockertools.PodInfraContainerImage, 0, 0, "", kubelet.os, kubelet.networkPlugin, nil)
kubelet.runtimeCache = kubecontainer.NewFakeRuntimeCache(kubelet.containerManager) kubelet.runtimeCache = kubecontainer.NewFakeRuntimeCache(kubelet.containerManager)
kubelet.podWorkers = newPodWorkers( kubelet.podWorkers = newPodWorkers(
kubelet.runtimeCache, kubelet.runtimeCache,
@ -120,6 +120,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
fakeRecorder) fakeRecorder)
kubelet.containerManager.Puller = &dockertools.FakeDockerPuller{} kubelet.containerManager.Puller = &dockertools.FakeDockerPuller{}
kubelet.prober = prober.New(nil, kubelet.readinessManager, kubelet.containerRefManager, kubelet.recorder) kubelet.prober = prober.New(nil, kubelet.readinessManager, kubelet.containerRefManager, kubelet.recorder)
kubelet.containerManager.Prober = kubelet.prober
kubelet.handlerRunner = newHandlerRunner(&fakeHTTP{}, &fakeContainerCommandRunner{}, kubelet.containerManager) kubelet.handlerRunner = newHandlerRunner(&fakeHTTP{}, &fakeContainerCommandRunner{}, kubelet.containerManager)
kubelet.volumeManager = newVolumeManager() kubelet.volumeManager = newVolumeManager()
return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient} return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient}

View File

@ -26,6 +26,7 @@ import (
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
kubeletProber "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
) )
@ -42,7 +43,7 @@ func createPodWorkers() (*podWorkers, map[types.UID][]string) {
fakeDocker := &dockertools.FakeDockerClient{} fakeDocker := &dockertools.FakeDockerClient{}
fakeRecorder := &record.FakeRecorder{} fakeRecorder := &record.FakeRecorder{}
np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
dockerManager := dockertools.NewDockerManager(fakeDocker, fakeRecorder, nil, nil, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np) dockerManager := dockertools.NewDockerManager(fakeDocker, fakeRecorder, nil, nil, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, &kubeletProber.FakeProber{})
fakeRuntimeCache := kubecontainer.NewFakeRuntimeCache(dockerManager) fakeRuntimeCache := kubecontainer.NewFakeRuntimeCache(dockerManager)
lock := sync.Mutex{} lock := sync.Mutex{}

View File

@ -18,13 +18,13 @@ package prober
import ( import (
"fmt" "fmt"
"io"
"strconv" "strconv"
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe" "github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
execprobe "github.com/GoogleCloudPlatform/kubernetes/pkg/probe/exec" execprobe "github.com/GoogleCloudPlatform/kubernetes/pkg/probe/exec"
httprobe "github.com/GoogleCloudPlatform/kubernetes/pkg/probe/http" httprobe "github.com/GoogleCloudPlatform/kubernetes/pkg/probe/http"
@ -42,12 +42,19 @@ type Prober interface {
Probe(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) Probe(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error)
} }
type ContainerCommandRunner interface {
RunInContainer(containerID string, cmd []string) ([]byte, error)
ExecInContainer(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error
PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error
}
// Prober helps to check the liveness/readiness of a container. // Prober helps to check the liveness/readiness of a container.
type prober struct { type prober struct {
exec execprobe.ExecProber exec execprobe.ExecProber
http httprobe.HTTPProber http httprobe.HTTPProber
tcp tcprobe.TCPProber tcp tcprobe.TCPProber
runner dockertools.ContainerCommandRunner // TODO(vmarmol): Remove when we remove the circular dependency to DockerManager.
Runner ContainerCommandRunner
readinessManager *kubecontainer.ReadinessManager readinessManager *kubecontainer.ReadinessManager
refManager *kubecontainer.RefManager refManager *kubecontainer.RefManager
@ -57,7 +64,7 @@ type prober struct {
// NewProber creates a Prober, it takes a command runner and // NewProber creates a Prober, it takes a command runner and
// several container info managers. // several container info managers.
func New( func New(
runner dockertools.ContainerCommandRunner, runner ContainerCommandRunner,
readinessManager *kubecontainer.ReadinessManager, readinessManager *kubecontainer.ReadinessManager,
refManager *kubecontainer.RefManager, refManager *kubecontainer.RefManager,
recorder record.EventRecorder) Prober { recorder record.EventRecorder) Prober {
@ -66,7 +73,7 @@ func New(
exec: execprobe.New(), exec: execprobe.New(),
http: httprobe.New(), http: httprobe.New(),
tcp: tcprobe.New(), tcp: tcprobe.New(),
runner: runner, Runner: runner,
readinessManager: readinessManager, readinessManager: readinessManager,
refManager: refManager, refManager: refManager,
@ -249,7 +256,7 @@ type execInContainer struct {
func (p *prober) newExecInContainer(pod *api.Pod, container api.Container, containerID string) exec.Cmd { func (p *prober) newExecInContainer(pod *api.Pod, container api.Container, containerID string) exec.Cmd {
return execInContainer{func() ([]byte, error) { return execInContainer{func() ([]byte, error) {
return p.runner.RunInContainer(containerID, container.LivenessProbe.Exec.Command) return p.Runner.RunInContainer(containerID, container.LivenessProbe.Exec.Command)
}} }}
} }

View File

@ -0,0 +1,31 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package prober
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
)
var _ Prober = &FakeProber{}
type FakeProber struct {
}
func (fp *FakeProber) Probe(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) {
return probe.Success, nil
}

View File

@ -28,6 +28,7 @@ import (
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
kubeletProber "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/prober"
docker "github.com/fsouza/go-dockerclient" docker "github.com/fsouza/go-dockerclient"
cadvisorApi "github.com/google/cadvisor/info/v1" cadvisorApi "github.com/google/cadvisor/info/v1"
) )
@ -158,7 +159,8 @@ func TestRunOnce(t *testing.T) {
0, 0,
"", "",
kubecontainer.FakeOS{}, kubecontainer.FakeOS{},
kb.networkPlugin) kb.networkPlugin,
&kubeletProber.FakeProber{})
kb.containerManager.Puller = &dockertools.FakeDockerPuller{} kb.containerManager.Puller = &dockertools.FakeDockerPuller{}
pods := []*api.Pod{ pods := []*api.Pod{