diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index f2a803b0d5c..cf7773c176a 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -203,22 +203,21 @@ func NewMainKubelet( containerManager := dockertools.NewDockerManager(dockerClient, recorder, podInfraContainerImage, pullQPS, pullBurst) klet := &Kubelet{ - hostname: hostname, - dockerClient: dockerClient, - kubeClient: kubeClient, - rootDirectory: rootDirectory, - resyncInterval: resyncInterval, - containerRefManager: kubecontainer.NewRefManager(), - readinessManager: kubecontainer.NewReadinessManager(), - runner: dockertools.NewDockerContainerCommandRunner(dockerClient), - httpClient: &http.Client{}, - sourcesReady: sourcesReady, - clusterDomain: clusterDomain, - clusterDNS: clusterDNS, - serviceLister: serviceLister, - nodeLister: nodeLister, - masterServiceNamespace: masterServiceNamespace, - prober: newProbeHolder(), + hostname: hostname, + dockerClient: dockerClient, + kubeClient: kubeClient, + rootDirectory: rootDirectory, + resyncInterval: resyncInterval, + containerRefManager: kubecontainer.NewRefManager(), + readinessManager: kubecontainer.NewReadinessManager(), + runner: dockertools.NewDockerContainerCommandRunner(dockerClient), + httpClient: &http.Client{}, + sourcesReady: sourcesReady, + clusterDomain: clusterDomain, + clusterDNS: clusterDNS, + serviceLister: serviceLister, + nodeLister: nodeLister, + masterServiceNamespace: masterServiceNamespace, streamingConnectionIdleTimeout: streamingConnectionIdleTimeout, recorder: recorder, cadvisor: cadvisorInterface, @@ -233,6 +232,7 @@ func NewMainKubelet( } klet.podManager = newBasicPodManager(klet.kubeClient) + klet.prober = NewProber(klet.runner, klet.readinessManager, klet.containerRefManager, klet.recorder) runtimeCache, err := kubecontainer.NewRuntimeCache(containerManager) if err != nil { @@ -312,11 +312,11 @@ type Kubelet struct { // Volume plugins. volumePluginMgr volume.VolumePluginMgr - // Network plugin + // Network plugin. networkPlugin network.NetworkPlugin - // Probe runner holder - prober probeHolder + // Healthy check prober. + prober *Prober // Container readiness state manager. readinessManager *kubecontainer.ReadinessManager @@ -1159,7 +1159,7 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, runningPod kubeconta continue } - result, err := kl.probeContainer(pod, podStatus, container, string(c.ID), c.Created) + result, err := kl.prober.Probe(pod, podStatus, container, string(c.ID), c.Created) if err != nil { // TODO(vmarmol): examine this logic. glog.V(2).Infof("probe no-error: %q", container.Name) diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 494b59d49e3..cf391b22ea6 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -113,6 +113,8 @@ func newTestKubelet(t *testing.T) *TestKubelet { }, fakeRecorder) kubelet.containerManager.Puller = &dockertools.FakeDockerPuller{} + kubelet.prober = NewProber(nil, kubelet.readinessManager, kubelet.containerRefManager, kubelet.recorder) + return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient} } diff --git a/pkg/kubelet/probe.go b/pkg/kubelet/probe.go index 4607e0c136e..cc8ac163dbb 100644 --- a/pkg/kubelet/probe.go +++ b/pkg/kubelet/probe.go @@ -22,7 +22,9 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/probe" execprobe "github.com/GoogleCloudPlatform/kubernetes/pkg/probe/exec" httprobe "github.com/GoogleCloudPlatform/kubernetes/pkg/probe/http" @@ -35,50 +37,83 @@ import ( const maxProbeRetries = 3 -// probeContainer probes the liveness/readiness of the given container. +// Prober helps to check the liveness/readiness of a container. +// TODO(yifan): Replace the concrete type with interface later. +type Prober struct { + exec execprobe.ExecProber + http httprobe.HTTPProber + tcp tcprobe.TCPProber + runner dockertools.ContainerCommandRunner + + readinessManager *kubecontainer.ReadinessManager + refManager *kubecontainer.RefManager + recorder record.EventRecorder +} + +// NewProber creates a Prober, it takes a command runner and +// several container info managers. +func NewProber( + runner dockertools.ContainerCommandRunner, + readinessManager *kubecontainer.ReadinessManager, + refManager *kubecontainer.RefManager, + recorder record.EventRecorder) *Prober { + + return &Prober{ + exec: execprobe.New(), + http: httprobe.New(), + tcp: tcprobe.New(), + runner: runner, + + readinessManager: readinessManager, + refManager: refManager, + recorder: recorder, + } +} + +// Probe checks the liveness/readiness of the given container. // If the container's liveness probe is unsuccessful, set readiness to false. // If liveness is successful, do a readiness check and set readiness accordingly. -func (kl *Kubelet) probeContainer(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) { +func (pb *Prober) Probe(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) { // Probe liveness. - live, err := kl.probeContainerLiveness(pod, status, container, createdAt) + live, err := pb.probeLiveness(pod, status, container, containerID, createdAt) if err != nil { glog.V(1).Infof("Liveness probe errored: %v", err) - kl.readinessManager.SetReadiness(containerID, false) + pb.readinessManager.SetReadiness(containerID, false) return probe.Unknown, err } if live != probe.Success { glog.V(1).Infof("Liveness probe unsuccessful: %v", live) - kl.readinessManager.SetReadiness(containerID, false) + pb.readinessManager.SetReadiness(containerID, false) return live, nil } // Probe readiness. - ready, err := kl.probeContainerReadiness(pod, status, container, createdAt) + ready, err := pb.probeReadiness(pod, status, container, containerID, createdAt) if err == nil && ready == probe.Success { glog.V(3).Infof("Readiness probe successful: %v", ready) - kl.readinessManager.SetReadiness(containerID, true) + pb.readinessManager.SetReadiness(containerID, true) return probe.Success, nil } glog.V(1).Infof("Readiness probe failed/errored: %v, %v", ready, err) - kl.readinessManager.SetReadiness(containerID, false) + pb.readinessManager.SetReadiness(containerID, false) - ref, ok := kl.containerRefManager.GetRef(containerID) + ref, ok := pb.refManager.GetRef(containerID) if !ok { glog.Warningf("No ref for pod '%v' - '%v'", containerID, container.Name) return probe.Success, err } if ready != probe.Success { - kl.recorder.Eventf(ref, "unhealthy", "Readiness Probe Failed %v - %v", containerID, container.Name) + pb.recorder.Eventf(ref, "unhealthy", "Readiness Probe Failed %v - %v", containerID, container.Name) } return probe.Success, nil } -// probeContainerLiveness probes the liveness of a container. +// probeLiveness probes the liveness of a container. // If the initalDelay since container creation on liveness probe has not passed the probe will return probe.Success. -func (kl *Kubelet) probeContainerLiveness(pod *api.Pod, status api.PodStatus, container api.Container, createdAt int64) (probe.Result, error) { +func (pb *Prober) probeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) { p := container.LivenessProbe if p == nil { return probe.Success, nil @@ -86,12 +121,12 @@ func (kl *Kubelet) probeContainerLiveness(pod *api.Pod, status api.PodStatus, co if time.Now().Unix()-createdAt < p.InitialDelaySeconds { return probe.Success, nil } - return kl.runProbeWithRetries(p, pod, status, container, maxProbeRetries) + return pb.runProbeWithRetries(p, pod, status, container, containerID, maxProbeRetries) } -// probeContainerReadiness probes the readiness of a container. +// probeReadiness probes the readiness of a container. // If the initial delay on the readiness probe has not passed the probe will return probe.Failure. -func (kl *Kubelet) probeContainerReadiness(pod *api.Pod, status api.PodStatus, container api.Container, createdAt int64) (probe.Result, error) { +func (pb *Prober) probeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) { p := container.ReadinessProbe if p == nil { return probe.Success, nil @@ -99,16 +134,16 @@ func (kl *Kubelet) probeContainerReadiness(pod *api.Pod, status api.PodStatus, c if time.Now().Unix()-createdAt < p.InitialDelaySeconds { return probe.Failure, nil } - return kl.runProbeWithRetries(p, pod, status, container, maxProbeRetries) + return pb.runProbeWithRetries(p, pod, status, container, containerID, maxProbeRetries) } // runProbeWithRetries tries to probe the container in a finite loop, it returns the last result // if it never succeeds. -func (kl *Kubelet) runProbeWithRetries(p *api.Probe, pod *api.Pod, status api.PodStatus, container api.Container, retires int) (probe.Result, error) { +func (pb *Prober) runProbeWithRetries(p *api.Probe, pod *api.Pod, status api.PodStatus, container api.Container, containerID string, retires int) (probe.Result, error) { var err error var result probe.Result for i := 0; i < retires; i++ { - result, err = kl.runProbe(p, pod, status, container) + result, err = pb.runProbe(p, pod, status, container, containerID) if result == probe.Success { return probe.Success, nil } @@ -116,11 +151,11 @@ func (kl *Kubelet) runProbeWithRetries(p *api.Probe, pod *api.Pod, status api.Po return result, err } -func (kl *Kubelet) runProbe(p *api.Probe, pod *api.Pod, status api.PodStatus, container api.Container) (probe.Result, error) { +func (pb *Prober) runProbe(p *api.Probe, pod *api.Pod, status api.PodStatus, container api.Container, containerID string) (probe.Result, error) { timeout := time.Duration(p.TimeoutSeconds) * time.Second if p.Exec != nil { glog.V(4).Infof("Exec-Probe Pod: %v, Container: %v", pod, container) - return kl.prober.exec.Probe(kl.newExecInContainer(pod, container)) + return pb.exec.Probe(pb.newExecInContainer(pod, container, containerID)) } if p.HTTPGet != nil { port, err := extractPort(p.HTTPGet.Port, container) @@ -129,7 +164,7 @@ func (kl *Kubelet) runProbe(p *api.Probe, pod *api.Pod, status api.PodStatus, co } host, port, path := extractGetParams(p.HTTPGet, status, port) glog.V(4).Infof("HTTP-Probe Host: %v, Port: %v, Path: %v", host, port, path) - return kl.prober.http.Probe(host, port, path, timeout) + return pb.http.Probe(host, port, path, timeout) } if p.TCPSocket != nil { port, err := extractPort(p.TCPSocket.Port, container) @@ -137,7 +172,7 @@ func (kl *Kubelet) runProbe(p *api.Probe, pod *api.Pod, status api.PodStatus, co return probe.Unknown, err } glog.V(4).Infof("TCP-Probe PodIP: %v, Port: %v, Timeout: %v", status.PodIP, port, timeout) - return kl.prober.tcp.Probe(status.PodIP, port, timeout) + return pb.tcp.Probe(status.PodIP, port, timeout) } glog.Warningf("Failed to find probe builder for %s %+v", container.Name, container.LivenessProbe) return probe.Unknown, nil @@ -193,11 +228,9 @@ type execInContainer struct { run func() ([]byte, error) } -func (kl *Kubelet) newExecInContainer(pod *api.Pod, container api.Container) exec.Cmd { - uid := pod.UID - podFullName := kubecontainer.GetPodFullName(pod) +func (p *Prober) newExecInContainer(pod *api.Pod, container api.Container, containerID string) exec.Cmd { return execInContainer{func() ([]byte, error) { - return kl.RunInContainer(podFullName, uid, container.Name, container.LivenessProbe.Exec.Command) + return p.runner.RunInContainer(containerID, container.LivenessProbe.Exec.Command) }} } @@ -208,17 +241,3 @@ func (eic execInContainer) CombinedOutput() ([]byte, error) { func (eic execInContainer) SetDir(dir string) { //unimplemented } - -func newProbeHolder() probeHolder { - return probeHolder{ - exec: execprobe.New(), - http: httprobe.New(), - tcp: tcprobe.New(), - } -} - -type probeHolder struct { - exec execprobe.ExecProber - http httprobe.HTTPProber - tcp tcprobe.TCPProber -} diff --git a/pkg/kubelet/probe_test.go b/pkg/kubelet/probe_test.go index a8f3692f32e..b87878cb391 100644 --- a/pkg/kubelet/probe_test.go +++ b/pkg/kubelet/probe_test.go @@ -22,6 +22,7 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" "github.com/GoogleCloudPlatform/kubernetes/pkg/probe" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -146,16 +147,21 @@ func (p fakeExecProber) Probe(_ exec.Cmd) (probe.Result, error) { } func makeTestKubelet(result probe.Result, err error) *Kubelet { - return &Kubelet{ - readinessManager: kubecontainer.NewReadinessManager(), - prober: probeHolder{ - exec: fakeExecProber{ - result: result, - err: err, - }, - }, + kl := &Kubelet{ + readinessManager: kubecontainer.NewReadinessManager(), containerRefManager: kubecontainer.NewRefManager(), } + + kl.prober = &Prober{ + exec: fakeExecProber{ + result: result, + err: err, + }, + readinessManager: kl.readinessManager, + refManager: kl.containerRefManager, + recorder: &record.FakeRecorder{}, + } + return kl } // TestProbeContainer tests the functionality of probeContainer. @@ -402,7 +408,7 @@ func TestProbeContainer(t *testing.T) { } else { kl = makeTestKubelet(test.expectedResult, nil) } - result, err := kl.probeContainer(&api.Pod{}, api.PodStatus{}, test.testContainer, dc.ID, dc.Created) + result, err := kl.prober.Probe(&api.Pod{}, api.PodStatus{}, test.testContainer, dc.ID, dc.Created) if test.expectError && err == nil { t.Error("Expected error but did no error was returned.") }