Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-30 06:54:01 +00:00)

Merge pull request #15275 from timstclair/liveness-workers

Auto commit by PR queue bot

Commit f5da178738
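At a glance, this change replaces the kubelet's synchronous liveness checks (the `Prober` interface proxied through the Kubelet) with per-container probe workers owned by `prober.Manager` and keyed by pod UID, container name, and probe type. Probe outcomes land in `results.Manager` caches; the liveness cache is built with `NewManagerWithUpdates()`, and the kubelet sync loop consumes its `Updates()` channel to resync a pod when a container turns unhealthy. The sketch below is illustrative only — a minimal, self-contained model of that cache-plus-updates pattern, not the code in this diff; the names `Result`, `Update`, and `resultsCache` are stand-ins, and the real manager also guards its cache with a mutex.

```go
package main

import "fmt"

// Result models a probe outcome, mirroring the Success/Failure values
// used by the results manager in this change (names here are illustrative).
type Result bool

const (
	Success Result = true
	Failure Result = false
)

// Update is what a consumer receives when a cached result changes.
type Update struct {
	ContainerID string
	Result      Result
}

// resultsCache is a minimal stand-in for the diff's results.Manager:
// it caches the latest result per container and pushes an Update only
// when the value actually changes. (No locking here; the real manager
// wraps the cache in a sync.RWMutex.)
type resultsCache struct {
	cache   map[string]Result
	updates chan Update
}

func newResultsCache() *resultsCache {
	return &resultsCache{
		cache:   make(map[string]Result),
		updates: make(chan Update, 20), // buffered, like the diff's updates channel
	}
}

func (c *resultsCache) Set(id string, r Result) {
	if prev, ok := c.cache[id]; !ok || prev != r {
		c.cache[id] = r
		c.updates <- Update{id, r}
	}
}

func main() {
	c := newResultsCache()
	c.Set("container-1", Success)
	c.Set("container-1", Failure) // changed result -> another update

	// Consumer side: react only to failures, as the kubelet sync loop does.
	for i := 0; i < 2; i++ {
		u := <-c.updates
		if u.Result == Failure {
			fmt.Printf("container %s unhealthy; trigger pod sync\n", u.ContainerID)
		}
	}
}
```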
@@ -21,7 +21,7 @@ import (
"k8s.io/kubernetes/pkg/client/record"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/prober"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/oom"

@@ -31,7 +31,7 @@ import (
func NewFakeDockerManager(
client DockerInterface,
recorder record.EventRecorder,
prober prober.Prober,
livenessManager proberesults.Manager,
containerRefManager *kubecontainer.RefManager,
machineInfo *cadvisorapi.MachineInfo,
podInfraContainerImage string,

@@ -45,7 +45,7 @@ func NewFakeDockerManager(
fakeOOMAdjuster := oom.NewFakeOOMAdjuster()
fakeProcFs := procfs.NewFakeProcFs()
dm := NewDockerManager(client, recorder, prober, containerRefManager, machineInfo, podInfraContainerImage, qps,
dm := NewDockerManager(client, recorder, livenessManager, containerRefManager, machineInfo, podInfraContainerImage, qps,
burst, containerLogsDir, osInterface, networkPlugin, generator, httpClient, &NativeExecHandler{},
fakeOOMAdjuster, fakeProcFs, false, imageBackOff)
dm.dockerPuller = &FakeDockerPuller{}

@@ -44,10 +44,9 @@ import (
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/network/hairpin"
"k8s.io/kubernetes/pkg/kubelet/prober"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/qos"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/probe"
"k8s.io/kubernetes/pkg/securitycontext"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"

@@ -119,8 +118,8 @@ type DockerManager struct {
// Network plugin.
networkPlugin network.NetworkPlugin

// Health check prober.
prober prober.Prober
// Health check results.
livenessManager proberesults.Manager

// Generator of runtime container options.
generator kubecontainer.RunContainerOptionsGenerator

@@ -147,7 +146,7 @@ type DockerManager struct {
func NewDockerManager(
client DockerInterface,
recorder record.EventRecorder,
prober prober.Prober,
livenessManager proberesults.Manager,
containerRefManager *kubecontainer.RefManager,
machineInfo *cadvisorapi.MachineInfo,
podInfraContainerImage string,

@@ -208,7 +207,7 @@ func NewDockerManager(
dockerRoot: dockerRoot,
containerLogsDir: containerLogsDir,
networkPlugin: networkPlugin,
prober: prober,
livenessManager: livenessManager,
generator: generator,
execHandler: execHandler,
oomAdjuster: oomAdjuster,

@@ -1762,20 +1761,13 @@ func (dm *DockerManager) computePodContainerChanges(pod *api.Pod, runningPod kub
continue
}

result, err := dm.prober.ProbeLiveness(pod, podStatus, container, c.ID, c.Created)
if err != nil {
// TODO(vmarmol): examine this logic.
glog.V(2).Infof("probe no-error: %q", container.Name)
containersToKeep[containerID] = index
continue
}
if result == probe.Success {
glog.V(4).Infof("probe success: %q", container.Name)
liveness, found := dm.livenessManager.Get(c.ID)
if !found || liveness == proberesults.Success {
containersToKeep[containerID] = index
continue
}
if pod.Spec.RestartPolicy != api.RestartPolicyNever {
glog.Infof("pod %q container %q is unhealthy (probe result: %v), it will be killed and re-created.", podFullName, container.Name, result)
glog.Infof("pod %q container %q is unhealthy, it will be killed and re-created.", podFullName, container.Name)
containersToStart[index] = empty{}
}
}

@@ -38,7 +38,8 @@ import (
"k8s.io/kubernetes/pkg/client/record"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/prober"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
uexec "k8s.io/kubernetes/pkg/util/exec"

@@ -83,7 +84,7 @@ func newTestDockerManagerWithHTTPClient(fakeHTTPClient *fakeHTTP) (*DockerManage
dockerManager := NewFakeDockerManager(
fakeDocker,
fakeRecorder,
prober.FakeProber{},
proberesults.NewManager(),
containerRefManager,
&cadvisorapi.MachineInfo{},
PodInfraContainerImage,

@@ -854,6 +855,10 @@ func TestSyncPodBadHash(t *testing.T) {
}

func TestSyncPodsUnhealthy(t *testing.T) {
const (
unhealthyContainerID = "1234"
infraContainerID = "9876"
)
dm, fakeDocker := newTestDockerManager()
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{

@@ -862,40 +867,35 @@ func TestSyncPodsUnhealthy(t *testing.T) {
Namespace: "new",
},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "bar",
LivenessProbe: &api.Probe{
// Always returns healthy == false
},
},
},
Containers: []api.Container{{Name: "unhealthy"}},
},
}

fakeDocker.ContainerList = []docker.APIContainers{
{
// the k8s prefix is required for the kubelet to manage the container
Names: []string{"/k8s_bar_foo_new_12345678_42"},
ID: "1234",
Names: []string{"/k8s_unhealthy_foo_new_12345678_42"},
ID: unhealthyContainerID,
},
{
// pod infra container
Names: []string{"/k8s_POD." + strconv.FormatUint(generatePodInfraContainerHash(pod), 16) + "_foo_new_12345678_42"},
ID: "9876",
ID: infraContainerID,
},
}
fakeDocker.ContainerMap = map[string]*docker.Container{
"1234": {
ID: "1234",
unhealthyContainerID: {
ID: unhealthyContainerID,
Config: &docker.Config{},
HostConfig: &docker.HostConfig{},
},
"9876": {
ID: "9876",
infraContainerID: {
ID: infraContainerID,
Config: &docker.Config{},
HostConfig: &docker.HostConfig{},
},
}
dm.livenessManager.Set(kubetypes.DockerID(unhealthyContainerID).ContainerID(), proberesults.Failure, nil)

runSyncPod(t, dm, fakeDocker, pod, nil)

@@ -908,7 +908,7 @@ func TestSyncPodsUnhealthy(t *testing.T) {
"create", "start", "inspect_container",
})

if err := fakeDocker.AssertStopped([]string{"1234"}); err != nil {
if err := fakeDocker.AssertStopped([]string{unhealthyContainerID}); err != nil {
t.Errorf("%v", err)
}
}

@@ -54,12 +54,12 @@ import (
"k8s.io/kubernetes/pkg/kubelet/network"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/prober"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/rkt"
"k8s.io/kubernetes/pkg/kubelet/status"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/probe"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"

@@ -309,6 +309,10 @@ func NewMainKubelet(

procFs := procfs.NewProcFs()
imageBackOff := util.NewBackOff(resyncInterval, MaxContainerBackOff)

readinessManager := proberesults.NewManager()
klet.livenessManager = proberesults.NewManagerWithUpdates()

// Initialize the runtime.
switch containerRuntime {
case "docker":

@@ -316,7 +320,7 @@ func NewMainKubelet(
klet.containerRuntime = dockertools.NewDockerManager(
dockerClient,
recorder,
klet, // prober
klet.livenessManager,
containerRefManager,
machineInfo,
podInfraContainerImage,

@@ -344,7 +348,7 @@ func NewMainKubelet(
klet,
recorder,
containerRefManager,
klet, // prober
klet.livenessManager,
klet.volumeManager,
imageBackOff)
if err != nil {

@@ -396,11 +400,14 @@ func NewMainKubelet(
klet.runner = klet.containerRuntime
klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient))

klet.prober = prober.New(klet.runner, containerRefManager, recorder)
klet.probeManager = prober.NewManager(
klet.resyncInterval,
klet.statusManager,
klet.prober)
readinessManager,
klet.livenessManager,
klet.runner,
containerRefManager,
recorder)

runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
if err != nil {

@@ -508,10 +515,10 @@ type Kubelet struct {
// Network plugin.
networkPlugin network.NetworkPlugin

// Handles container readiness probing
// Handles container probing.
probeManager prober.Manager
// TODO: Move prober ownership to the probeManager once the runtime no longer depends on it.
prober prober.Prober
// Manages container health check results.
livenessManager proberesults.Manager

// How long to keep idle streaming command execution/port forwarding
// connections open before terminating them

@@ -1982,6 +1989,12 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
// Periodically syncs all the pods and performs cleanup tasks.
glog.V(4).Infof("SyncLoop (periodic sync)")
handler.HandlePodSyncs(kl.podManager.GetPods())
case update := <-kl.livenessManager.Updates():
// We only care about failures (signalling container death) here.
if update.Result == proberesults.Failure {
glog.V(1).Infof("SyncLoop (container unhealthy).")
handler.HandlePodSyncs([]*api.Pod{update.Pod})
}
}
kl.syncLoopMonitor.Store(time.Now())
return true

@@ -2831,16 +2844,6 @@ func (kl *Kubelet) GetRuntime() kubecontainer.Runtime {
return kl.containerRuntime
}

// Proxy prober calls through the Kubelet to break the circular dependency between the runtime &
// prober.
// TODO: Remove this hack once the runtime no longer depends on the prober.
func (kl *Kubelet) ProbeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID, createdAt int64) (probe.Result, error) {
return kl.prober.ProbeLiveness(pod, status, container, containerID, createdAt)
}
func (kl *Kubelet) ProbeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID) (probe.Result, error) {
return kl.prober.ProbeReadiness(pod, status, container, containerID)
}

var minRsrc = resource.MustParse("1k")
var maxRsrc = resource.MustParse("1P")

@@ -47,6 +47,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/network"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/prober"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/status"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/runtime"

@@ -134,8 +135,8 @@ func newTestKubelet(t *testing.T) *TestKubelet {
t: t,
}

kubelet.prober = prober.FakeProber{}
kubelet.probeManager = prober.FakeManager{}
kubelet.livenessManager = proberesults.NewManager()

kubelet.volumeManager = newVolumeManager()
kubelet.containerManager, _ = newContainerManager(fakeContainerMgrMountInt(), mockCadvisor, "", "", "")

@@ -37,7 +37,7 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/prober"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/util/sets"
)

@@ -152,7 +152,7 @@ func newTestDockerManager() (*dockertools.DockerManager, *dockertools.FakeDocker
dockerManager := dockertools.NewFakeDockerManager(
fakeDocker,
fakeRecorder,
prober.FakeProber{},
proberesults.NewManager(),
containerRefManager,
&cadvisorapi.MachineInfo{},
dockertools.PodInfraContainerImage,

@@ -1,45 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package prober

import (
"k8s.io/kubernetes/pkg/api"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/probe"
)

var _ Prober = FakeProber{}

type FakeProber struct {
Readiness probe.Result
Liveness probe.Result
Error error
}

func (f FakeProber) ProbeLiveness(_ *api.Pod, _ api.PodStatus, c api.Container, _ kubecontainer.ContainerID, _ int64) (probe.Result, error) {
if c.LivenessProbe == nil {
return probe.Success, nil
}
return f.Liveness, f.Error
}

func (f FakeProber) ProbeReadiness(_ *api.Pod, _ api.PodStatus, c api.Container, _ kubecontainer.ContainerID) (probe.Result, error) {
if c.ReadinessProbe == nil {
return probe.Success, nil
}
return f.Readiness, f.Error
}

@@ -22,9 +22,11 @@ import (

"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/status"
kubeutil "k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/sets"
)

@@ -53,19 +55,22 @@ type Manager interface {
}

type manager struct {
// Caches the results of readiness probes.
readinessCache results.Manager

// Map of active workers for readiness
readinessProbes map[containerPath]*worker
// Lock for accessing & mutating readinessProbes
// Map of active workers for probes
workers map[probeKey]*worker
// Lock for accessing & mutating workers
workerLock sync.RWMutex

// The statusManager cache provides pod IP and container IDs for probing.
statusManager status.Manager

// readinessManager manages the results of readiness probes
readinessManager results.Manager

// livenessManager manages the results of liveness probes
livenessManager results.Manager

// prober executes the probe actions.
prober Prober
prober *prober

// Default period for workers to execute a probe.
defaultProbePeriod time.Duration

@@ -74,36 +79,79 @@ type manager struct {
func NewManager(
defaultProbePeriod time.Duration,
statusManager status.Manager,
prober Prober) Manager {
readinessManager results.Manager,
livenessManager results.Manager,
runner kubecontainer.ContainerCommandRunner,
refManager *kubecontainer.RefManager,
recorder record.EventRecorder) Manager {
prober := newProber(runner, refManager, recorder)
return &manager{
defaultProbePeriod: defaultProbePeriod,
statusManager: statusManager,
prober: prober,
readinessCache: results.NewManager(),
readinessProbes: make(map[containerPath]*worker),
readinessManager: readinessManager,
livenessManager: livenessManager,
workers: make(map[probeKey]*worker),
}
}

// Key uniquely identifying containers
type containerPath struct {
// Key uniquely identifying container probes
type probeKey struct {
podUID types.UID
containerName string
probeType probeType
}

// Type of probe (readiness or liveness)
type probeType int

const (
liveness probeType = iota
readiness
)

// For debugging.
func (t probeType) String() string {
switch t {
case readiness:
return "Readiness"
case liveness:
return "Liveness"
default:
return "UNKNOWN"
}
}

func (m *manager) AddPod(pod *api.Pod) {
m.workerLock.Lock()
defer m.workerLock.Unlock()

key := containerPath{podUID: pod.UID}
key := probeKey{podUID: pod.UID}
for _, c := range pod.Spec.Containers {
key.containerName = c.Name
if _, ok := m.readinessProbes[key]; ok {
glog.Errorf("Readiness probe already exists! %v - %v",
kubecontainer.GetPodFullName(pod), c.Name)
return
}

if c.ReadinessProbe != nil {
m.readinessProbes[key] = m.newWorker(pod, c)
key.probeType = readiness
if _, ok := m.workers[key]; ok {
glog.Errorf("Readiness probe already exists! %v - %v",
kubeutil.FormatPodName(pod), c.Name)
return
}
w := newWorker(m, readiness, pod, c)
m.workers[key] = w
go w.run()
}

if c.LivenessProbe != nil {
key.probeType = liveness
if _, ok := m.workers[key]; ok {
glog.Errorf("Liveness probe already exists! %v - %v",
kubeutil.FormatPodName(pod), c.Name)
return
}
w := newWorker(m, liveness, pod, c)
m.workers[key] = w
go w.run()
}
}
}

@@ -112,11 +160,14 @@ func (m *manager) RemovePod(pod *api.Pod) {
m.workerLock.RLock()
defer m.workerLock.RUnlock()

key := containerPath{podUID: pod.UID}
key := probeKey{podUID: pod.UID}
for _, c := range pod.Spec.Containers {
key.containerName = c.Name
if worker, ok := m.readinessProbes[key]; ok {
close(worker.stop)
for _, probeType := range [...]probeType{readiness, liveness} {
key.probeType = probeType
if worker, ok := m.workers[key]; ok {
close(worker.stop)
}
}
}
}

@@ -130,8 +181,8 @@ func (m *manager) CleanupPods(activePods []*api.Pod) {
m.workerLock.RLock()
defer m.workerLock.RUnlock()

for path, worker := range m.readinessProbes {
if _, ok := desiredPods[path.podUID]; !ok {
for key, worker := range m.workers {
if _, ok := desiredPods[key.podUID]; !ok {
close(worker.stop)
}
}

@@ -142,28 +193,27 @@ func (m *manager) UpdatePodStatus(podUID types.UID, podStatus *api.PodStatus) {
var ready bool
if c.State.Running == nil {
ready = false
} else if result, ok := m.readinessCache.Get(
kubecontainer.ParseContainerID(c.ContainerID)); ok {
} else if result, ok := m.readinessManager.Get(kubecontainer.ParseContainerID(c.ContainerID)); ok {
ready = result == results.Success
} else {
// The check whether there is a probe which hasn't run yet.
_, exists := m.getReadinessProbe(podUID, c.Name)
_, exists := m.getWorker(podUID, c.Name, readiness)
ready = !exists
}
podStatus.ContainerStatuses[i].Ready = ready
}
}

func (m *manager) getReadinessProbe(podUID types.UID, containerName string) (*worker, bool) {
func (m *manager) getWorker(podUID types.UID, containerName string, probeType probeType) (*worker, bool) {
m.workerLock.RLock()
defer m.workerLock.RUnlock()
probe, ok := m.readinessProbes[containerPath{podUID, containerName}]
return probe, ok
worker, ok := m.workers[probeKey{podUID, containerName, probeType}]
return worker, ok
}

// Called by the worker after exiting.
func (m *manager) removeReadinessProbe(podUID types.UID, containerName string) {
func (m *manager) removeWorker(podUID types.UID, containerName string, probeType probeType) {
m.workerLock.Lock()
defer m.workerLock.Unlock()
delete(m.readinessProbes, containerPath{podUID, containerName})
delete(m.workers, probeKey{podUID, containerName, probeType})
}

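The hunks above key every probe worker by (pod UID, container name, probe type), so a container can run independent readiness and liveness workers. The snippet below is a minimal stand-alone model of that keying scheme with hypothetical names (`probeKind`, `workerTable`); it is not the code from this diff, only an illustration of why the composite key prevents duplicate workers of the same probe type.

```go
package main

import "fmt"

type probeKind int

const (
	liveness probeKind = iota
	readiness
)

// workerKey mirrors the shape of the diff's probeKey: one worker per
// (pod, container, probe type) combination.
type workerKey struct {
	podUID        string
	containerName string
	kind          probeKind
}

// workerTable is a stand-in for the manager's workers map.
type workerTable map[workerKey]struct{}

// add registers a worker unless one already exists for the same key.
func (t workerTable) add(podUID, container string, kind probeKind) bool {
	k := workerKey{podUID, container, kind}
	if _, ok := t[k]; ok {
		return false // duplicate probe of the same kind for this container
	}
	t[k] = struct{}{}
	return true
}

func main() {
	t := workerTable{}
	fmt.Println(t.add("pod-1", "app", readiness)) // true
	fmt.Println(t.add("pod-1", "app", liveness))  // true: different probe type, separate worker
	fmt.Println(t.add("pod-1", "app", liveness))  // false: already registered
}
```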
@@ -23,11 +23,13 @@ import (

"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/probe"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
)

@@ -53,13 +55,13 @@ func TestAddRemovePods(t *testing.T) {
Containers: []api.Container{{
Name: "no_probe1",
}, {
Name: "prober1",
Name: "readiness",
ReadinessProbe: &api.Probe{},
}, {
Name: "no_probe2",
}, {
Name: "prober2",
ReadinessProbe: &api.Probe{},
Name: "liveness",
LivenessProbe: &api.Probe{},
}},
},
}

@@ -77,7 +79,10 @@ func TestAddRemovePods(t *testing.T) {

// Adding a pod with probes.
m.AddPod(&probePod)
probePaths := []containerPath{{"probe_pod", "prober1"}, {"probe_pod", "prober2"}}
probePaths := []probeKey{
{"probe_pod", "readiness", readiness},
{"probe_pod", "liveness", liveness},
}
if err := expectProbes(m, probePaths); err != nil {
t.Error(err)
}

@@ -115,8 +120,8 @@ func TestCleanupPods(t *testing.T) {
Name: "prober1",
ReadinessProbe: &api.Probe{},
}, {
Name: "prober2",
ReadinessProbe: &api.Probe{},
Name: "prober2",
LivenessProbe: &api.Probe{},
}},
},
}

@@ -129,8 +134,8 @@ func TestCleanupPods(t *testing.T) {
Name: "prober1",
ReadinessProbe: &api.Probe{},
}, {
Name: "prober2",
ReadinessProbe: &api.Probe{},
Name: "prober2",
LivenessProbe: &api.Probe{},
}},
},
}

@@ -139,8 +144,14 @@ func TestCleanupPods(t *testing.T) {

m.CleanupPods([]*api.Pod{&podToKeep})

removedProbes := []containerPath{{"pod_cleanup", "prober1"}, {"pod_cleanup", "prober2"}}
expectedProbes := []containerPath{{"pod_keep", "prober1"}, {"pod_keep", "prober2"}}
removedProbes := []probeKey{
{"pod_cleanup", "prober1", readiness},
{"pod_cleanup", "prober2", liveness},
}
expectedProbes := []probeKey{
{"pod_keep", "prober1", readiness},
{"pod_keep", "prober2", liveness},
}
if err := waitForWorkerExit(m, removedProbes); err != nil {
t.Fatal(err)
}

@@ -195,28 +206,28 @@ func TestUpdatePodStatus(t *testing.T) {

m := newTestManager()
// Setup probe "workers" and cached results.
m.readinessProbes = map[containerPath]*worker{
containerPath{podUID, probedReady.Name}: {},
containerPath{podUID, probedPending.Name}: {},
containerPath{podUID, probedUnready.Name}: {},
containerPath{podUID, terminated.Name}: {},
m.workers = map[probeKey]*worker{
probeKey{podUID, unprobed.Name, liveness}: {},
probeKey{podUID, probedReady.Name, readiness}: {},
probeKey{podUID, probedPending.Name, readiness}: {},
probeKey{podUID, probedUnready.Name, readiness}: {},
probeKey{podUID, terminated.Name, readiness}: {},
}

m.readinessCache.Set(kubecontainer.ParseContainerID(probedReady.ContainerID), results.Success)
m.readinessCache.Set(kubecontainer.ParseContainerID(probedUnready.ContainerID), results.Failure)
m.readinessCache.Set(kubecontainer.ParseContainerID(terminated.ContainerID), results.Success)
m.readinessManager.Set(kubecontainer.ParseContainerID(probedReady.ContainerID), results.Success, nil)
m.readinessManager.Set(kubecontainer.ParseContainerID(probedUnready.ContainerID), results.Failure, nil)
m.readinessManager.Set(kubecontainer.ParseContainerID(terminated.ContainerID), results.Success, nil)

m.UpdatePodStatus(podUID, &podStatus)

expectedReadiness := map[containerPath]bool{
containerPath{podUID, unprobed.Name}: true,
containerPath{podUID, probedReady.Name}: true,
containerPath{podUID, probedPending.Name}: false,
containerPath{podUID, probedUnready.Name}: false,
containerPath{podUID, terminated.Name}: false,
expectedReadiness := map[probeKey]bool{
probeKey{podUID, unprobed.Name, readiness}: true,
probeKey{podUID, probedReady.Name, readiness}: true,
probeKey{podUID, probedPending.Name, readiness}: false,
probeKey{podUID, probedUnready.Name, readiness}: false,
probeKey{podUID, terminated.Name, readiness}: false,
}
for _, c := range podStatus.ContainerStatuses {
expected, ok := expectedReadiness[containerPath{podUID, c.Name}]
expected, ok := expectedReadiness[probeKey{podUID, c.Name, readiness}]
if !ok {
t.Fatalf("Missing expectation for test case: %v", c.Name)
}

@@ -227,16 +238,16 @@ func TestUpdatePodStatus(t *testing.T) {
}
}

func expectProbes(m *manager, expectedReadinessProbes []containerPath) error {
func expectProbes(m *manager, expectedProbes []probeKey) error {
m.workerLock.RLock()
defer m.workerLock.RUnlock()

var unexpected []containerPath
missing := make([]containerPath, len(expectedReadinessProbes))
copy(missing, expectedReadinessProbes)
var unexpected []probeKey
missing := make([]probeKey, len(expectedProbes))
copy(missing, expectedProbes)

outer:
for probePath := range m.readinessProbes {
for probePath := range m.workers {
for i, expectedPath := range missing {
if probePath == expectedPath {
missing = append(missing[:i], missing[i+1:]...)

@@ -255,26 +266,34 @@ outer:

func newTestManager() *manager {
const probePeriod = 1
statusManager := status.NewManager(&testclient.Fake{})
prober := FakeProber{Readiness: probe.Success}
return NewManager(probePeriod, statusManager, prober).(*manager)
m := NewManager(
probePeriod,
status.NewManager(&testclient.Fake{}),
results.NewManager(),
results.NewManager(),
nil, // runner
kubecontainer.NewRefManager(),
&record.FakeRecorder{},
).(*manager)
// Don't actually execute probes.
m.prober.exec = fakeExecProber{probe.Success, nil}
return m
}

// Wait for the given workers to exit & clean up.
func waitForWorkerExit(m *manager, workerPaths []containerPath) error {
func waitForWorkerExit(m *manager, workerPaths []probeKey) error {
const interval = 100 * time.Millisecond
const timeout = 30 * time.Second

for _, w := range workerPaths {
condition := func() (bool, error) {
_, exists := m.getReadinessProbe(w.podUID, w.containerName)
_, exists := m.getWorker(w.podUID, w.containerName, w.probeType)
return !exists, nil
}
if exited, _ := condition(); exited {
continue // Already exited, no need to poll.
}
glog.Infof("Polling %v", w)
if err := wait.Poll(interval, timeout, condition); err != nil {
if err := wait.Poll(interval, util.ForeverTestTimeout, condition); err != nil {
return err
}
}

@@ -39,12 +39,6 @@ import (

const maxProbeRetries = 3

// Prober checks the healthiness of a container.
type Prober interface {
ProbeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID, createdAt int64) (probe.Result, error)
ProbeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID) (probe.Result, error)
}

// Prober helps to check the liveness/readiness of a container.
type prober struct {
exec execprobe.ExecProber

@@ -58,10 +52,10 @@ type prober struct {

// NewProber creates a Prober, it takes a command runner and
// several container info managers.
func New(
func newProber(
runner kubecontainer.ContainerCommandRunner,
refManager *kubecontainer.RefManager,
recorder record.EventRecorder) Prober {
recorder record.EventRecorder) *prober {

return &prober{
exec: execprobe.New(),

@@ -73,9 +67,19 @@ func New(
}
}

// ProbeLiveness probes the liveness of a container.
// If the initalDelay since container creation on liveness probe has not passed the probe will return probe.Success.
func (pb *prober) ProbeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID, createdAt int64) (probe.Result, error) {
// probe probes the container.
func (pb *prober) probe(probeType probeType, pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID) (probe.Result, error) {
switch probeType {
case readiness:
return pb.probeReadiness(pod, status, container, containerID)
case liveness:
return pb.probeLiveness(pod, status, container, containerID)
}
return probe.Unknown, fmt.Errorf("Unknown probe type: %q", probeType)
}

// probeLiveness probes the liveness of a container.
func (pb *prober) probeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID) (probe.Result, error) {
var live probe.Result
var output string
var err error

@@ -83,11 +87,7 @@ func (pb *prober) ProbeLiveness(pod *api.Pod, status api.PodStatus, container ap
if p == nil {
return probe.Success, nil
}
if time.Now().Unix()-createdAt < p.InitialDelaySeconds {
return probe.Success, nil
} else {
live, output, err = pb.runProbeWithRetries(p, pod, status, container, containerID, maxProbeRetries)
}
live, output, err = pb.runProbeWithRetries(p, pod, status, container, containerID, maxProbeRetries)
ctrName := fmt.Sprintf("%s:%s", kubecontainer.GetPodFullName(pod), container.Name)
if err != nil || live != probe.Success {
// Liveness failed in one way or another.

@@ -113,17 +113,16 @@ func (pb *prober) ProbeLiveness(pod *api.Pod, status api.PodStatus, container ap
return probe.Success, nil
}

// ProbeReadiness probes and sets the readiness of a container.
func (pb *prober) ProbeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID) (probe.Result, error) {
// probeReadiness probes and sets the readiness of a container.
func (pb *prober) probeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID) (probe.Result, error) {
var ready probe.Result
var output string
var err error
p := container.ReadinessProbe
if p == nil {
ready = probe.Success
} else {
ready, output, err = pb.runProbeWithRetries(p, pod, status, container, containerID, maxProbeRetries)
return probe.Success, nil
}
ready, output, err = pb.runProbeWithRetries(p, pod, status, container, containerID, maxProbeRetries)
ctrName := fmt.Sprintf("%s:%s", kubecontainer.GetPodFullName(pod), container.Name)
if err != nil || ready == probe.Failure {
// Readiness failed in one way or another.

@@ -19,7 +19,6 @@ package prober
import (
"errors"
"testing"
"time"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record"

@@ -184,7 +183,6 @@ func TestProbeContainer(t *testing.T) {
recorder: &record.FakeRecorder{},
}
containerID := kubecontainer.ContainerID{"test", "foobar"}
createdAt := time.Now().Unix()

tests := []struct {
testContainer api.Container

@@ -201,14 +199,7 @@ func TestProbeContainer(t *testing.T) {
// Only LivenessProbe. expectedReadiness should always be true here.
{
testContainer: api.Container{
LivenessProbe: &api.Probe{InitialDelaySeconds: 100},
},
expectedLiveness: probe.Success,
expectedReadiness: probe.Success,
},
{
testContainer: api.Container{
LivenessProbe: &api.Probe{InitialDelaySeconds: -100},
LivenessProbe: &api.Probe{},
},
expectedLiveness: probe.Unknown,
expectedReadiness: probe.Success,

@@ -216,7 +207,6 @@ func TestProbeContainer(t *testing.T) {
{
testContainer: api.Container{
LivenessProbe: &api.Probe{
InitialDelaySeconds: -100,
Handler: api.Handler{
Exec: &api.ExecAction{},
},

@@ -228,7 +218,6 @@ func TestProbeContainer(t *testing.T) {
{
testContainer: api.Container{
LivenessProbe: &api.Probe{
InitialDelaySeconds: -100,
Handler: api.Handler{
Exec: &api.ExecAction{},
},

@@ -240,7 +229,6 @@ func TestProbeContainer(t *testing.T) {
{
testContainer: api.Container{
LivenessProbe: &api.Probe{
InitialDelaySeconds: -100,
Handler: api.Handler{
Exec: &api.ExecAction{},
},

@@ -252,7 +240,6 @@ func TestProbeContainer(t *testing.T) {
{
testContainer: api.Container{
LivenessProbe: &api.Probe{
InitialDelaySeconds: -100,
Handler: api.Handler{
Exec: &api.ExecAction{},
},

@@ -265,7 +252,7 @@ func TestProbeContainer(t *testing.T) {
// // Only ReadinessProbe. expectedLiveness should always be probe.Success here.
{
testContainer: api.Container{
ReadinessProbe: &api.Probe{InitialDelaySeconds: 100},
ReadinessProbe: &api.Probe{},
},
expectedLiveness: probe.Success,
expectedReadiness: probe.Unknown,

@@ -273,7 +260,6 @@ func TestProbeContainer(t *testing.T) {
{
testContainer: api.Container{
ReadinessProbe: &api.Probe{
InitialDelaySeconds: -100,
Handler: api.Handler{
Exec: &api.ExecAction{},
},

@@ -285,7 +271,6 @@ func TestProbeContainer(t *testing.T) {
{
testContainer: api.Container{
ReadinessProbe: &api.Probe{
InitialDelaySeconds: -100,
Handler: api.Handler{
Exec: &api.ExecAction{},
},

@@ -297,7 +282,6 @@ func TestProbeContainer(t *testing.T) {
{
testContainer: api.Container{
ReadinessProbe: &api.Probe{
InitialDelaySeconds: -100,
Handler: api.Handler{
Exec: &api.ExecAction{},
},

@@ -309,7 +293,6 @@ func TestProbeContainer(t *testing.T) {
{
testContainer: api.Container{
ReadinessProbe: &api.Probe{
InitialDelaySeconds: -100,
Handler: api.Handler{
Exec: &api.ExecAction{},
},

@@ -322,32 +305,8 @@ func TestProbeContainer(t *testing.T) {
// Both LivenessProbe and ReadinessProbe.
{
testContainer: api.Container{
LivenessProbe: &api.Probe{InitialDelaySeconds: 100},
ReadinessProbe: &api.Probe{InitialDelaySeconds: 100},
},
expectedLiveness: probe.Success,
expectedReadiness: probe.Unknown,
},
{
testContainer: api.Container{
LivenessProbe: &api.Probe{InitialDelaySeconds: 100},
ReadinessProbe: &api.Probe{InitialDelaySeconds: -100},
},
expectedLiveness: probe.Success,
expectedReadiness: probe.Unknown,
},
{
testContainer: api.Container{
LivenessProbe: &api.Probe{InitialDelaySeconds: -100},
ReadinessProbe: &api.Probe{InitialDelaySeconds: 100},
},
expectedLiveness: probe.Unknown,
expectedReadiness: probe.Unknown,
},
{
testContainer: api.Container{
LivenessProbe: &api.Probe{InitialDelaySeconds: -100},
ReadinessProbe: &api.Probe{InitialDelaySeconds: -100},
LivenessProbe: &api.Probe{},
ReadinessProbe: &api.Probe{},
},
expectedLiveness: probe.Unknown,
expectedReadiness: probe.Unknown,

@@ -355,25 +314,11 @@ func TestProbeContainer(t *testing.T) {
{
testContainer: api.Container{
LivenessProbe: &api.Probe{
InitialDelaySeconds: -100,
Handler: api.Handler{
Exec: &api.ExecAction{},
},
},
ReadinessProbe: &api.Probe{InitialDelaySeconds: -100},
},
expectedLiveness: probe.Unknown,
expectedReadiness: probe.Unknown,
},
{
testContainer: api.Container{
LivenessProbe: &api.Probe{
InitialDelaySeconds: -100,
Handler: api.Handler{
Exec: &api.ExecAction{},
},
},
ReadinessProbe: &api.Probe{InitialDelaySeconds: -100},
ReadinessProbe: &api.Probe{},
},
expectedLiveness: probe.Failure,
expectedReadiness: probe.Unknown,

@@ -381,13 +326,11 @@ func TestProbeContainer(t *testing.T) {
{
testContainer: api.Container{
LivenessProbe: &api.Probe{
InitialDelaySeconds: -100,
Handler: api.Handler{
Exec: &api.ExecAction{},
},
},
ReadinessProbe: &api.Probe{
InitialDelaySeconds: -100,
Handler: api.Handler{
Exec: &api.ExecAction{},
},

@@ -405,7 +348,7 @@ func TestProbeContainer(t *testing.T) {
prober.exec = fakeExecProber{test.expectedLiveness, nil}
}

liveness, err := prober.ProbeLiveness(&api.Pod{}, api.PodStatus{}, test.testContainer, containerID, createdAt)
liveness, err := prober.probeLiveness(&api.Pod{}, api.PodStatus{}, test.testContainer, containerID)
if test.expectError && err == nil {
t.Errorf("[%d] Expected liveness probe error but no error was returned.", i)
}

@@ -418,7 +361,7 @@ func TestProbeContainer(t *testing.T) {

// TODO: Test readiness errors
prober.exec = fakeExecProber{test.expectedReadiness, nil}
readiness, err := prober.ProbeReadiness(&api.Pod{}, api.PodStatus{}, test.testContainer, containerID)
readiness, err := prober.probeReadiness(&api.Pod{}, api.PodStatus{}, test.testContainer, containerID)
if err != nil {
t.Errorf("[%d] Unexpected readiness probe error: %v", i, err)
}

@@ -19,17 +19,23 @@ package results
import (
"sync"

"k8s.io/kubernetes/pkg/api"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)

// Manager provides a probe results cache.
// Manager provides a probe results cache and channel of updates.
type Manager interface {
// Get returns the cached result for the container with the given ID.
Get(id kubecontainer.ContainerID) (Result, bool)
Get(kubecontainer.ContainerID) (Result, bool)
// Set sets the cached result for the container with the given ID.
Set(id kubecontainer.ContainerID, result Result)
// The pod is only included to be sent with the update.
Set(kubecontainer.ContainerID, Result, *api.Pod)
// Remove clears the cached result for the container with the given ID.
Remove(id kubecontainer.ContainerID)
Remove(kubecontainer.ContainerID)
// Updates creates a channel that receives an Update whenever its result changes (but not
// removed).
// NOTE: The current implementation only supports a single updates channel.
Updates() <-chan Update
}

// Result is the type for probe results.

@@ -51,19 +57,36 @@ func (r Result) String() string {
}
}

// Update is an enum of the types of updates sent over the Updates channel.
type Update struct {
ContainerID kubecontainer.ContainerID
Result Result
Pod *api.Pod
}

// Manager implementation.
type manager struct {
// guards the cache
sync.RWMutex
// map of container ID -> probe Result
cache map[kubecontainer.ContainerID]Result
// channel of updates (may be nil)
updates chan Update
}

var _ Manager = &manager{}

// NewManager creates ane returns an empty results manager.
func NewManager() Manager {
return &manager{cache: make(map[kubecontainer.ContainerID]Result)}
m := &manager{cache: make(map[kubecontainer.ContainerID]Result)}
return m
}

// NewManager creates ane returns an empty results manager.
func NewManagerWithUpdates() Manager {
m := NewManager().(*manager)
m.updates = make(chan Update, 20)
return m
}

func (m *manager) Get(id kubecontainer.ContainerID) (Result, bool) {

@@ -73,13 +96,22 @@ func (m *manager) Get(id kubecontainer.ContainerID) (Result, bool) {
return result, found
}

func (m *manager) Set(id kubecontainer.ContainerID, result Result) {
func (m *manager) Set(id kubecontainer.ContainerID, result Result, pod *api.Pod) {
if m.setInternal(id, result) {
m.pushUpdate(Update{id, result, pod})
}
}

// Internal helper for locked portion of set. Returns whether an update should be sent.
func (m *manager) setInternal(id kubecontainer.ContainerID, result Result) bool {
m.Lock()
defer m.Unlock()
prev, exists := m.cache[id]
if !exists || prev != result {
m.cache[id] = result
return true
}
return false
}

func (m *manager) Remove(id kubecontainer.ContainerID) {

@@ -87,3 +119,14 @@ func (m *manager) Remove(id kubecontainer.ContainerID) {
defer m.Unlock()
delete(m.cache, id)
}

func (m *manager) Updates() <-chan Update {
return m.updates
}

// pushUpdates sends an update on the updates channel if it is initialized.
func (m *manager) pushUpdate(update Update) {
if m.updates != nil {
m.updates <- update
}
}

@@ -18,9 +18,12 @@ package results

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/api"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/util"
)

func TestCacheOperations(t *testing.T) {

@@ -32,7 +35,7 @@ func TestCacheOperations(t *testing.T) {
_, found := m.Get(unsetID)
assert.False(t, found, "unset result found")

m.Set(setID, Success)
m.Set(setID, Success, nil)
result, found := m.Get(setID)
assert.True(t, result == Success, "set result")
assert.True(t, found, "set result found")

@@ -41,3 +44,55 @@ func TestCacheOperations(t *testing.T) {
_, found = m.Get(setID)
assert.False(t, found, "removed result found")
}

func TestUpdates(t *testing.T) {
m := NewManagerWithUpdates()

pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "test-pod"}}
fooID := kubecontainer.ContainerID{"test", "foo"}
barID := kubecontainer.ContainerID{"test", "bar"}

expectUpdate := func(expected Update, msg string) {
select {
case u := <-m.Updates():
if expected != u {
t.Errorf("Expected update %v, recieved %v: %s %s", expected, u, msg)
}
case <-time.After(util.ForeverTestTimeout):
t.Errorf("Timed out waiting for update %v: %s", expected, msg)
}
}

expectNoUpdate := func(msg string) {
// NOTE: Since updates are accumulated asynchronously, this method is not guaranteed to fail
// when it should. In the event it misses a failure, the following calls to expectUpdate should
// still fail.
select {
case u := <-m.Updates():
t.Errorf("Unexpected update %v: %s", u, msg)
default:
// Pass
}
}

// New result should always push an update.
m.Set(fooID, Success, pod)
expectUpdate(Update{fooID, Success, pod}, "new success")

m.Set(barID, Failure, pod)
expectUpdate(Update{barID, Failure, pod}, "new failure")

// Unchanged results should not send an update.
m.Set(fooID, Success, pod)
expectNoUpdate("unchanged foo")

m.Set(barID, Failure, pod)
expectNoUpdate("unchanged bar")

// Changed results should send an update.
m.Set(fooID, Failure, pod)
expectUpdate(Update{fooID, Failure, pod}, "changed foo")

m.Set(barID, Success, pod)
expectUpdate(Update{barID, Success, pod}, "changed bar")
}

@@ -32,7 +32,6 @@ import (
// associated with it which runs the probe loop until the container permanently terminates, or the
// stop channel is closed. The worker uses the probe Manager's statusManager to get up-to-date
// container IDs.
// TODO: Handle liveness probing
type worker struct {
// Channel for stopping the probe, it should be closed to trigger a stop.
stop chan struct{}

@@ -46,44 +45,65 @@ type worker struct {
// Describes the probe configuration (read-only)
spec *api.Probe

// The type of the worker.
probeType probeType

// The probe value during the initial delay.
initialValue results.Result

// Where to store this workers results.
resultsManager results.Manager
probeManager *manager

// The last known container ID for this worker.
containerID kubecontainer.ContainerID
}

// Creates and starts a new probe worker.
func (m *manager) newWorker(
func newWorker(
m *manager,
probeType probeType,
pod *api.Pod,
container api.Container) *worker {

w := &worker{
stop: make(chan struct{}),
pod: pod,
container: container,
spec: container.ReadinessProbe,
stop: make(chan struct{}),
pod: pod,
container: container,
probeType: probeType,
probeManager: m,
}

// Start the worker thread.
go run(m, w)
switch probeType {
case readiness:
w.spec = container.ReadinessProbe
w.resultsManager = m.readinessManager
w.initialValue = results.Failure
case liveness:
w.spec = container.LivenessProbe
w.resultsManager = m.livenessManager
w.initialValue = results.Success
}

return w
}

// run periodically probes the container.
func run(m *manager, w *worker) {
probeTicker := time.NewTicker(m.defaultProbePeriod)
func (w *worker) run() {
probeTicker := time.NewTicker(w.probeManager.defaultProbePeriod)

defer func() {
// Clean up.
probeTicker.Stop()
if !w.containerID.IsEmpty() {
m.readinessCache.Remove(w.containerID)
w.resultsManager.Remove(w.containerID)
}

m.removeReadinessProbe(w.pod.UID, w.container.Name)
w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType)
}()

probeLoop:
for doProbe(m, w) {
for w.doProbe() {
// Wait for next probe tick.
select {
case <-w.stop:

@@ -96,10 +116,10 @@ probeLoop:

// doProbe probes the container once and records the result.
// Returns whether the worker should continue.
func doProbe(m *manager, w *worker) (keepGoing bool) {
func (w *worker) doProbe() (keepGoing bool) {
defer util.HandleCrash(func(_ interface{}) { keepGoing = true })

status, ok := m.statusManager.GetPodStatus(w.pod.UID)
status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID)
if !ok {
// Either the pod has not been created yet, or it was already deleted.
glog.V(3).Infof("No status for pod: %v", kubeletutil.FormatPodName(w.pod))

@@ -123,7 +143,7 @@ func doProbe(m *manager, w *worker) (keepGoing bool) {

if w.containerID.String() != c.ContainerID {
if !w.containerID.IsEmpty() {
m.readinessCache.Remove(w.containerID)
w.resultsManager.Remove(w.containerID)
}
w.containerID = kubecontainer.ParseContainerID(c.ContainerID)
}

@@ -131,22 +151,23 @@ func doProbe(m *manager, w *worker) (keepGoing bool) {
if c.State.Running == nil {
glog.V(3).Infof("Non-running container probed: %v - %v",
kubeletutil.FormatPodName(w.pod), w.container.Name)
m.readinessCache.Set(w.containerID, results.Failure)
if !w.containerID.IsEmpty() {
w.resultsManager.Set(w.containerID, results.Failure, w.pod)
}
// Abort if the container will not be restarted.
return c.State.Terminated == nil ||
w.pod.Spec.RestartPolicy != api.RestartPolicyNever
}

if int64(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
// Readiness defaults to false during the initial delay.
m.readinessCache.Set(w.containerID, results.Failure)
w.resultsManager.Set(w.containerID, w.initialValue, w.pod)
return true
}

// TODO: Move error handling out of prober.
result, _ := m.prober.ProbeReadiness(w.pod, status, w.container, w.containerID)
result, _ := w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID)
if result != probe.Unknown {
m.readinessCache.Set(w.containerID, result != probe.Failure)
w.resultsManager.Set(w.containerID, result != probe.Failure, w.pod)
}

return true

@ -17,14 +17,19 @@ limitations under the License.
|
||||
package prober
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/client/record"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
"k8s.io/kubernetes/pkg/kubelet/prober/results"
|
||||
"k8s.io/kubernetes/pkg/probe"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/util/exec"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -52,12 +57,11 @@ func TestDoProbe(t *testing.T) {
|
||||
failedStatus.Phase = api.PodFailed
|
||||
|
||||
tests := []struct {
|
||||
probe api.Probe
|
||||
podStatus *api.PodStatus
|
||||
|
||||
expectContinue bool
|
||||
expectReadySet bool
|
||||
expectedReadiness results.Result
|
||||
probe api.Probe
|
||||
podStatus *api.PodStatus
|
||||
expectContinue bool
|
||||
expectSet bool
|
||||
expectedResult results.Result
|
||||
}{
|
||||
{ // No status.
|
||||
expectContinue: true,
|
||||
@ -72,136 +76,158 @@ func TestDoProbe(t *testing.T) {
|
||||
{ // Container waiting
|
||||
podStatus: &pendingStatus,
|
||||
expectContinue: true,
|
||||
expectReadySet: true,
|
||||
expectSet: true,
|
||||
},
|
||||
{ // Container terminated
|
||||
podStatus: &terminatedStatus,
|
||||
expectReadySet: true,
|
||||
podStatus: &terminatedStatus,
|
||||
expectSet: true,
|
||||
},
|
||||
{ // Probe successful.
|
||||
podStatus: &runningStatus,
|
||||
expectContinue: true,
|
||||
expectReadySet: true,
|
||||
expectedReadiness: results.Success,
|
||||
podStatus: &runningStatus,
|
||||
expectContinue: true,
|
||||
expectSet: true,
|
||||
expectedResult: results.Success,
|
||||
},
|
||||
{ // Initial delay passed
|
||||
podStatus: &runningStatus,
|
||||
probe: api.Probe{
|
||||
InitialDelaySeconds: -100,
|
||||
},
|
||||
expectContinue: true,
|
||||
expectReadySet: true,
|
||||
expectedReadiness: results.Success,
|
||||
expectContinue: true,
|
||||
expectSet: true,
|
||||
expectedResult: results.Success,
|
||||
},
|
||||
}
|
||||
|
||||
for i, test := range tests {
|
||||
w := newTestWorker(test.probe)
|
||||
if test.podStatus != nil {
|
||||
m.statusManager.SetPodStatus(w.pod, *test.podStatus)
|
||||
}
|
||||
if c := doProbe(m, w); c != test.expectContinue {
|
||||
t.Errorf("[%d] Expected continue to be %v but got %v", i, test.expectContinue, c)
|
||||
}
|
||||
ready, ok := m.readinessCache.Get(containerID)
|
||||
if ok != test.expectReadySet {
|
||||
t.Errorf("[%d] Expected to have readiness: %v but got %v", i, test.expectReadySet, ok)
|
||||
}
|
||||
if ready != test.expectedReadiness {
|
||||
t.Errorf("[%d] Expected readiness: %v but got %v", i, test.expectedReadiness, ready)
|
||||
}
|
||||
for _, probeType := range [...]probeType{liveness, readiness} {
|
||||
for i, test := range tests {
|
||||
w := newTestWorker(m, probeType, test.probe)
|
||||
if test.podStatus != nil {
|
||||
m.statusManager.SetPodStatus(w.pod, *test.podStatus)
|
||||
}
|
||||
if c := w.doProbe(); c != test.expectContinue {
|
||||
t.Errorf("[%s-%d] Expected continue to be %v but got %v", probeType, i, test.expectContinue, c)
|
||||
}
|
||||
result, ok := resultsManager(m, probeType).Get(containerID)
|
||||
if ok != test.expectSet {
|
||||
t.Errorf("[%s-%d] Expected to have result: %v but got %v", probeType, i, test.expectSet, ok)
|
||||
}
|
||||
if result != test.expectedResult {
|
||||
t.Errorf("[%s-%d] Expected result: %v but got %v", probeType, i, test.expectedResult, result)
|
||||
}
|
||||
|
||||
// Clean up.
|
||||
m.statusManager.DeletePodStatus(podUID)
|
||||
m.readinessCache.Remove(containerID)
|
||||
// Clean up.
|
||||
m.statusManager.DeletePodStatus(podUID)
|
||||
resultsManager(m, probeType).Remove(containerID)
|
||||
}
|
||||
}
|
||||
}

func TestInitialDelay(t *testing.T) {
	m := newTestManager()
	w := newTestWorker(api.Probe{
		InitialDelaySeconds: 10,
	})
	m.statusManager.SetPodStatus(w.pod, getRunningStatus())

	if !doProbe(m, w) {
		t.Errorf("Expected to continue, but did not")
	}
	for _, probeType := range [...]probeType{liveness, readiness} {
		w := newTestWorker(m, probeType, api.Probe{
			InitialDelaySeconds: 10,
		})
		m.statusManager.SetPodStatus(w.pod, getRunningStatus())

	ready, ok := m.readinessCache.Get(containerID)
	if !ok {
		t.Errorf("Expected readiness to be false, but was not set")
	} else if ready {
		t.Errorf("Expected readiness to be false, but was true")
	}
		if !w.doProbe() {
			t.Errorf("[%s] Expected to continue, but did not", probeType)
		}

	// 100 seconds later...
	laterStatus := getRunningStatus()
	laterStatus.ContainerStatuses[0].State.Running.StartedAt.Time =
		time.Now().Add(-100 * time.Second)
	m.statusManager.SetPodStatus(w.pod, laterStatus)
		expectedResult := results.Result(probeType == liveness)
		result, ok := resultsManager(m, probeType).Get(containerID)
		if !ok {
			t.Errorf("[%s] Expected result to be set during initial delay, but was not set", probeType)
		} else if result != expectedResult {
			t.Errorf("[%s] Expected result to be %v during initial delay, but was %v",
				probeType, expectedResult, result)
		}

	// Second call should succeed (already waited).
	if !doProbe(m, w) {
		t.Errorf("Expected to continue, but did not")
	}
		// 100 seconds later...
		laterStatus := getRunningStatus()
		laterStatus.ContainerStatuses[0].State.Running.StartedAt.Time =
			time.Now().Add(-100 * time.Second)
		m.statusManager.SetPodStatus(w.pod, laterStatus)

	ready, ok = m.readinessCache.Get(containerID)
	if !ok {
		t.Errorf("Expected readiness to be true, but was not set")
	} else if !ready {
		t.Errorf("Expected readiness to be true, but was false")
		// Second call should succeed (already waited).
		if !w.doProbe() {
			t.Errorf("[%s] Expected to continue, but did not", probeType)
		}

		result, ok = resultsManager(m, probeType).Get(containerID)
		if !ok {
			t.Errorf("[%s] Expected result to be true, but was not set", probeType)
		} else if !result {
			t.Errorf("[%s] Expected result to be true, but was false", probeType)
		}
	}
}
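
// Illustrative sketch, not part of this diff: the initial-delay behavior the
// test above relies on. Until InitialDelaySeconds have elapsed since the
// container started, a worker can skip probing and report a default result:
// success for liveness (don't kill a container that is still starting up) and
// failure for readiness (don't route traffic to it yet). Helper names are
// hypothetical; only the standard time package is assumed.
func withinInitialDelay(startedAt time.Time, initialDelaySeconds int64) bool {
	return time.Since(startedAt) < time.Duration(initialDelaySeconds)*time.Second
}

func defaultResultDuringInitialDelay(isLiveness bool) bool {
	// Mirrors results.Result(probeType == liveness) in the test above.
	return isLiveness
}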

func TestCleanUp(t *testing.T) {
	m := newTestManager()
	pod := getTestPod(api.Probe{})
	m.statusManager.SetPodStatus(&pod, getRunningStatus())
	m.readinessCache.Set(containerID, results.Success)
	w := m.newWorker(&pod, pod.Spec.Containers[0])
	m.readinessProbes[containerPath{podUID, containerName}] = w

	if ready, _ := m.readinessCache.Get(containerID); !ready {
		t.Fatal("Expected readiness to be true.")
	}
	for _, probeType := range [...]probeType{liveness, readiness} {
		key := probeKey{podUID, containerName, probeType}
		w := newTestWorker(m, probeType, api.Probe{})
		m.statusManager.SetPodStatus(w.pod, getRunningStatus())
		go w.run()
		m.workers[key] = w

	close(w.stop)
	if err := waitForWorkerExit(m, []containerPath{{podUID, containerName}}); err != nil {
		t.Fatal(err)
	}
		// Wait for worker to run.
		condition := func() (bool, error) {
			ready, _ := resultsManager(m, probeType).Get(containerID)
			return ready == results.Success, nil
		}
		if ready, _ := condition(); !ready {
			if err := wait.Poll(100*time.Millisecond, util.ForeverTestTimeout, condition); err != nil {
				t.Fatalf("[%s] Error waiting for worker ready: %v", probeType, err)
			}
		}

	if _, ok := m.readinessCache.Get(containerID); ok {
		t.Error("Expected readiness to be cleared.")
	}
	if _, ok := m.readinessProbes[containerPath{podUID, containerName}]; ok {
		t.Error("Expected worker to be cleared.")
		close(w.stop)
		if err := waitForWorkerExit(m, []probeKey{key}); err != nil {
			t.Fatalf("[%s] error waiting for worker exit: %v", probeType, err)
		}

		if _, ok := resultsManager(m, probeType).Get(containerID); ok {
			t.Errorf("[%s] Expected result to be cleared.", probeType)
		}
		if _, ok := m.workers[key]; ok {
			t.Errorf("[%s] Expected worker to be cleared.", probeType)
		}
	}
}
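
// Illustrative sketch, not part of this diff: the stop-channel pattern the
// cleanup test drives. A worker loops until its stop channel is closed, and
// the caller can wait on a done channel for it to exit. Names and the probe
// period are hypothetical.
type sketchWorker struct {
	stop chan struct{}
	done chan struct{}
}

func (w *sketchWorker) run(probeOnce func()) {
	defer close(w.done) // signal exit to anyone waiting
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-w.stop:
			return // asked to stop; clean up and exit
		case <-ticker.C:
			probeOnce()
		}
	}
}

// Usage: close(w.stop) asks the worker to exit; <-w.done waits for it.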

func TestHandleCrash(t *testing.T) {
	m := newTestManager()
	m.prober = CrashingProber{}
	w := newTestWorker(api.Probe{})
	m.prober = &prober{
		refManager: kubecontainer.NewRefManager(),
		recorder:   &record.FakeRecorder{},
		exec:       crashingExecProber{},
	}

	w := newTestWorker(m, readiness, api.Probe{})
	m.statusManager.SetPodStatus(w.pod, getRunningStatus())

	// doProbe should recover from the crash, and keep going.
	if !doProbe(m, w) {
	if !w.doProbe() {
		t.Error("Expected to keep going, but terminated.")
	}
	if _, ok := m.readinessCache.Get(containerID); ok {
	if _, ok := m.readinessManager.Get(containerID); ok {
		t.Error("Expected readiness to be unchanged from crash.")
	}
}
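
// Illustrative sketch, not part of this diff: the recover-and-continue
// behavior TestHandleCrash expects. Wrapping the probe call like this lets a
// panic inside the prober leave the previously cached result untouched
// instead of killing the worker goroutine. Hypothetical helper, not the
// kubelet's actual crash-handling utility.
func probeWithRecovery(probe func() bool) (result bool, ok bool) {
	defer func() {
		if r := recover(); r != nil {
			ok = false // report "no new result" on a crash
		}
	}()
	return probe(), true
}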

func newTestWorker(probeSpec api.Probe) *worker {
	pod := getTestPod(probeSpec)
	return &worker{
		stop:      make(chan struct{}),
		pod:       &pod,
		container: pod.Spec.Containers[0],
		spec:      &probeSpec,
func newTestWorker(m *manager, probeType probeType, probeSpec api.Probe) *worker {
	// All tests rely on the fake exec prober.
	probeSpec.Handler = api.Handler{
		Exec: &api.ExecAction{},
	}

	pod := getTestPod(probeType, probeSpec)
	return newWorker(m, probeType, &pod, pod.Spec.Containers[0])
}

func getRunningStatus() api.PodStatus {
@ -217,10 +243,15 @@ func getRunningStatus() api.PodStatus {
	return podStatus
}

func getTestPod(probeSpec api.Probe) api.Pod {
func getTestPod(probeType probeType, probeSpec api.Probe) api.Pod {
	container := api.Container{
		Name:           containerName,
		ReadinessProbe: &probeSpec,
		Name: containerName,
	}
	switch probeType {
	case readiness:
		container.ReadinessProbe = &probeSpec
	case liveness:
		container.LivenessProbe = &probeSpec
	}
	pod := api.Pod{
		Spec: api.PodSpec{
@ -232,12 +263,18 @@ func getTestPod(probeSpec api.Probe) api.Pod {
	return pod
}

type CrashingProber struct{}

func (f CrashingProber) ProbeLiveness(_ *api.Pod, _ api.PodStatus, c api.Container, _ kubecontainer.ContainerID, _ int64) (probe.Result, error) {
	panic("Intentional ProbeLiveness crash.")
func resultsManager(m *manager, probeType probeType) results.Manager {
	switch probeType {
	case readiness:
		return m.readinessManager
	case liveness:
		return m.livenessManager
	}
	panic(fmt.Errorf("Unhandled case: %v", probeType))
}

func (f CrashingProber) ProbeReadiness(_ *api.Pod, _ api.PodStatus, c api.Container, _ kubecontainer.ContainerID) (probe.Result, error) {
	panic("Intentional ProbeReadiness crash.")
type crashingExecProber struct{}

func (p crashingExecProber) Probe(_ exec.Cmd) (probe.Result, string, error) {
	panic("Intentional Probe crash.")
}

@ -41,9 +41,8 @@ import (
	"k8s.io/kubernetes/pkg/client/record"
	"k8s.io/kubernetes/pkg/credentialprovider"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/prober"
	proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
	kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
	"k8s.io/kubernetes/pkg/probe"
	"k8s.io/kubernetes/pkg/securitycontext"
	"k8s.io/kubernetes/pkg/types"
	"k8s.io/kubernetes/pkg/util"
@ -89,7 +88,7 @@ type Runtime struct {
	containerRefManager *kubecontainer.RefManager
	generator           kubecontainer.RunContainerOptionsGenerator
	recorder            record.EventRecorder
	prober              prober.Prober
	livenessManager     proberesults.Manager
	volumeGetter        volumeGetter
	imagePuller         kubecontainer.ImagePuller
}
@ -108,8 +107,9 @@ func New(config *Config,
	generator kubecontainer.RunContainerOptionsGenerator,
	recorder record.EventRecorder,
	containerRefManager *kubecontainer.RefManager,
	prober prober.Prober,
	volumeGetter volumeGetter, imageBackOff *util.Backoff) (*Runtime, error) {
	livenessManager proberesults.Manager,
	volumeGetter volumeGetter,
	imageBackOff *util.Backoff) (*Runtime, error) {

	systemdVersion, err := getSystemdVersion()
	if err != nil {
@ -146,7 +146,7 @@ func New(config *Config,
		containerRefManager: containerRefManager,
		generator:           generator,
		recorder:            recorder,
		prober:              prober,
		livenessManager:     livenessManager,
		volumeGetter:        volumeGetter,
	}
	rkt.imagePuller = kubecontainer.NewImagePuller(recorder, rkt, imageBackOff)
@ -1032,17 +1032,13 @@ func (r *Runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus
			break
		}

		result, err := r.prober.ProbeLiveness(pod, podStatus, container, c.ID, c.Created)
		// TODO(vmarmol): examine this logic.
		if err == nil && result != probe.Success && pod.Spec.RestartPolicy != api.RestartPolicyNever {
			glog.Infof("Pod %q container %q is unhealthy (probe result: %v), it will be killed and re-created.", podFullName, container.Name, result)
		liveness, found := r.livenessManager.Get(c.ID)
		if found && liveness != proberesults.Success && pod.Spec.RestartPolicy != api.RestartPolicyNever {
			glog.Infof("Pod %q container %q is unhealthy, it will be killed and re-created.", podFullName, container.Name)
			restartPod = true
			break
		}

		if err != nil {
			glog.V(2).Infof("Probe container %q failed: %v", container.Name, err)
		}
		delete(unidentifiedContainers, c.ID)
	}
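
// Illustrative sketch, not part of this diff: the sync-loop pattern the rkt
// change above adopts. Instead of probing inline during SyncPod, the runtime
// consults a cache of liveness results that the probe workers keep up to
// date, and only restarts a container on a recorded failure (unless the
// restart policy is Never). Types and names below are simplified stand-ins.
type livenessCache interface {
	Get(containerID string) (success bool, found bool)
}

func shouldRestart(cache livenessCache, containerID string, restartPolicyNever bool) bool {
	success, found := cache.Get(containerID)
	if !found {
		// No cached result yet (e.g. still within the initial delay): leave it running.
		return false
	}
	return !success && !restartPolicyNever
}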