Merge pull request #14221 from timstclair/readiness-workers

Refactor readiness probing
This commit is contained in:
Alex Robinson 2015-10-05 13:26:54 -07:00
commit d8120f5425
19 changed files with 1040 additions and 203 deletions

View File

@ -51,7 +51,7 @@ func TrimRuntimePrefix(fullString string) string {
// ShouldContainerBeRestarted checks whether a container needs to be restarted. // ShouldContainerBeRestarted checks whether a container needs to be restarted.
// TODO(yifan): Think about how to refactor this. // TODO(yifan): Think about how to refactor this.
func ShouldContainerBeRestarted(container *api.Container, pod *api.Pod, podStatus *api.PodStatus, readinessManager *ReadinessManager) bool { func ShouldContainerBeRestarted(container *api.Container, pod *api.Pod, podStatus *api.PodStatus) bool {
podFullName := GetPodFullName(pod) podFullName := GetPodFullName(pod)
// Get all dead container status. // Get all dead container status.
@ -62,11 +62,6 @@ func ShouldContainerBeRestarted(container *api.Container, pod *api.Pod, podStatu
} }
} }
// Set dead containers to notReady state.
for _, c := range resultStatus {
readinessManager.RemoveReadiness(TrimRuntimePrefix(c.ContainerID))
}
// Check RestartPolicy for dead container. // Check RestartPolicy for dead container.
if len(resultStatus) > 0 { if len(resultStatus) > 0 {
if pod.Spec.RestartPolicy == api.RestartPolicyNever { if pod.Spec.RestartPolicy == api.RestartPolicyNever {

View File

@ -30,7 +30,7 @@ import (
func NewFakeDockerManager( func NewFakeDockerManager(
client DockerInterface, client DockerInterface,
recorder record.EventRecorder, recorder record.EventRecorder,
readinessManager *kubecontainer.ReadinessManager, prober prober.Prober,
containerRefManager *kubecontainer.RefManager, containerRefManager *kubecontainer.RefManager,
machineInfo *cadvisorApi.MachineInfo, machineInfo *cadvisorApi.MachineInfo,
podInfraContainerImage string, podInfraContainerImage string,
@ -44,10 +44,9 @@ func NewFakeDockerManager(
fakeOOMAdjuster := oom.NewFakeOOMAdjuster() fakeOOMAdjuster := oom.NewFakeOOMAdjuster()
fakeProcFs := procfs.NewFakeProcFs() fakeProcFs := procfs.NewFakeProcFs()
dm := NewDockerManager(client, recorder, readinessManager, containerRefManager, machineInfo, podInfraContainerImage, qps, dm := NewDockerManager(client, recorder, prober, containerRefManager, machineInfo, podInfraContainerImage, qps,
burst, containerLogsDir, osInterface, networkPlugin, generator, httpClient, &NativeExecHandler{}, burst, containerLogsDir, osInterface, networkPlugin, generator, httpClient, &NativeExecHandler{},
fakeOOMAdjuster, fakeProcFs, false) fakeOOMAdjuster, fakeProcFs, false)
dm.dockerPuller = &FakeDockerPuller{} dm.dockerPuller = &FakeDockerPuller{}
dm.prober = prober.New(nil, readinessManager, containerRefManager, recorder)
return dm return dm
} }

View File

@ -87,7 +87,6 @@ var podInfraContainerImagePullPolicy = api.PullIfNotPresent
type DockerManager struct { type DockerManager struct {
client DockerInterface client DockerInterface
recorder record.EventRecorder recorder record.EventRecorder
readinessManager *kubecontainer.ReadinessManager
containerRefManager *kubecontainer.RefManager containerRefManager *kubecontainer.RefManager
os kubecontainer.OSInterface os kubecontainer.OSInterface
machineInfo *cadvisorApi.MachineInfo machineInfo *cadvisorApi.MachineInfo
@ -145,7 +144,7 @@ type DockerManager struct {
func NewDockerManager( func NewDockerManager(
client DockerInterface, client DockerInterface,
recorder record.EventRecorder, recorder record.EventRecorder,
readinessManager *kubecontainer.ReadinessManager, prober prober.Prober,
containerRefManager *kubecontainer.RefManager, containerRefManager *kubecontainer.RefManager,
machineInfo *cadvisorApi.MachineInfo, machineInfo *cadvisorApi.MachineInfo,
podInfraContainerImage string, podInfraContainerImage string,
@ -195,7 +194,6 @@ func NewDockerManager(
dm := &DockerManager{ dm := &DockerManager{
client: client, client: client,
recorder: recorder, recorder: recorder,
readinessManager: readinessManager,
containerRefManager: containerRefManager, containerRefManager: containerRefManager,
os: osInterface, os: osInterface,
machineInfo: machineInfo, machineInfo: machineInfo,
@ -205,7 +203,7 @@ func NewDockerManager(
dockerRoot: dockerRoot, dockerRoot: dockerRoot,
containerLogsDir: containerLogsDir, containerLogsDir: containerLogsDir,
networkPlugin: networkPlugin, networkPlugin: networkPlugin,
prober: nil, prober: prober,
generator: generator, generator: generator,
execHandler: execHandler, execHandler: execHandler,
oomAdjuster: oomAdjuster, oomAdjuster: oomAdjuster,
@ -213,7 +211,6 @@ func NewDockerManager(
cpuCFSQuota: cpuCFSQuota, cpuCFSQuota: cpuCFSQuota,
} }
dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm) dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm)
dm.prober = prober.New(dm, readinessManager, containerRefManager, recorder)
dm.imagePuller = kubecontainer.NewImagePuller(recorder, dm) dm.imagePuller = kubecontainer.NewImagePuller(recorder, dm)
return dm return dm
@ -1386,8 +1383,6 @@ func (dm *DockerManager) killContainer(containerID types.UID, container *api.Con
gracePeriod -= int64(unversioned.Now().Sub(start.Time).Seconds()) gracePeriod -= int64(unversioned.Now().Sub(start.Time).Seconds())
} }
dm.readinessManager.RemoveReadiness(ID)
// always give containers a minimal shutdown window to avoid unnecessary SIGKILLs // always give containers a minimal shutdown window to avoid unnecessary SIGKILLs
if gracePeriod < minimumGracePeriodInSeconds { if gracePeriod < minimumGracePeriodInSeconds {
gracePeriod = minimumGracePeriodInSeconds gracePeriod = minimumGracePeriodInSeconds
@ -1682,7 +1677,7 @@ func (dm *DockerManager) computePodContainerChanges(pod *api.Pod, runningPod kub
c := runningPod.FindContainerByName(container.Name) c := runningPod.FindContainerByName(container.Name)
if c == nil { if c == nil {
if kubecontainer.ShouldContainerBeRestarted(&container, pod, &podStatus, dm.readinessManager) { if kubecontainer.ShouldContainerBeRestarted(&container, pod, &podStatus) {
// If we are here it means that the container is dead and should be restarted, or never existed and should // If we are here it means that the container is dead and should be restarted, or never existed and should
// be created. We may be inserting this ID again if the container has changed and it has // be created. We may be inserting this ID again if the container has changed and it has
// RestartPolicy::Always, but it's not a big deal. // RestartPolicy::Always, but it's not a big deal.
@ -1717,7 +1712,7 @@ func (dm *DockerManager) computePodContainerChanges(pod *api.Pod, runningPod kub
continue continue
} }
result, err := dm.prober.Probe(pod, podStatus, container, string(c.ID), c.Created) result, err := dm.prober.ProbeLiveness(pod, podStatus, container, string(c.ID), c.Created)
if err != nil { if err != nil {
// TODO(vmarmol): examine this logic. // TODO(vmarmol): examine this logic.
glog.V(2).Infof("probe no-error: %q", container.Name) glog.V(2).Infof("probe no-error: %q", container.Name)

View File

@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/record"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/prober"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
uexec "k8s.io/kubernetes/pkg/util/exec" uexec "k8s.io/kubernetes/pkg/util/exec"
@ -74,14 +75,13 @@ func (*fakeOptionGenerator) GenerateRunContainerOptions(pod *api.Pod, container
func newTestDockerManagerWithHTTPClient(fakeHTTPClient *fakeHTTP) (*DockerManager, *FakeDockerClient) { func newTestDockerManagerWithHTTPClient(fakeHTTPClient *fakeHTTP) (*DockerManager, *FakeDockerClient) {
fakeDocker := &FakeDockerClient{VersionInfo: docker.Env{"Version=1.1.3", "ApiVersion=1.15"}, Errors: make(map[string]error), RemovedImages: sets.String{}} fakeDocker := &FakeDockerClient{VersionInfo: docker.Env{"Version=1.1.3", "ApiVersion=1.15"}, Errors: make(map[string]error), RemovedImages: sets.String{}}
fakeRecorder := &record.FakeRecorder{} fakeRecorder := &record.FakeRecorder{}
readinessManager := kubecontainer.NewReadinessManager()
containerRefManager := kubecontainer.NewRefManager() containerRefManager := kubecontainer.NewRefManager()
networkPlugin, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) networkPlugin, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
optionGenerator := &fakeOptionGenerator{} optionGenerator := &fakeOptionGenerator{}
dockerManager := NewFakeDockerManager( dockerManager := NewFakeDockerManager(
fakeDocker, fakeDocker,
fakeRecorder, fakeRecorder,
readinessManager, prober.FakeProber{},
containerRefManager, containerRefManager,
&cadvisorApi.MachineInfo{}, &cadvisorApi.MachineInfo{},
PodInfraContainerImage, PodInfraContainerImage,
@ -398,10 +398,6 @@ func TestKillContainerInPod(t *testing.T) {
containerToKill := &containers[0] containerToKill := &containers[0]
containerToSpare := &containers[1] containerToSpare := &containers[1]
fakeDocker.ContainerList = containers fakeDocker.ContainerList = containers
// Set all containers to ready.
for _, c := range fakeDocker.ContainerList {
manager.readinessManager.SetReadiness(c.ID, true)
}
if err := manager.KillContainerInPod("", &pod.Spec.Containers[0], pod); err != nil { if err := manager.KillContainerInPod("", &pod.Spec.Containers[0], pod); err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
@ -410,13 +406,9 @@ func TestKillContainerInPod(t *testing.T) {
if err := fakeDocker.AssertStopped([]string{containerToKill.ID}); err != nil { if err := fakeDocker.AssertStopped([]string{containerToKill.ID}); err != nil {
t.Errorf("container was not stopped correctly: %v", err) t.Errorf("container was not stopped correctly: %v", err)
} }
// Assert the container has been spared.
// Verify that the readiness has been removed for the stopped container. if err := fakeDocker.AssertStopped([]string{containerToSpare.ID}); err == nil {
if ready := manager.readinessManager.GetReadiness(containerToKill.ID); ready { t.Errorf("container unexpectedly stopped: %v", containerToSpare.ID)
t.Errorf("exepcted container entry ID '%v' to not be found. states: %+v", containerToKill.ID, ready)
}
if ready := manager.readinessManager.GetReadiness(containerToSpare.ID); !ready {
t.Errorf("exepcted container entry ID '%v' to be found. states: %+v", containerToSpare.ID, ready)
} }
} }
@ -471,10 +463,6 @@ func TestKillContainerInPodWithPreStop(t *testing.T) {
}, },
}, },
} }
// Set all containers to ready.
for _, c := range fakeDocker.ContainerList {
manager.readinessManager.SetReadiness(c.ID, true)
}
if err := manager.KillContainerInPod("", &pod.Spec.Containers[0], pod); err != nil { if err := manager.KillContainerInPod("", &pod.Spec.Containers[0], pod); err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
@ -510,27 +498,12 @@ func TestKillContainerInPodWithError(t *testing.T) {
Names: []string{"/k8s_bar_qux_new_1234_42"}, Names: []string{"/k8s_bar_qux_new_1234_42"},
}, },
} }
containerToKill := &containers[0]
containerToSpare := &containers[1]
fakeDocker.ContainerList = containers fakeDocker.ContainerList = containers
fakeDocker.Errors["stop"] = fmt.Errorf("sample error") fakeDocker.Errors["stop"] = fmt.Errorf("sample error")
// Set all containers to ready.
for _, c := range fakeDocker.ContainerList {
manager.readinessManager.SetReadiness(c.ID, true)
}
if err := manager.KillContainerInPod("", &pod.Spec.Containers[0], pod); err == nil { if err := manager.KillContainerInPod("", &pod.Spec.Containers[0], pod); err == nil {
t.Errorf("expected error, found nil") t.Errorf("expected error, found nil")
} }
// Verify that the readiness has been removed even though the stop failed.
if ready := manager.readinessManager.GetReadiness(containerToKill.ID); ready {
t.Errorf("exepcted container entry ID '%v' to not be found. states: %+v", containerToKill.ID, ready)
}
if ready := manager.readinessManager.GetReadiness(containerToSpare.ID); !ready {
t.Errorf("exepcted container entry ID '%v' to be found. states: %+v", containerToSpare.ID, ready)
}
} }
func TestIsAExitError(t *testing.T) { func TestIsAExitError(t *testing.T) {

View File

@ -52,11 +52,13 @@ import (
"k8s.io/kubernetes/pkg/kubelet/envvars" "k8s.io/kubernetes/pkg/kubelet/envvars"
"k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/prober"
"k8s.io/kubernetes/pkg/kubelet/rkt" "k8s.io/kubernetes/pkg/kubelet/rkt"
"k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/kubernetes/pkg/kubelet/status"
kubeletTypes "k8s.io/kubernetes/pkg/kubelet/types" kubeletTypes "k8s.io/kubernetes/pkg/kubelet/types"
kubeletUtil "k8s.io/kubernetes/pkg/kubelet/util" kubeletUtil "k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/probe"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
@ -244,7 +246,6 @@ func NewMainKubelet(
return nil, fmt.Errorf("failed to initialize disk manager: %v", err) return nil, fmt.Errorf("failed to initialize disk manager: %v", err)
} }
statusManager := status.NewManager(kubeClient) statusManager := status.NewManager(kubeClient)
readinessManager := kubecontainer.NewReadinessManager()
containerRefManager := kubecontainer.NewRefManager() containerRefManager := kubecontainer.NewRefManager()
volumeManager := newVolumeManager() volumeManager := newVolumeManager()
@ -259,7 +260,6 @@ func NewMainKubelet(
rootDirectory: rootDirectory, rootDirectory: rootDirectory,
resyncInterval: resyncInterval, resyncInterval: resyncInterval,
containerRefManager: containerRefManager, containerRefManager: containerRefManager,
readinessManager: readinessManager,
httpClient: &http.Client{}, httpClient: &http.Client{},
sourcesReady: sourcesReady, sourcesReady: sourcesReady,
registerNode: registerNode, registerNode: registerNode,
@ -317,7 +317,7 @@ func NewMainKubelet(
klet.containerRuntime = dockertools.NewDockerManager( klet.containerRuntime = dockertools.NewDockerManager(
dockerClient, dockerClient,
recorder, recorder,
readinessManager, klet, // prober
containerRefManager, containerRefManager,
machineInfo, machineInfo,
podInfraContainerImage, podInfraContainerImage,
@ -343,7 +343,7 @@ func NewMainKubelet(
klet, klet,
recorder, recorder,
containerRefManager, containerRefManager,
readinessManager, klet, // prober
klet.volumeManager) klet.volumeManager)
if err != nil { if err != nil {
return nil, err return nil, err
@ -386,6 +386,12 @@ func NewMainKubelet(
klet.runner = klet.containerRuntime klet.runner = klet.containerRuntime
klet.podManager = newBasicPodManager(klet.kubeClient) klet.podManager = newBasicPodManager(klet.kubeClient)
klet.prober = prober.New(klet.runner, containerRefManager, recorder)
klet.probeManager = prober.NewManager(
klet.resyncInterval,
klet.statusManager,
klet.prober)
runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime) runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
if err != nil { if err != nil {
return nil, err return nil, err
@ -486,8 +492,10 @@ type Kubelet struct {
// Network plugin. // Network plugin.
networkPlugin network.NetworkPlugin networkPlugin network.NetworkPlugin
// Container readiness state manager. // Handles container readiness probing
readinessManager *kubecontainer.ReadinessManager probeManager prober.Manager
// TODO: Move prober ownership to the probeManager once the runtime no longer depends on it.
prober prober.Prober
// How long to keep idle streaming command execution/port forwarding // How long to keep idle streaming command execution/port forwarding
// connections open before terminating them // connections open before terminating them
@ -1662,6 +1670,7 @@ func (kl *Kubelet) HandlePodCleanups() error {
// Stop the workers for no-longer existing pods. // Stop the workers for no-longer existing pods.
// TODO: is here the best place to forget pod workers? // TODO: is here the best place to forget pod workers?
kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods) kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods)
kl.probeManager.CleanupPods(activePods)
runningPods, err := kl.runtimeCache.GetPods() runningPods, err := kl.runtimeCache.GetPods()
if err != nil { if err != nil {
@ -1990,6 +1999,7 @@ func (kl *Kubelet) HandlePodAdditions(pods []*api.Pod) {
} }
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, SyncPodCreate, mirrorPod, start) kl.dispatchWork(pod, SyncPodCreate, mirrorPod, start)
kl.probeManager.AddPod(pod)
} }
} }
@ -2021,6 +2031,7 @@ func (kl *Kubelet) HandlePodDeletions(pods []*api.Pod) {
if err := kl.deletePod(pod.UID); err != nil { if err := kl.deletePod(pod.UID); err != nil {
glog.V(2).Infof("Failed to delete pod %q, err: %v", kubeletUtil.FormatPodName(pod), err) glog.V(2).Infof("Failed to delete pod %q, err: %v", kubeletUtil.FormatPodName(pod), err)
} }
kl.probeManager.RemovePod(pod)
} }
} }
@ -2609,15 +2620,8 @@ func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) {
// Assume info is ready to process // Assume info is ready to process
podStatus.Phase = GetPhase(spec, podStatus.ContainerStatuses) podStatus.Phase = GetPhase(spec, podStatus.ContainerStatuses)
for _, c := range spec.Containers { kl.probeManager.UpdatePodStatus(pod.UID, podStatus)
for i, st := range podStatus.ContainerStatuses {
if st.Name == c.Name {
ready := st.State.Running != nil && kl.readinessManager.GetReadiness(kubecontainer.TrimRuntimePrefix(st.ContainerID))
podStatus.ContainerStatuses[i].Ready = ready
break
}
}
}
podStatus.Conditions = append(podStatus.Conditions, getPodReadyCondition(spec, podStatus.ContainerStatuses)...) podStatus.Conditions = append(podStatus.Conditions, getPodReadyCondition(spec, podStatus.ContainerStatuses)...)
if !kl.standaloneMode { if !kl.standaloneMode {
@ -2787,6 +2791,16 @@ func (kl *Kubelet) GetRuntime() kubecontainer.Runtime {
return kl.containerRuntime return kl.containerRuntime
} }
// Proxy prober calls through the Kubelet to break the circular dependency between the runtime &
// prober.
// TODO: Remove this hack once the runtime no longer depends on the prober.
func (kl *Kubelet) ProbeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) {
return kl.prober.ProbeLiveness(pod, status, container, containerID, createdAt)
}
func (kl *Kubelet) ProbeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string) (probe.Result, error) {
return kl.prober.ProbeReadiness(pod, status, container, containerID)
}
var minRsrc = resource.MustParse("1k") var minRsrc = resource.MustParse("1k")
var maxRsrc = resource.MustParse("1P") var maxRsrc = resource.MustParse("1P")

View File

@ -45,6 +45,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/container"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/prober"
"k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
@ -105,7 +106,6 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.masterServiceNamespace = api.NamespaceDefault kubelet.masterServiceNamespace = api.NamespaceDefault
kubelet.serviceLister = testServiceLister{} kubelet.serviceLister = testServiceLister{}
kubelet.nodeLister = testNodeLister{} kubelet.nodeLister = testNodeLister{}
kubelet.readinessManager = kubecontainer.NewReadinessManager()
kubelet.recorder = fakeRecorder kubelet.recorder = fakeRecorder
kubelet.statusManager = status.NewManager(fakeKubeClient) kubelet.statusManager = status.NewManager(fakeKubeClient)
if err := kubelet.setupDataDirs(); err != nil { if err := kubelet.setupDataDirs(); err != nil {
@ -130,6 +130,10 @@ func newTestKubelet(t *testing.T) *TestKubelet {
runtimeCache: kubelet.runtimeCache, runtimeCache: kubelet.runtimeCache,
t: t, t: t,
} }
kubelet.prober = prober.FakeProber{}
kubelet.probeManager = prober.FakeManager{}
kubelet.volumeManager = newVolumeManager() kubelet.volumeManager = newVolumeManager()
kubelet.containerManager, _ = newContainerManager(fakeContainerMgrMountInt(), mockCadvisor, "", "", "") kubelet.containerManager, _ = newContainerManager(fakeContainerMgrMountInt(), mockCadvisor, "", "", "")
kubelet.networkConfigured = true kubelet.networkConfigured = true

View File

@ -37,6 +37,7 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools" "k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/prober"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
) )
@ -143,13 +144,12 @@ func (nh *fakeNetworkHost) GetRuntime() kubecontainer.Runtime {
func newTestDockerManager() (*dockertools.DockerManager, *dockertools.FakeDockerClient) { func newTestDockerManager() (*dockertools.DockerManager, *dockertools.FakeDockerClient) {
fakeDocker := &dockertools.FakeDockerClient{VersionInfo: docker.Env{"Version=1.1.3", "ApiVersion=1.15"}, Errors: make(map[string]error), RemovedImages: sets.String{}} fakeDocker := &dockertools.FakeDockerClient{VersionInfo: docker.Env{"Version=1.1.3", "ApiVersion=1.15"}, Errors: make(map[string]error), RemovedImages: sets.String{}}
fakeRecorder := &record.FakeRecorder{} fakeRecorder := &record.FakeRecorder{}
readinessManager := kubecontainer.NewReadinessManager()
containerRefManager := kubecontainer.NewRefManager() containerRefManager := kubecontainer.NewRefManager()
networkPlugin, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) networkPlugin, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
dockerManager := dockertools.NewFakeDockerManager( dockerManager := dockertools.NewFakeDockerManager(
fakeDocker, fakeDocker,
fakeRecorder, fakeRecorder,
readinessManager, prober.FakeProber{},
containerRefManager, containerRefManager,
&cadvisorApi.MachineInfo{}, &cadvisorApi.MachineInfo{},
dockertools.PodInfraContainerImage, dockertools.PodInfraContainerImage,

View File

@ -18,14 +18,20 @@ package prober
import ( import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/probe" "k8s.io/kubernetes/pkg/types"
) )
var _ Prober = &FakeProber{} type FakeManager struct{}
type FakeProber struct { var _ Manager = FakeManager{}
}
func (fp *FakeProber) Probe(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) { // Unused methods.
return probe.Success, nil func (_ FakeManager) AddPod(_ *api.Pod) {}
func (_ FakeManager) RemovePod(_ *api.Pod) {}
func (_ FakeManager) CleanupPods(_ []*api.Pod) {}
func (_ FakeManager) UpdatePodStatus(_ types.UID, podStatus *api.PodStatus) {
for i := range podStatus.ContainerStatuses {
podStatus.ContainerStatuses[i].Ready = true
}
} }

View File

@ -0,0 +1,44 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package prober
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/probe"
)
var _ Prober = FakeProber{}
type FakeProber struct {
Readiness probe.Result
Liveness probe.Result
Error error
}
func (f FakeProber) ProbeLiveness(_ *api.Pod, _ api.PodStatus, c api.Container, _ string, _ int64) (probe.Result, error) {
if c.LivenessProbe == nil {
return probe.Success, nil
}
return f.Liveness, f.Error
}
func (f FakeProber) ProbeReadiness(_ *api.Pod, _ api.PodStatus, c api.Container, _ string) (probe.Result, error) {
if c.ReadinessProbe == nil {
return probe.Success, nil
}
return f.Readiness, f.Error
}

View File

@ -0,0 +1,167 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package prober
import (
"sync"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/sets"
)
// Manager manages pod probing. It creates a probe "worker" for every container that specifies a
// probe (AddPod). The worker periodically probes its assigned container and caches the results. The
// manager usse the cached probe results to set the appropriate Ready state in the PodStatus when
// requested (UpdatePodStatus). Updating probe parameters is not currently supported.
// TODO: Move liveness probing out of the runtime, to here.
type Manager interface {
// AddPod creates new probe workers for every container probe. This should be called for every
// pod created.
AddPod(pod *api.Pod)
// RemovePod handles cleaning up the removed pod state, including terminating probe workers and
// deleting cached results.
RemovePod(pod *api.Pod)
// CleanupPods handles cleaning up pods which should no longer be running.
// It takes a list of "active pods" which should not be cleaned up.
CleanupPods(activePods []*api.Pod)
// UpdatePodStatus modifies the given PodStatus with the appropriate Ready state for each
// container based on container running status, cached probe results and worker states.
UpdatePodStatus(types.UID, *api.PodStatus)
}
type manager struct {
// Caches the results of readiness probes.
readinessCache *readinessManager
// Map of active workers for readiness
readinessProbes map[containerPath]*worker
// Lock for accessing & mutating readinessProbes
workerLock sync.RWMutex
// The statusManager cache provides pod IP and container IDs for probing.
statusManager status.Manager
// prober executes the probe actions.
prober Prober
// Default period for workers to execute a probe.
defaultProbePeriod time.Duration
}
func NewManager(
defaultProbePeriod time.Duration,
statusManager status.Manager,
prober Prober) Manager {
return &manager{
defaultProbePeriod: defaultProbePeriod,
statusManager: statusManager,
prober: prober,
readinessCache: newReadinessManager(),
readinessProbes: make(map[containerPath]*worker),
}
}
// Key uniquely identifying containers
type containerPath struct {
podUID types.UID
containerName string
}
func (m *manager) AddPod(pod *api.Pod) {
m.workerLock.Lock()
defer m.workerLock.Unlock()
key := containerPath{podUID: pod.UID}
for _, c := range pod.Spec.Containers {
key.containerName = c.Name
if _, ok := m.readinessProbes[key]; ok {
glog.Errorf("Readiness probe already exists! %v - %v",
kubecontainer.GetPodFullName(pod), c.Name)
return
}
if c.ReadinessProbe != nil {
m.readinessProbes[key] = m.newWorker(pod, c)
}
}
}
func (m *manager) RemovePod(pod *api.Pod) {
m.workerLock.RLock()
defer m.workerLock.RUnlock()
key := containerPath{podUID: pod.UID}
for _, c := range pod.Spec.Containers {
key.containerName = c.Name
if worker, ok := m.readinessProbes[key]; ok {
close(worker.stop)
}
}
}
func (m *manager) CleanupPods(activePods []*api.Pod) {
desiredPods := make(map[types.UID]sets.Empty)
for _, pod := range activePods {
desiredPods[pod.UID] = sets.Empty{}
}
m.workerLock.RLock()
defer m.workerLock.RUnlock()
for path, worker := range m.readinessProbes {
if _, ok := desiredPods[path.podUID]; !ok {
close(worker.stop)
}
}
}
func (m *manager) UpdatePodStatus(podUID types.UID, podStatus *api.PodStatus) {
for i, c := range podStatus.ContainerStatuses {
var ready bool
if c.State.Running == nil {
ready = false
} else if result, ok := m.readinessCache.getReadiness(kubecontainer.TrimRuntimePrefix(c.ContainerID)); ok {
ready = result
} else {
// The check whether there is a probe which hasn't run yet.
_, exists := m.getReadinessProbe(podUID, c.Name)
ready = !exists
}
podStatus.ContainerStatuses[i].Ready = ready
}
}
func (m *manager) getReadinessProbe(podUID types.UID, containerName string) (*worker, bool) {
m.workerLock.RLock()
defer m.workerLock.RUnlock()
probe, ok := m.readinessProbes[containerPath{podUID, containerName}]
return probe, ok
}
// Called by the worker after exiting.
func (m *manager) removeReadinessProbe(podUID types.UID, containerName string) {
m.workerLock.Lock()
defer m.workerLock.Unlock()
delete(m.readinessProbes, containerPath{podUID, containerName})
}

View File

@ -0,0 +1,280 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package prober
import (
"fmt"
"testing"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/probe"
"k8s.io/kubernetes/pkg/util/wait"
)
func TestAddRemovePods(t *testing.T) {
noProbePod := api.Pod{
ObjectMeta: api.ObjectMeta{
UID: "no_probe_pod",
},
Spec: api.PodSpec{
Containers: []api.Container{{
Name: "no_probe1",
}, {
Name: "no_probe2",
}},
},
}
probePod := api.Pod{
ObjectMeta: api.ObjectMeta{
UID: "probe_pod",
},
Spec: api.PodSpec{
Containers: []api.Container{{
Name: "no_probe1",
}, {
Name: "prober1",
ReadinessProbe: &api.Probe{},
}, {
Name: "no_probe2",
}, {
Name: "prober2",
ReadinessProbe: &api.Probe{},
}},
},
}
m := newTestManager()
if err := expectProbes(m, nil); err != nil {
t.Error(err)
}
// Adding a pod with no probes should be a no-op.
m.AddPod(&noProbePod)
if err := expectProbes(m, nil); err != nil {
t.Error(err)
}
// Adding a pod with probes.
m.AddPod(&probePod)
probePaths := []containerPath{{"probe_pod", "prober1"}, {"probe_pod", "prober2"}}
if err := expectProbes(m, probePaths); err != nil {
t.Error(err)
}
// Removing un-probed pod.
m.RemovePod(&noProbePod)
if err := expectProbes(m, probePaths); err != nil {
t.Error(err)
}
// Removing probed pod.
m.RemovePod(&probePod)
if err := waitForWorkerExit(m, probePaths); err != nil {
t.Fatal(err)
}
if err := expectProbes(m, nil); err != nil {
t.Error(err)
}
// Removing already removed pods should be a no-op.
m.RemovePod(&probePod)
if err := expectProbes(m, nil); err != nil {
t.Error(err)
}
}
func TestCleanupPods(t *testing.T) {
m := newTestManager()
podToCleanup := api.Pod{
ObjectMeta: api.ObjectMeta{
UID: "pod_cleanup",
},
Spec: api.PodSpec{
Containers: []api.Container{{
Name: "prober1",
ReadinessProbe: &api.Probe{},
}, {
Name: "prober2",
ReadinessProbe: &api.Probe{},
}},
},
}
podToKeep := api.Pod{
ObjectMeta: api.ObjectMeta{
UID: "pod_keep",
},
Spec: api.PodSpec{
Containers: []api.Container{{
Name: "prober1",
ReadinessProbe: &api.Probe{},
}, {
Name: "prober2",
ReadinessProbe: &api.Probe{},
}},
},
}
m.AddPod(&podToCleanup)
m.AddPod(&podToKeep)
m.CleanupPods([]*api.Pod{&podToKeep})
removedProbes := []containerPath{{"pod_cleanup", "prober1"}, {"pod_cleanup", "prober2"}}
expectedProbes := []containerPath{{"pod_keep", "prober1"}, {"pod_keep", "prober2"}}
if err := waitForWorkerExit(m, removedProbes); err != nil {
t.Fatal(err)
}
if err := expectProbes(m, expectedProbes); err != nil {
t.Error(err)
}
}
func TestUpdatePodStatus(t *testing.T) {
const podUID = "pod_uid"
unprobed := api.ContainerStatus{
Name: "unprobed_container",
ContainerID: "unprobed_container_id",
State: api.ContainerState{
Running: &api.ContainerStateRunning{},
},
}
probedReady := api.ContainerStatus{
Name: "probed_container_ready",
ContainerID: "probed_container_ready_id",
State: api.ContainerState{
Running: &api.ContainerStateRunning{},
},
}
probedPending := api.ContainerStatus{
Name: "probed_container_pending",
ContainerID: "probed_container_pending_id",
State: api.ContainerState{
Running: &api.ContainerStateRunning{},
},
}
probedUnready := api.ContainerStatus{
Name: "probed_container_unready",
ContainerID: "probed_container_unready_id",
State: api.ContainerState{
Running: &api.ContainerStateRunning{},
},
}
terminated := api.ContainerStatus{
Name: "terminated_container",
ContainerID: "terminated_container_id",
State: api.ContainerState{
Terminated: &api.ContainerStateTerminated{},
},
}
podStatus := api.PodStatus{
Phase: api.PodRunning,
ContainerStatuses: []api.ContainerStatus{
unprobed, probedReady, probedPending, probedUnready, terminated,
},
}
m := newTestManager()
// Setup probe "workers" and cached results.
m.readinessProbes = map[containerPath]*worker{
containerPath{podUID, probedReady.Name}: {},
containerPath{podUID, probedPending.Name}: {},
containerPath{podUID, probedUnready.Name}: {},
containerPath{podUID, terminated.Name}: {},
}
m.readinessCache.setReadiness(probedReady.ContainerID, true)
m.readinessCache.setReadiness(probedUnready.ContainerID, false)
m.readinessCache.setReadiness(terminated.ContainerID, true)
m.UpdatePodStatus(podUID, &podStatus)
expectedReadiness := map[containerPath]bool{
containerPath{podUID, unprobed.Name}: true,
containerPath{podUID, probedReady.Name}: true,
containerPath{podUID, probedPending.Name}: false,
containerPath{podUID, probedUnready.Name}: false,
containerPath{podUID, terminated.Name}: false,
}
for _, c := range podStatus.ContainerStatuses {
expected, ok := expectedReadiness[containerPath{podUID, c.Name}]
if !ok {
t.Fatalf("Missing expectation for test case: %v", c.Name)
}
if expected != c.Ready {
t.Errorf("Unexpected readiness for container %v: Expected %v but got %v",
c.Name, expected, c.Ready)
}
}
}
func expectProbes(m *manager, expectedReadinessProbes []containerPath) error {
m.workerLock.RLock()
defer m.workerLock.RUnlock()
var unexpected []containerPath
missing := make([]containerPath, len(expectedReadinessProbes))
copy(missing, expectedReadinessProbes)
outer:
for probePath := range m.readinessProbes {
for i, expectedPath := range missing {
if probePath == expectedPath {
missing = append(missing[:i], missing[i+1:]...)
continue outer
}
}
unexpected = append(unexpected, probePath)
}
if len(missing) == 0 && len(unexpected) == 0 {
return nil // Yay!
}
return fmt.Errorf("Unexpected probes: %v; Missing probes: %v;", unexpected, missing)
}
func newTestManager() *manager {
const probePeriod = 1
statusManager := status.NewManager(&testclient.Fake{})
prober := FakeProber{Readiness: probe.Success}
return NewManager(probePeriod, statusManager, prober).(*manager)
}
// Wait for the given workers to exit & clean up.
func waitForWorkerExit(m *manager, workerPaths []containerPath) error {
const interval = 100 * time.Millisecond
const timeout = 30 * time.Second
for _, w := range workerPaths {
condition := func() (bool, error) {
_, exists := m.getReadinessProbe(w.podUID, w.containerName)
return !exists, nil
}
if exited, _ := condition(); exited {
continue // Already exited, no need to poll.
}
glog.Infof("Polling %v", w)
if err := wait.Poll(interval, timeout, condition); err != nil {
return err
}
}
return nil
}

View File

@ -41,7 +41,8 @@ const maxProbeRetries = 3
// Prober checks the healthiness of a container. // Prober checks the healthiness of a container.
type Prober interface { type Prober interface {
Probe(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) ProbeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error)
ProbeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string) (probe.Result, error)
} }
// Prober helps to check the liveness/readiness of a container. // Prober helps to check the liveness/readiness of a container.
@ -51,55 +52,30 @@ type prober struct {
tcp tcprobe.TCPProber tcp tcprobe.TCPProber
runner kubecontainer.ContainerCommandRunner runner kubecontainer.ContainerCommandRunner
readinessManager *kubecontainer.ReadinessManager refManager *kubecontainer.RefManager
refManager *kubecontainer.RefManager recorder record.EventRecorder
recorder record.EventRecorder
} }
// NewProber creates a Prober, it takes a command runner and // NewProber creates a Prober, it takes a command runner and
// several container info managers. // several container info managers.
func New( func New(
runner kubecontainer.ContainerCommandRunner, runner kubecontainer.ContainerCommandRunner,
readinessManager *kubecontainer.ReadinessManager,
refManager *kubecontainer.RefManager, refManager *kubecontainer.RefManager,
recorder record.EventRecorder) Prober { recorder record.EventRecorder) Prober {
return &prober{ return &prober{
exec: execprobe.New(), exec: execprobe.New(),
http: httprobe.New(), http: httprobe.New(),
tcp: tcprobe.New(), tcp: tcprobe.New(),
runner: runner, runner: runner,
refManager: refManager,
readinessManager: readinessManager, recorder: recorder,
refManager: refManager,
recorder: recorder,
} }
} }
// New prober for use in tests. // ProbeLiveness probes the liveness of a container.
func NewTestProber(
exec execprobe.ExecProber,
readinessManager *kubecontainer.ReadinessManager,
refManager *kubecontainer.RefManager,
recorder record.EventRecorder) Prober {
return &prober{
exec: exec,
readinessManager: readinessManager,
refManager: refManager,
recorder: recorder,
}
}
// Probe checks the liveness/readiness of the given container.
func (pb *prober) Probe(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) {
pb.probeReadiness(pod, status, container, containerID, createdAt)
return pb.probeLiveness(pod, status, container, containerID, createdAt)
}
// probeLiveness probes the liveness of a container.
// If the initalDelay since container creation on liveness probe has not passed the probe will return probe.Success. // If the initalDelay since container creation on liveness probe has not passed the probe will return probe.Success.
func (pb *prober) probeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) { func (pb *prober) ProbeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) {
var live probe.Result var live probe.Result
var output string var output string
var err error var err error
@ -137,24 +113,20 @@ func (pb *prober) probeLiveness(pod *api.Pod, status api.PodStatus, container ap
return probe.Success, nil return probe.Success, nil
} }
// probeReadiness probes and sets the readiness of a container. // ProbeReadiness probes and sets the readiness of a container.
// If the initial delay on the readiness probe has not passed, we set readiness to false. func (pb *prober) ProbeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string) (probe.Result, error) {
func (pb *prober) probeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) {
var ready probe.Result var ready probe.Result
var output string var output string
var err error var err error
p := container.ReadinessProbe p := container.ReadinessProbe
if p == nil { if p == nil {
ready = probe.Success ready = probe.Success
} else if time.Now().Unix()-createdAt < p.InitialDelaySeconds {
ready = probe.Failure
} else { } else {
ready, output, err = pb.runProbeWithRetries(p, pod, status, container, containerID, maxProbeRetries) ready, output, err = pb.runProbeWithRetries(p, pod, status, container, containerID, maxProbeRetries)
} }
ctrName := fmt.Sprintf("%s:%s", kubecontainer.GetPodFullName(pod), container.Name) ctrName := fmt.Sprintf("%s:%s", kubecontainer.GetPodFullName(pod), container.Name)
if err != nil || ready == probe.Failure { if err != nil || ready == probe.Failure {
// Readiness failed in one way or another. // Readiness failed in one way or another.
pb.readinessManager.SetReadiness(containerID, false)
ref, ok := pb.refManager.GetRef(containerID) ref, ok := pb.refManager.GetRef(containerID)
if !ok { if !ok {
glog.Warningf("No ref for pod '%v' - '%v'", containerID, container.Name) glog.Warningf("No ref for pod '%v' - '%v'", containerID, container.Name)
@ -164,21 +136,17 @@ func (pb *prober) probeReadiness(pod *api.Pod, status api.PodStatus, container a
if ok { if ok {
pb.recorder.Eventf(ref, "Unhealthy", "Readiness probe errored: %v", err) pb.recorder.Eventf(ref, "Unhealthy", "Readiness probe errored: %v", err)
} }
return
} else { // ready != probe.Success } else { // ready != probe.Success
glog.V(1).Infof("Readiness probe for %q failed (%v): %s", ctrName, ready, output) glog.V(1).Infof("Readiness probe for %q failed (%v): %s", ctrName, ready, output)
if ok { if ok {
pb.recorder.Eventf(ref, "Unhealthy", "Readiness probe failed: %s", output) pb.recorder.Eventf(ref, "Unhealthy", "Readiness probe failed: %s", output)
} }
return
} }
} return probe.Failure, err
if ready == probe.Success {
pb.readinessManager.SetReadiness(containerID, true)
} }
glog.V(3).Infof("Readiness probe for %q succeeded", ctrName) glog.V(3).Infof("Readiness probe for %q succeeded", ctrName)
return ready, nil
} }
// runProbeWithRetries tries to probe the container in a finite loop, it returns the last result // runProbeWithRetries tries to probe the container in a finite loop, it returns the last result

View File

@ -179,11 +179,9 @@ func TestGetTCPAddrParts(t *testing.T) {
// PLEASE READ THE PROBE DOCS BEFORE CHANGING THIS TEST IF YOU ARE UNSURE HOW PROBES ARE SUPPOSED TO WORK: // PLEASE READ THE PROBE DOCS BEFORE CHANGING THIS TEST IF YOU ARE UNSURE HOW PROBES ARE SUPPOSED TO WORK:
// (See https://github.com/GoogleCloudPlatform/kubernetes/blob/master/docs/user-guide/pod-states.md#pod-conditions) // (See https://github.com/GoogleCloudPlatform/kubernetes/blob/master/docs/user-guide/pod-states.md#pod-conditions)
func TestProbeContainer(t *testing.T) { func TestProbeContainer(t *testing.T) {
readinessManager := kubecontainer.NewReadinessManager()
prober := &prober{ prober := &prober{
readinessManager: readinessManager, refManager: kubecontainer.NewRefManager(),
refManager: kubecontainer.NewRefManager(), recorder: &record.FakeRecorder{},
recorder: &record.FakeRecorder{},
} }
containerID := "foobar" containerID := "foobar"
createdAt := time.Now().Unix() createdAt := time.Now().Unix()
@ -191,29 +189,29 @@ func TestProbeContainer(t *testing.T) {
tests := []struct { tests := []struct {
testContainer api.Container testContainer api.Container
expectError bool expectError bool
expectedResult probe.Result expectedLiveness probe.Result
expectedReadiness bool expectedReadiness probe.Result
}{ }{
// No probes. // No probes.
{ {
testContainer: api.Container{}, testContainer: api.Container{},
expectedResult: probe.Success, expectedLiveness: probe.Success,
expectedReadiness: true, expectedReadiness: probe.Success,
}, },
// Only LivenessProbe. expectedReadiness should always be true here. // Only LivenessProbe. expectedReadiness should always be true here.
{ {
testContainer: api.Container{ testContainer: api.Container{
LivenessProbe: &api.Probe{InitialDelaySeconds: 100}, LivenessProbe: &api.Probe{InitialDelaySeconds: 100},
}, },
expectedResult: probe.Success, expectedLiveness: probe.Success,
expectedReadiness: true, expectedReadiness: probe.Success,
}, },
{ {
testContainer: api.Container{ testContainer: api.Container{
LivenessProbe: &api.Probe{InitialDelaySeconds: -100}, LivenessProbe: &api.Probe{InitialDelaySeconds: -100},
}, },
expectedResult: probe.Unknown, expectedLiveness: probe.Unknown,
expectedReadiness: true, expectedReadiness: probe.Success,
}, },
{ {
testContainer: api.Container{ testContainer: api.Container{
@ -224,8 +222,8 @@ func TestProbeContainer(t *testing.T) {
}, },
}, },
}, },
expectedResult: probe.Failure, expectedLiveness: probe.Failure,
expectedReadiness: true, expectedReadiness: probe.Success,
}, },
{ {
testContainer: api.Container{ testContainer: api.Container{
@ -236,8 +234,8 @@ func TestProbeContainer(t *testing.T) {
}, },
}, },
}, },
expectedResult: probe.Success, expectedLiveness: probe.Success,
expectedReadiness: true, expectedReadiness: probe.Success,
}, },
{ {
testContainer: api.Container{ testContainer: api.Container{
@ -248,8 +246,8 @@ func TestProbeContainer(t *testing.T) {
}, },
}, },
}, },
expectedResult: probe.Unknown, expectedLiveness: probe.Unknown,
expectedReadiness: true, expectedReadiness: probe.Success,
}, },
{ {
testContainer: api.Container{ testContainer: api.Container{
@ -261,23 +259,16 @@ func TestProbeContainer(t *testing.T) {
}, },
}, },
expectError: true, expectError: true,
expectedResult: probe.Unknown, expectedLiveness: probe.Unknown,
expectedReadiness: true, expectedReadiness: probe.Success,
}, },
// // Only ReadinessProbe. expectedResult should always be probe.Success here. // // Only ReadinessProbe. expectedLiveness should always be probe.Success here.
{ {
testContainer: api.Container{ testContainer: api.Container{
ReadinessProbe: &api.Probe{InitialDelaySeconds: 100}, ReadinessProbe: &api.Probe{InitialDelaySeconds: 100},
}, },
expectedResult: probe.Success, expectedLiveness: probe.Success,
expectedReadiness: false, expectedReadiness: probe.Unknown,
},
{
testContainer: api.Container{
ReadinessProbe: &api.Probe{InitialDelaySeconds: -100},
},
expectedResult: probe.Success,
expectedReadiness: false,
}, },
{ {
testContainer: api.Container{ testContainer: api.Container{
@ -288,8 +279,8 @@ func TestProbeContainer(t *testing.T) {
}, },
}, },
}, },
expectedResult: probe.Success, expectedLiveness: probe.Success,
expectedReadiness: true, expectedReadiness: probe.Success,
}, },
{ {
testContainer: api.Container{ testContainer: api.Container{
@ -300,8 +291,8 @@ func TestProbeContainer(t *testing.T) {
}, },
}, },
}, },
expectedResult: probe.Success, expectedLiveness: probe.Success,
expectedReadiness: true, expectedReadiness: probe.Success,
}, },
{ {
testContainer: api.Container{ testContainer: api.Container{
@ -312,8 +303,8 @@ func TestProbeContainer(t *testing.T) {
}, },
}, },
}, },
expectedResult: probe.Success, expectedLiveness: probe.Success,
expectedReadiness: true, expectedReadiness: probe.Success,
}, },
{ {
testContainer: api.Container{ testContainer: api.Container{
@ -325,8 +316,8 @@ func TestProbeContainer(t *testing.T) {
}, },
}, },
expectError: false, expectError: false,
expectedResult: probe.Success, expectedLiveness: probe.Success,
expectedReadiness: true, expectedReadiness: probe.Success,
}, },
// Both LivenessProbe and ReadinessProbe. // Both LivenessProbe and ReadinessProbe.
{ {
@ -334,32 +325,32 @@ func TestProbeContainer(t *testing.T) {
LivenessProbe: &api.Probe{InitialDelaySeconds: 100}, LivenessProbe: &api.Probe{InitialDelaySeconds: 100},
ReadinessProbe: &api.Probe{InitialDelaySeconds: 100}, ReadinessProbe: &api.Probe{InitialDelaySeconds: 100},
}, },
expectedResult: probe.Success, expectedLiveness: probe.Success,
expectedReadiness: false, expectedReadiness: probe.Unknown,
}, },
{ {
testContainer: api.Container{ testContainer: api.Container{
LivenessProbe: &api.Probe{InitialDelaySeconds: 100}, LivenessProbe: &api.Probe{InitialDelaySeconds: 100},
ReadinessProbe: &api.Probe{InitialDelaySeconds: -100}, ReadinessProbe: &api.Probe{InitialDelaySeconds: -100},
}, },
expectedResult: probe.Success, expectedLiveness: probe.Success,
expectedReadiness: false, expectedReadiness: probe.Unknown,
}, },
{ {
testContainer: api.Container{ testContainer: api.Container{
LivenessProbe: &api.Probe{InitialDelaySeconds: -100}, LivenessProbe: &api.Probe{InitialDelaySeconds: -100},
ReadinessProbe: &api.Probe{InitialDelaySeconds: 100}, ReadinessProbe: &api.Probe{InitialDelaySeconds: 100},
}, },
expectedResult: probe.Unknown, expectedLiveness: probe.Unknown,
expectedReadiness: false, expectedReadiness: probe.Unknown,
}, },
{ {
testContainer: api.Container{ testContainer: api.Container{
LivenessProbe: &api.Probe{InitialDelaySeconds: -100}, LivenessProbe: &api.Probe{InitialDelaySeconds: -100},
ReadinessProbe: &api.Probe{InitialDelaySeconds: -100}, ReadinessProbe: &api.Probe{InitialDelaySeconds: -100},
}, },
expectedResult: probe.Unknown, expectedLiveness: probe.Unknown,
expectedReadiness: false, expectedReadiness: probe.Unknown,
}, },
{ {
testContainer: api.Container{ testContainer: api.Container{
@ -371,8 +362,8 @@ func TestProbeContainer(t *testing.T) {
}, },
ReadinessProbe: &api.Probe{InitialDelaySeconds: -100}, ReadinessProbe: &api.Probe{InitialDelaySeconds: -100},
}, },
expectedResult: probe.Unknown, expectedLiveness: probe.Unknown,
expectedReadiness: false, expectedReadiness: probe.Unknown,
}, },
{ {
testContainer: api.Container{ testContainer: api.Container{
@ -384,8 +375,8 @@ func TestProbeContainer(t *testing.T) {
}, },
ReadinessProbe: &api.Probe{InitialDelaySeconds: -100}, ReadinessProbe: &api.Probe{InitialDelaySeconds: -100},
}, },
expectedResult: probe.Failure, expectedLiveness: probe.Failure,
expectedReadiness: false, expectedReadiness: probe.Unknown,
}, },
{ {
testContainer: api.Container{ testContainer: api.Container{
@ -402,29 +393,37 @@ func TestProbeContainer(t *testing.T) {
}, },
}, },
}, },
expectedResult: probe.Success, expectedLiveness: probe.Success,
expectedReadiness: true, expectedReadiness: probe.Success,
}, },
} }
for i, test := range tests { for i, test := range tests {
if test.expectError { if test.expectError {
prober.exec = fakeExecProber{test.expectedResult, errors.New("exec error")} prober.exec = fakeExecProber{test.expectedLiveness, errors.New("exec error")}
} else { } else {
prober.exec = fakeExecProber{test.expectedResult, nil} prober.exec = fakeExecProber{test.expectedLiveness, nil}
} }
result, err := prober.Probe(&api.Pod{}, api.PodStatus{}, test.testContainer, containerID, createdAt)
liveness, err := prober.ProbeLiveness(&api.Pod{}, api.PodStatus{}, test.testContainer, containerID, createdAt)
if test.expectError && err == nil { if test.expectError && err == nil {
t.Errorf("[%d] Expected error but no error was returned.", i) t.Errorf("[%d] Expected liveness probe error but no error was returned.", i)
} }
if !test.expectError && err != nil { if !test.expectError && err != nil {
t.Errorf("[%d] Didn't expect error but got: %v", i, err) t.Errorf("[%d] Didn't expect liveness probe error but got: %v", i, err)
} }
if test.expectedResult != result { if test.expectedLiveness != liveness {
t.Errorf("[%d] Expected result to be %v but was %v", i, test.expectedResult, result) t.Errorf("[%d] Expected liveness result to be %v but was %v", i, test.expectedLiveness, liveness)
} }
if test.expectedReadiness != readinessManager.GetReadiness(containerID) {
t.Errorf("[%d] Expected readiness to be %v but was %v", i, test.expectedReadiness, readinessManager.GetReadiness(containerID)) // TODO: Test readiness errors
prober.exec = fakeExecProber{test.expectedReadiness, nil}
readiness, err := prober.ProbeReadiness(&api.Pod{}, api.PodStatus{}, test.testContainer, containerID)
if err != nil {
t.Errorf("[%d] Unexpected readiness probe error: %v", i, err)
}
if test.expectedReadiness != readiness {
t.Errorf("[%d] Expected readiness result to be %v but was %v", i, test.expectedReadiness, readiness)
} }
} }
} }

View File

@ -14,45 +14,45 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package container package prober
import "sync" import "sync"
// ReadinessManager maintains the readiness information(probe results) of // readinessManager maintains the readiness information(probe results) of
// containers over time to allow for implementation of health thresholds. // containers over time to allow for implementation of health thresholds.
// This manager is thread-safe, no locks are necessary for the caller. // This manager is thread-safe, no locks are necessary for the caller.
type ReadinessManager struct { type readinessManager struct {
// guards states // guards states
sync.RWMutex sync.RWMutex
// TODO(yifan): To use strong type. // TODO(yifan): To use strong type.
states map[string]bool states map[string]bool
} }
// NewReadinessManager creates ane returns a readiness manager with empty // newReadinessManager creates ane returns a readiness manager with empty
// contents. // contents.
func NewReadinessManager() *ReadinessManager { func newReadinessManager() *readinessManager {
return &ReadinessManager{states: make(map[string]bool)} return &readinessManager{states: make(map[string]bool)}
} }
// GetReadiness returns the readiness value for the container with the given ID. // getReadiness returns the readiness value for the container with the given ID.
// If the readiness value is found, returns it. // If the readiness value is found, returns it.
// If the readiness is not found, returns false. // If the readiness is not found, returns false.
func (r *ReadinessManager) GetReadiness(id string) bool { func (r *readinessManager) getReadiness(id string) (ready bool, found bool) {
r.RLock() r.RLock()
defer r.RUnlock() defer r.RUnlock()
state, found := r.states[id] state, found := r.states[id]
return state && found return state, found
} }
// SetReadiness sets the readiness value for the container with the given ID. // setReadiness sets the readiness value for the container with the given ID.
func (r *ReadinessManager) SetReadiness(id string, value bool) { func (r *readinessManager) setReadiness(id string, value bool) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
r.states[id] = value r.states[id] = value
} }
// RemoveReadiness clears the readiness value for the container with the given ID. // removeReadiness clears the readiness value for the container with the given ID.
func (r *ReadinessManager) RemoveReadiness(id string) { func (r *readinessManager) removeReadiness(id string) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
delete(r.states, id) delete(r.states, id)

View File

@ -0,0 +1,153 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package prober
import (
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubeutil "k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/probe"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
)
// worker handles the periodic probing of its assigned container. Each worker has a go-routine
// associated with it which runs the probe loop until the container permanently terminates, or the
// stop channel is closed. The worker uses the probe Manager's statusManager to get up-to-date
// container IDs.
// TODO: Handle liveness probing
type worker struct {
// Channel for stopping the probe, it should be closed to trigger a stop.
stop chan struct{}
// The pod containing this probe (read-only)
pod *api.Pod
// The container to probe (read-only)
container api.Container
// Describes the probe configuration (read-only)
spec *api.Probe
// The last known container ID for this worker.
containerID types.UID
}
// Creates and starts a new probe worker.
func (m *manager) newWorker(
pod *api.Pod,
container api.Container) *worker {
w := &worker{
stop: make(chan struct{}),
pod: pod,
container: container,
spec: container.ReadinessProbe,
}
// Start the worker thread.
go run(m, w)
return w
}
// run periodically probes the container.
func run(m *manager, w *worker) {
probeTicker := time.NewTicker(m.defaultProbePeriod)
defer func() {
// Clean up.
probeTicker.Stop()
if w.containerID != "" {
m.readinessCache.removeReadiness(string(w.containerID))
}
m.removeReadinessProbe(w.pod.UID, w.container.Name)
}()
probeLoop:
for doProbe(m, w) {
// Wait for next probe tick.
select {
case <-w.stop:
break probeLoop
case <-probeTicker.C:
// continue
}
}
}
// doProbe probes the container once and records the result.
// Returns whether the worker should continue.
func doProbe(m *manager, w *worker) (keepGoing bool) {
defer util.HandleCrash(func(_ interface{}) { keepGoing = true })
status, ok := m.statusManager.GetPodStatus(w.pod.UID)
if !ok {
// Either the pod has not been created yet, or it was already deleted.
glog.V(3).Infof("No status for pod: %v", kubeutil.FormatPodName(w.pod))
return true
}
// Worker should terminate if pod is terminated.
if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded {
glog.V(3).Infof("Pod %v %v, exiting probe worker",
kubeutil.FormatPodName(w.pod), status.Phase)
return false
}
c, ok := api.GetContainerStatus(status.ContainerStatuses, w.container.Name)
if !ok {
// Either the container has not been created yet, or it was deleted.
glog.V(3).Infof("Non-existant container probed: %v - %v",
kubeutil.FormatPodName(w.pod), w.container.Name)
return true // Wait for more information.
}
if w.containerID != types.UID(c.ContainerID) {
if w.containerID != "" {
m.readinessCache.removeReadiness(string(w.containerID))
}
w.containerID = types.UID(kubecontainer.TrimRuntimePrefix(c.ContainerID))
}
if c.State.Running == nil {
glog.V(3).Infof("Non-running container probed: %v - %v",
kubeutil.FormatPodName(w.pod), w.container.Name)
m.readinessCache.setReadiness(string(w.containerID), false)
// Abort if the container will not be restarted.
return c.State.Terminated == nil ||
w.pod.Spec.RestartPolicy != api.RestartPolicyNever
}
if int64(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
// Readiness defaults to false during the initial delay.
m.readinessCache.setReadiness(string(w.containerID), false)
return true
}
// TODO: Move error handling out of prober.
result, _ := m.prober.ProbeReadiness(w.pod, status, w.container, string(w.containerID))
if result != probe.Unknown {
m.readinessCache.setReadiness(string(w.containerID), result != probe.Failure)
}
return true
}

View File

@ -0,0 +1,240 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package prober
import (
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/probe"
)
const (
containerID = "cOnTaInEr_Id"
containerName = "cOnTaInEr_NaMe"
podUID = "pOd_UiD"
)
func TestDoProbe(t *testing.T) {
m := newTestManager()
// Test statuses.
runningStatus := getRunningStatus()
pendingStatus := getRunningStatus()
pendingStatus.ContainerStatuses[0].State.Running = nil
terminatedStatus := getRunningStatus()
terminatedStatus.ContainerStatuses[0].State.Running = nil
terminatedStatus.ContainerStatuses[0].State.Terminated = &api.ContainerStateTerminated{
StartedAt: unversioned.Now(),
}
otherStatus := getRunningStatus()
otherStatus.ContainerStatuses[0].Name = "otherContainer"
failedStatus := getRunningStatus()
failedStatus.Phase = api.PodFailed
tests := []struct {
probe api.Probe
podStatus *api.PodStatus
expectContinue bool
expectReadySet bool
expectedReadiness bool
}{
{ // No status.
expectContinue: true,
},
{ // Pod failed
podStatus: &failedStatus,
},
{ // No container status
podStatus: &otherStatus,
expectContinue: true,
},
{ // Container waiting
podStatus: &pendingStatus,
expectContinue: true,
expectReadySet: true,
},
{ // Container terminated
podStatus: &terminatedStatus,
expectReadySet: true,
},
{ // Probe successful.
podStatus: &runningStatus,
expectContinue: true,
expectReadySet: true,
expectedReadiness: true,
},
{ // Initial delay passed
podStatus: &runningStatus,
probe: api.Probe{
InitialDelaySeconds: -100,
},
expectContinue: true,
expectReadySet: true,
expectedReadiness: true,
},
}
for i, test := range tests {
w := newTestWorker(test.probe)
if test.podStatus != nil {
m.statusManager.SetPodStatus(w.pod, *test.podStatus)
}
if c := doProbe(m, w); c != test.expectContinue {
t.Errorf("[%d] Expected continue to be %v but got %v", i, test.expectContinue, c)
}
ready, ok := m.readinessCache.getReadiness(containerID)
if ok != test.expectReadySet {
t.Errorf("[%d] Expected to have readiness: %v but got %v", i, test.expectReadySet, ok)
}
if ready != test.expectedReadiness {
t.Errorf("[%d] Expected readiness: %v but got %v", i, test.expectedReadiness, ready)
}
// Clean up.
m.statusManager.DeletePodStatus(podUID)
m.readinessCache.removeReadiness(containerID)
}
}
func TestInitialDelay(t *testing.T) {
m := newTestManager()
w := newTestWorker(api.Probe{
InitialDelaySeconds: 10,
})
m.statusManager.SetPodStatus(w.pod, getRunningStatus())
if !doProbe(m, w) {
t.Errorf("Expected to continue, but did not")
}
ready, ok := m.readinessCache.getReadiness(containerID)
if !ok {
t.Errorf("Expected readiness to be false, but was not set")
} else if ready {
t.Errorf("Expected readiness to be false, but was true")
}
// 100 seconds later...
laterStatus := getRunningStatus()
laterStatus.ContainerStatuses[0].State.Running.StartedAt.Time =
time.Now().Add(-100 * time.Second)
m.statusManager.SetPodStatus(w.pod, laterStatus)
// Second call should succeed (already waited).
if !doProbe(m, w) {
t.Errorf("Expected to continue, but did not")
}
ready, ok = m.readinessCache.getReadiness(containerID)
if !ok {
t.Errorf("Expected readiness to be true, but was not set")
} else if !ready {
t.Errorf("Expected readiness to be true, but was false")
}
}
func TestCleanUp(t *testing.T) {
m := newTestManager()
pod := getTestPod(api.Probe{})
m.statusManager.SetPodStatus(&pod, getRunningStatus())
m.readinessCache.setReadiness(containerID, true)
w := m.newWorker(&pod, pod.Spec.Containers[0])
m.readinessProbes[containerPath{podUID, containerName}] = w
if ready, _ := m.readinessCache.getReadiness(containerID); !ready {
t.Fatal("Expected readiness to be true.")
}
close(w.stop)
if err := waitForWorkerExit(m, []containerPath{{podUID, containerName}}); err != nil {
t.Fatal(err)
}
if _, ok := m.readinessCache.getReadiness(containerID); ok {
t.Error("Expected readiness to be cleared.")
}
if _, ok := m.readinessProbes[containerPath{podUID, containerName}]; ok {
t.Error("Expected worker to be cleared.")
}
}
func TestHandleCrash(t *testing.T) {
m := newTestManager()
m.prober = CrashingProber{}
w := newTestWorker(api.Probe{})
m.statusManager.SetPodStatus(w.pod, getRunningStatus())
// doProbe should recover from the crash, and keep going.
if !doProbe(m, w) {
t.Error("Expected to keep going, but terminated.")
}
if _, ok := m.readinessCache.getReadiness(containerID); ok {
t.Error("Expected readiness to be unchanged from crash.")
}
}
func newTestWorker(probeSpec api.Probe) *worker {
pod := getTestPod(probeSpec)
return &worker{
stop: make(chan struct{}),
pod: &pod,
container: pod.Spec.Containers[0],
spec: &probeSpec,
}
}
func getRunningStatus() api.PodStatus {
containerStatus := api.ContainerStatus{
Name: containerName,
ContainerID: containerID,
}
containerStatus.State.Running = &api.ContainerStateRunning{unversioned.Now()}
podStatus := api.PodStatus{
Phase: api.PodRunning,
ContainerStatuses: []api.ContainerStatus{containerStatus},
}
return podStatus
}
func getTestPod(probeSpec api.Probe) api.Pod {
container := api.Container{
Name: containerName,
ReadinessProbe: &probeSpec,
}
pod := api.Pod{
Spec: api.PodSpec{
Containers: []api.Container{container},
RestartPolicy: api.RestartPolicyNever,
},
}
pod.UID = podUID
return pod
}
type CrashingProber struct{}
func (f CrashingProber) ProbeLiveness(_ *api.Pod, _ api.PodStatus, c api.Container, _ string, _ int64) (probe.Result, error) {
panic("Intentional ProbeLiveness crash.")
}
func (f CrashingProber) ProbeReadiness(_ *api.Pod, _ api.PodStatus, c api.Container, _ string) (probe.Result, error) {
panic("Intentional ProbeReadiness crash.")
}

View File

@ -94,7 +94,6 @@ type runtime struct {
generator kubecontainer.RunContainerOptionsGenerator generator kubecontainer.RunContainerOptionsGenerator
recorder record.EventRecorder recorder record.EventRecorder
prober prober.Prober prober prober.Prober
readinessManager *kubecontainer.ReadinessManager
volumeGetter volumeGetter volumeGetter volumeGetter
imagePuller kubecontainer.ImagePuller imagePuller kubecontainer.ImagePuller
} }
@ -113,7 +112,7 @@ func New(config *Config,
generator kubecontainer.RunContainerOptionsGenerator, generator kubecontainer.RunContainerOptionsGenerator,
recorder record.EventRecorder, recorder record.EventRecorder,
containerRefManager *kubecontainer.RefManager, containerRefManager *kubecontainer.RefManager,
readinessManager *kubecontainer.ReadinessManager, prober prober.Prober,
volumeGetter volumeGetter) (kubecontainer.Runtime, error) { volumeGetter volumeGetter) (kubecontainer.Runtime, error) {
systemdVersion, err := getSystemdVersion() systemdVersion, err := getSystemdVersion()
@ -151,10 +150,9 @@ func New(config *Config,
containerRefManager: containerRefManager, containerRefManager: containerRefManager,
generator: generator, generator: generator,
recorder: recorder, recorder: recorder,
readinessManager: readinessManager, prober: prober,
volumeGetter: volumeGetter, volumeGetter: volumeGetter,
} }
rkt.prober = prober.New(rkt, readinessManager, containerRefManager, recorder)
rkt.imagePuller = kubecontainer.NewImagePuller(recorder, rkt) rkt.imagePuller = kubecontainer.NewImagePuller(recorder, rkt)
// Test the rkt version. // Test the rkt version.
@ -1002,7 +1000,7 @@ func (r *runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus
c := runningPod.FindContainerByName(container.Name) c := runningPod.FindContainerByName(container.Name)
if c == nil { if c == nil {
if kubecontainer.ShouldContainerBeRestarted(&container, pod, &podStatus, r.readinessManager) { if kubecontainer.ShouldContainerBeRestarted(&container, pod, &podStatus) {
glog.V(3).Infof("Container %+v is dead, but RestartPolicy says that we should restart it.", container) glog.V(3).Infof("Container %+v is dead, but RestartPolicy says that we should restart it.", container)
// TODO(yifan): Containers in one pod are fate-sharing at this moment, see: // TODO(yifan): Containers in one pod are fate-sharing at this moment, see:
// https://github.com/appc/spec/issues/276. // https://github.com/appc/spec/issues/276.
@ -1022,7 +1020,7 @@ func (r *runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus
break break
} }
result, err := r.prober.Probe(pod, podStatus, container, string(c.ID), c.Created) result, err := r.prober.ProbeLiveness(pod, podStatus, container, string(c.ID), c.Created)
// TODO(vmarmol): examine this logic. // TODO(vmarmol): examine this logic.
if err == nil && result != probe.Success { if err == nil && result != probe.Success {
glog.Infof("Pod %q container %q is unhealthy (probe result: %v), it will be killed and re-created.", podFullName, container.Name, result) glog.Infof("Pod %q container %q is unhealthy (probe result: %v), it will be killed and re-created.", podFullName, container.Name, result)

View File

@ -44,7 +44,6 @@ func TestRunOnce(t *testing.T) {
nodeLister: testNodeLister{}, nodeLister: testNodeLister{},
statusManager: status.NewManager(nil), statusManager: status.NewManager(nil),
containerRefManager: kubecontainer.NewRefManager(), containerRefManager: kubecontainer.NewRefManager(),
readinessManager: kubecontainer.NewReadinessManager(),
podManager: podManager, podManager: podManager,
os: kubecontainer.FakeOS{}, os: kubecontainer.FakeOS{},
volumeManager: newVolumeManager(), volumeManager: newVolumeManager(),

View File

@ -118,6 +118,9 @@ func makePodSpec(readinessProbe, livenessProbe *api.Probe) *api.Pod {
Image: "gcr.io/google_containers/test-webserver", Image: "gcr.io/google_containers/test-webserver",
LivenessProbe: livenessProbe, LivenessProbe: livenessProbe,
ReadinessProbe: readinessProbe, ReadinessProbe: readinessProbe,
}, {
Name: "test-noprobe",
Image: "gcr.io/google_containers/pause",
}, },
}, },
}, },