Merge pull request #7009 from yifan-gu/kube_dep

kubelet: Refactor prober.
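This pulls the probing logic out of the Kubelet's methods and into a standalone Prober type that owns the exec/http/tcp probers together with the command runner, readiness manager, ref manager, and event recorder it needs; the old probeHolder struct is removed. Probes now address containers by container ID through dockertools.ContainerCommandRunner instead of resolving podFullName/UID through the Kubelet, which also lets tests inject a fake runner or a fake prober directly. Condensed from the hunks below, the new wiring and call site:

	klet.prober = NewProber(klet.runner, klet.readinessManager, klet.containerRefManager, klet.recorder)
	result, err := klet.prober.Probe(pod, podStatus, container, containerID, createdAt)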
Yu-Ju Hong 2015-04-20 10:23:04 -07:00
commit cd61aa9484
4 changed files with 96 additions and 69 deletions


@@ -203,22 +203,21 @@ func NewMainKubelet(
 	containerManager := dockertools.NewDockerManager(dockerClient, recorder, podInfraContainerImage, pullQPS, pullBurst)
 	klet := &Kubelet{
-		hostname:               hostname,
-		dockerClient:           dockerClient,
-		kubeClient:             kubeClient,
-		rootDirectory:          rootDirectory,
-		resyncInterval:         resyncInterval,
-		containerRefManager:    kubecontainer.NewRefManager(),
-		readinessManager:       kubecontainer.NewReadinessManager(),
-		runner:                 dockertools.NewDockerContainerCommandRunner(dockerClient),
-		httpClient:             &http.Client{},
-		sourcesReady:           sourcesReady,
-		clusterDomain:          clusterDomain,
-		clusterDNS:             clusterDNS,
-		serviceLister:          serviceLister,
-		nodeLister:             nodeLister,
-		masterServiceNamespace: masterServiceNamespace,
-		prober:                 newProbeHolder(),
+		hostname:                       hostname,
+		dockerClient:                   dockerClient,
+		kubeClient:                     kubeClient,
+		rootDirectory:                  rootDirectory,
+		resyncInterval:                 resyncInterval,
+		containerRefManager:            kubecontainer.NewRefManager(),
+		readinessManager:               kubecontainer.NewReadinessManager(),
+		runner:                         dockertools.NewDockerContainerCommandRunner(dockerClient),
+		httpClient:                     &http.Client{},
+		sourcesReady:                   sourcesReady,
+		clusterDomain:                  clusterDomain,
+		clusterDNS:                     clusterDNS,
+		serviceLister:                  serviceLister,
+		nodeLister:                     nodeLister,
+		masterServiceNamespace:         masterServiceNamespace,
 		streamingConnectionIdleTimeout: streamingConnectionIdleTimeout,
 		recorder:                       recorder,
 		cadvisor:                       cadvisorInterface,
@@ -233,6 +232,7 @@ func NewMainKubelet(
 	}
 	klet.podManager = newBasicPodManager(klet.kubeClient)
+	klet.prober = NewProber(klet.runner, klet.readinessManager, klet.containerRefManager, klet.recorder)
 	runtimeCache, err := kubecontainer.NewRuntimeCache(containerManager)
 	if err != nil {
@@ -312,11 +312,11 @@ type Kubelet struct {
 	// Volume plugins.
 	volumePluginMgr volume.VolumePluginMgr
-	// Network plugin
+	// Network plugin.
 	networkPlugin network.NetworkPlugin
-	// Probe runner holder
-	prober probeHolder
+	// Health check prober.
+	prober *Prober
 	// Container readiness state manager.
 	readinessManager *kubecontainer.ReadinessManager
@@ -1159,7 +1159,7 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, runningPod kubeconta
 			continue
 		}
-		result, err := kl.probeContainer(pod, podStatus, container, string(c.ID), c.Created)
+		result, err := kl.prober.Probe(pod, podStatus, container, string(c.ID), c.Created)
 		if err != nil {
 			// TODO(vmarmol): examine this logic.
 			glog.V(2).Infof("probe no-error: %q", container.Name)


@@ -113,6 +113,8 @@ func newTestKubelet(t *testing.T) *TestKubelet {
 		},
 		fakeRecorder)
 	kubelet.containerManager.Puller = &dockertools.FakeDockerPuller{}
+	kubelet.prober = NewProber(nil, kubelet.readinessManager, kubelet.containerRefManager, kubelet.recorder)
 	return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient}
 }


@@ -22,7 +22,9 @@ import (
 	"time"

 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
 	kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
 	execprobe "github.com/GoogleCloudPlatform/kubernetes/pkg/probe/exec"
 	httprobe "github.com/GoogleCloudPlatform/kubernetes/pkg/probe/http"
@@ -35,50 +37,83 @@ import (
 const maxProbeRetries = 3

-// probeContainer probes the liveness/readiness of the given container.
+// Prober helps to check the liveness/readiness of a container.
+// TODO(yifan): Replace the concrete type with an interface later.
+type Prober struct {
+	exec execprobe.ExecProber
+	http httprobe.HTTPProber
+	tcp  tcprobe.TCPProber
+
+	runner dockertools.ContainerCommandRunner
+
+	readinessManager *kubecontainer.ReadinessManager
+	refManager       *kubecontainer.RefManager
+	recorder         record.EventRecorder
+}
+
+// NewProber creates a Prober. It takes a command runner and
+// several container info managers.
+func NewProber(
+	runner dockertools.ContainerCommandRunner,
+	readinessManager *kubecontainer.ReadinessManager,
+	refManager *kubecontainer.RefManager,
+	recorder record.EventRecorder) *Prober {
+	return &Prober{
+		exec: execprobe.New(),
+		http: httprobe.New(),
+		tcp:  tcprobe.New(),
+
+		runner:           runner,
+		readinessManager: readinessManager,
+		refManager:       refManager,
+		recorder:         recorder,
+	}
+}
+
+// Probe checks the liveness/readiness of the given container.
 // If the container's liveness probe is unsuccessful, set readiness to false.
 // If liveness is successful, do a readiness check and set readiness accordingly.
-func (kl *Kubelet) probeContainer(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) {
+func (pb *Prober) Probe(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) {
 	// Probe liveness.
-	live, err := kl.probeContainerLiveness(pod, status, container, createdAt)
+	live, err := pb.probeLiveness(pod, status, container, containerID, createdAt)
 	if err != nil {
 		glog.V(1).Infof("Liveness probe errored: %v", err)
-		kl.readinessManager.SetReadiness(containerID, false)
+		pb.readinessManager.SetReadiness(containerID, false)
 		return probe.Unknown, err
 	}
 	if live != probe.Success {
 		glog.V(1).Infof("Liveness probe unsuccessful: %v", live)
-		kl.readinessManager.SetReadiness(containerID, false)
+		pb.readinessManager.SetReadiness(containerID, false)
 		return live, nil
 	}

 	// Probe readiness.
-	ready, err := kl.probeContainerReadiness(pod, status, container, createdAt)
+	ready, err := pb.probeReadiness(pod, status, container, containerID, createdAt)
 	if err == nil && ready == probe.Success {
 		glog.V(3).Infof("Readiness probe successful: %v", ready)
-		kl.readinessManager.SetReadiness(containerID, true)
+		pb.readinessManager.SetReadiness(containerID, true)
 		return probe.Success, nil
 	}

 	glog.V(1).Infof("Readiness probe failed/errored: %v, %v", ready, err)
-	kl.readinessManager.SetReadiness(containerID, false)
+	pb.readinessManager.SetReadiness(containerID, false)

-	ref, ok := kl.containerRefManager.GetRef(containerID)
+	ref, ok := pb.refManager.GetRef(containerID)
 	if !ok {
 		glog.Warningf("No ref for pod '%v' - '%v'", containerID, container.Name)
 		return probe.Success, err
 	}
 	if ready != probe.Success {
-		kl.recorder.Eventf(ref, "unhealthy", "Readiness Probe Failed %v - %v", containerID, container.Name)
+		pb.recorder.Eventf(ref, "unhealthy", "Readiness Probe Failed %v - %v", containerID, container.Name)
 	}
 	return probe.Success, nil
 }

-// probeContainerLiveness probes the liveness of a container.
+// probeLiveness probes the liveness of a container.
 // If the initial delay on the liveness probe has not passed since container creation, the probe returns probe.Success.
-func (kl *Kubelet) probeContainerLiveness(pod *api.Pod, status api.PodStatus, container api.Container, createdAt int64) (probe.Result, error) {
+func (pb *Prober) probeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) {
 	p := container.LivenessProbe
 	if p == nil {
 		return probe.Success, nil
@@ -86,12 +121,12 @@ func (kl *Kubelet) probeContainerLiveness(pod *api.Pod, status api.PodStatus, co
 	if time.Now().Unix()-createdAt < p.InitialDelaySeconds {
 		return probe.Success, nil
 	}
-	return kl.runProbeWithRetries(p, pod, status, container, maxProbeRetries)
+	return pb.runProbeWithRetries(p, pod, status, container, containerID, maxProbeRetries)
 }

-// probeContainerReadiness probes the readiness of a container.
+// probeReadiness probes the readiness of a container.
 // If the initial delay on the readiness probe has not passed, the probe returns probe.Failure.
-func (kl *Kubelet) probeContainerReadiness(pod *api.Pod, status api.PodStatus, container api.Container, createdAt int64) (probe.Result, error) {
+func (pb *Prober) probeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) {
 	p := container.ReadinessProbe
 	if p == nil {
 		return probe.Success, nil
@@ -99,16 +134,16 @@ func (kl *Kubelet) probeContainerReadiness(pod *api.Pod, status api.PodStatus, c
 	if time.Now().Unix()-createdAt < p.InitialDelaySeconds {
 		return probe.Failure, nil
 	}
-	return kl.runProbeWithRetries(p, pod, status, container, maxProbeRetries)
+	return pb.runProbeWithRetries(p, pod, status, container, containerID, maxProbeRetries)
 }

 // runProbeWithRetries tries to probe the container in a finite loop; it returns the last result
 // if it never succeeds.
-func (kl *Kubelet) runProbeWithRetries(p *api.Probe, pod *api.Pod, status api.PodStatus, container api.Container, retries int) (probe.Result, error) {
+func (pb *Prober) runProbeWithRetries(p *api.Probe, pod *api.Pod, status api.PodStatus, container api.Container, containerID string, retries int) (probe.Result, error) {
 	var err error
 	var result probe.Result
 	for i := 0; i < retries; i++ {
-		result, err = kl.runProbe(p, pod, status, container)
+		result, err = pb.runProbe(p, pod, status, container, containerID)
 		if result == probe.Success {
 			return probe.Success, nil
 		}
@@ -116,11 +151,11 @@ func (kl *Kubelet) runProbeWithRetries(p *api.Probe, pod *api.Pod, status api.Po
 	return result, err
 }

-func (kl *Kubelet) runProbe(p *api.Probe, pod *api.Pod, status api.PodStatus, container api.Container) (probe.Result, error) {
+func (pb *Prober) runProbe(p *api.Probe, pod *api.Pod, status api.PodStatus, container api.Container, containerID string) (probe.Result, error) {
 	timeout := time.Duration(p.TimeoutSeconds) * time.Second
 	if p.Exec != nil {
 		glog.V(4).Infof("Exec-Probe Pod: %v, Container: %v", pod, container)
-		return kl.prober.exec.Probe(kl.newExecInContainer(pod, container))
+		return pb.exec.Probe(pb.newExecInContainer(pod, container, containerID))
 	}
 	if p.HTTPGet != nil {
 		port, err := extractPort(p.HTTPGet.Port, container)
@@ -129,7 +164,7 @@ func (kl *Kubelet) runProbe(p *api.Probe, pod *api.Pod, status api.PodStatus, co
 		}
 		host, port, path := extractGetParams(p.HTTPGet, status, port)
 		glog.V(4).Infof("HTTP-Probe Host: %v, Port: %v, Path: %v", host, port, path)
-		return kl.prober.http.Probe(host, port, path, timeout)
+		return pb.http.Probe(host, port, path, timeout)
 	}
 	if p.TCPSocket != nil {
 		port, err := extractPort(p.TCPSocket.Port, container)
@@ -137,7 +172,7 @@ func (kl *Kubelet) runProbe(p *api.Probe, pod *api.Pod, status api.PodStatus, co
 			return probe.Unknown, err
 		}
 		glog.V(4).Infof("TCP-Probe PodIP: %v, Port: %v, Timeout: %v", status.PodIP, port, timeout)
-		return kl.prober.tcp.Probe(status.PodIP, port, timeout)
+		return pb.tcp.Probe(status.PodIP, port, timeout)
 	}
 	glog.Warningf("Failed to find probe builder for %s %+v", container.Name, container.LivenessProbe)
 	return probe.Unknown, nil
@@ -193,11 +228,9 @@ type execInContainer struct {
 	run func() ([]byte, error)
 }

-func (kl *Kubelet) newExecInContainer(pod *api.Pod, container api.Container) exec.Cmd {
-	uid := pod.UID
-	podFullName := kubecontainer.GetPodFullName(pod)
+func (p *Prober) newExecInContainer(pod *api.Pod, container api.Container, containerID string) exec.Cmd {
 	return execInContainer{func() ([]byte, error) {
-		return kl.RunInContainer(podFullName, uid, container.Name, container.LivenessProbe.Exec.Command)
+		return p.runner.RunInContainer(containerID, container.LivenessProbe.Exec.Command)
 	}}
 }
@@ -208,17 +241,3 @@ func (eic execInContainer) CombinedOutput() ([]byte, error) {
 func (eic execInContainer) SetDir(dir string) {
 	// unimplemented
 }
-
-func newProbeHolder() probeHolder {
-	return probeHolder{
-		exec: execprobe.New(),
-		http: httprobe.New(),
-		tcp:  tcprobe.New(),
-	}
-}
-
-type probeHolder struct {
-	exec execprobe.ExecProber
-	http httprobe.HTTPProber
-	tcp  tcprobe.TCPProber
-}


@@ -22,6 +22,7 @@ import (
 	"time"

 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
 	kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@@ -146,16 +147,21 @@ func (p fakeExecProber) Probe(_ exec.Cmd) (probe.Result, error) {
 }

 func makeTestKubelet(result probe.Result, err error) *Kubelet {
-	return &Kubelet{
-		readinessManager: kubecontainer.NewReadinessManager(),
-		prober: probeHolder{
-			exec: fakeExecProber{
-				result: result,
-				err:    err,
-			},
-		},
+	kl := &Kubelet{
+		readinessManager:    kubecontainer.NewReadinessManager(),
+		containerRefManager: kubecontainer.NewRefManager(),
 	}
+
+	kl.prober = &Prober{
+		exec: fakeExecProber{
+			result: result,
+			err:    err,
+		},
+		readinessManager: kl.readinessManager,
+		refManager:       kl.containerRefManager,
+		recorder:         &record.FakeRecorder{},
+	}
+	return kl
 }

 // TestProbeContainer tests the functionality of probeContainer.
@@ -402,7 +408,7 @@ func TestProbeContainer(t *testing.T) {
 		} else {
 			kl = makeTestKubelet(test.expectedResult, nil)
 		}
-		result, err := kl.probeContainer(&api.Pod{}, api.PodStatus{}, test.testContainer, dc.ID, dc.Created)
+		result, err := kl.prober.Probe(&api.Pod{}, api.PodStatus{}, test.testContainer, dc.ID, dc.Created)
 		if test.expectError && err == nil {
 			t.Error("Expected an error but none was returned.")
 		}