diff --git a/pkg/kubelet/container/helpers.go b/pkg/kubelet/container/helpers.go index 0edb18e46ef..0bd2140f6ad 100644 --- a/pkg/kubelet/container/helpers.go +++ b/pkg/kubelet/container/helpers.go @@ -51,7 +51,7 @@ func TrimRuntimePrefix(fullString string) string { // ShouldContainerBeRestarted checks whether a container needs to be restarted. // TODO(yifan): Think about how to refactor this. -func ShouldContainerBeRestarted(container *api.Container, pod *api.Pod, podStatus *api.PodStatus, readinessManager *ReadinessManager) bool { +func ShouldContainerBeRestarted(container *api.Container, pod *api.Pod, podStatus *api.PodStatus) bool { podFullName := GetPodFullName(pod) // Get all dead container status. @@ -62,11 +62,6 @@ func ShouldContainerBeRestarted(container *api.Container, pod *api.Pod, podStatu } } - // Set dead containers to notReady state. - for _, c := range resultStatus { - readinessManager.RemoveReadiness(TrimRuntimePrefix(c.ContainerID)) - } - // Check RestartPolicy for dead container. if len(resultStatus) > 0 { if pod.Spec.RestartPolicy == api.RestartPolicyNever { diff --git a/pkg/kubelet/dockertools/fake_manager.go b/pkg/kubelet/dockertools/fake_manager.go index 6126d1c2111..60fac0a69ba 100644 --- a/pkg/kubelet/dockertools/fake_manager.go +++ b/pkg/kubelet/dockertools/fake_manager.go @@ -30,7 +30,7 @@ import ( func NewFakeDockerManager( client DockerInterface, recorder record.EventRecorder, - readinessManager *kubecontainer.ReadinessManager, + prober prober.Prober, containerRefManager *kubecontainer.RefManager, machineInfo *cadvisorApi.MachineInfo, podInfraContainerImage string, @@ -44,10 +44,9 @@ func NewFakeDockerManager( fakeOOMAdjuster := oom.NewFakeOOMAdjuster() fakeProcFs := procfs.NewFakeProcFs() - dm := NewDockerManager(client, recorder, readinessManager, containerRefManager, machineInfo, podInfraContainerImage, qps, + dm := NewDockerManager(client, recorder, prober, containerRefManager, machineInfo, podInfraContainerImage, qps, burst, containerLogsDir, osInterface, networkPlugin, generator, httpClient, &NativeExecHandler{}, fakeOOMAdjuster, fakeProcFs, false) dm.dockerPuller = &FakeDockerPuller{} - dm.prober = prober.New(nil, readinessManager, containerRefManager, recorder) return dm } diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index 74631ec452f..3704cdf8583 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -87,7 +87,6 @@ var podInfraContainerImagePullPolicy = api.PullIfNotPresent type DockerManager struct { client DockerInterface recorder record.EventRecorder - readinessManager *kubecontainer.ReadinessManager containerRefManager *kubecontainer.RefManager os kubecontainer.OSInterface machineInfo *cadvisorApi.MachineInfo @@ -145,7 +144,7 @@ type DockerManager struct { func NewDockerManager( client DockerInterface, recorder record.EventRecorder, - readinessManager *kubecontainer.ReadinessManager, + prober prober.Prober, containerRefManager *kubecontainer.RefManager, machineInfo *cadvisorApi.MachineInfo, podInfraContainerImage string, @@ -195,7 +194,6 @@ func NewDockerManager( dm := &DockerManager{ client: client, recorder: recorder, - readinessManager: readinessManager, containerRefManager: containerRefManager, os: osInterface, machineInfo: machineInfo, @@ -205,7 +203,7 @@ func NewDockerManager( dockerRoot: dockerRoot, containerLogsDir: containerLogsDir, networkPlugin: networkPlugin, - prober: nil, + prober: prober, generator: generator, execHandler: execHandler, oomAdjuster: 
oomAdjuster, @@ -213,7 +211,6 @@ func NewDockerManager( cpuCFSQuota: cpuCFSQuota, } dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm) - dm.prober = prober.New(dm, readinessManager, containerRefManager, recorder) dm.imagePuller = kubecontainer.NewImagePuller(recorder, dm) return dm @@ -1363,8 +1360,6 @@ func (dm *DockerManager) killContainer(containerID types.UID, container *api.Con gracePeriod -= int64(unversioned.Now().Sub(start.Time).Seconds()) } - dm.readinessManager.RemoveReadiness(ID) - // always give containers a minimal shutdown window to avoid unnecessary SIGKILLs if gracePeriod < minimumGracePeriodInSeconds { gracePeriod = minimumGracePeriodInSeconds @@ -1659,7 +1654,7 @@ func (dm *DockerManager) computePodContainerChanges(pod *api.Pod, runningPod kub c := runningPod.FindContainerByName(container.Name) if c == nil { - if kubecontainer.ShouldContainerBeRestarted(&container, pod, &podStatus, dm.readinessManager) { + if kubecontainer.ShouldContainerBeRestarted(&container, pod, &podStatus) { // If we are here it means that the container is dead and should be restarted, or never existed and should // be created. We may be inserting this ID again if the container has changed and it has // RestartPolicy::Always, but it's not a big deal. @@ -1694,7 +1689,7 @@ func (dm *DockerManager) computePodContainerChanges(pod *api.Pod, runningPod kub continue } - result, err := dm.prober.Probe(pod, podStatus, container, string(c.ID), c.Created) + result, err := dm.prober.ProbeLiveness(pod, podStatus, container, string(c.ID), c.Created) if err != nil { // TODO(vmarmol): examine this logic. glog.V(2).Infof("probe no-error: %q", container.Name) diff --git a/pkg/kubelet/dockertools/manager_test.go b/pkg/kubelet/dockertools/manager_test.go index 93c5e7aaa0d..da702d5c550 100644 --- a/pkg/kubelet/dockertools/manager_test.go +++ b/pkg/kubelet/dockertools/manager_test.go @@ -36,6 +36,7 @@ import ( "k8s.io/kubernetes/pkg/client/record" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/network" + "k8s.io/kubernetes/pkg/kubelet/prober" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" uexec "k8s.io/kubernetes/pkg/util/exec" @@ -74,14 +75,13 @@ func (*fakeOptionGenerator) GenerateRunContainerOptions(pod *api.Pod, container func newTestDockerManagerWithHTTPClient(fakeHTTPClient *fakeHTTP) (*DockerManager, *FakeDockerClient) { fakeDocker := &FakeDockerClient{VersionInfo: docker.Env{"Version=1.1.3", "ApiVersion=1.15"}, Errors: make(map[string]error), RemovedImages: sets.String{}} fakeRecorder := &record.FakeRecorder{} - readinessManager := kubecontainer.NewReadinessManager() containerRefManager := kubecontainer.NewRefManager() networkPlugin, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) optionGenerator := &fakeOptionGenerator{} dockerManager := NewFakeDockerManager( fakeDocker, fakeRecorder, - readinessManager, + prober.FakeProber{}, containerRefManager, &cadvisorApi.MachineInfo{}, PodInfraContainerImage, @@ -398,10 +398,6 @@ func TestKillContainerInPod(t *testing.T) { containerToKill := &containers[0] containerToSpare := &containers[1] fakeDocker.ContainerList = containers - // Set all containers to ready. 
- for _, c := range fakeDocker.ContainerList { - manager.readinessManager.SetReadiness(c.ID, true) - } if err := manager.KillContainerInPod("", &pod.Spec.Containers[0], pod); err != nil { t.Errorf("unexpected error: %v", err) @@ -410,13 +406,9 @@ func TestKillContainerInPod(t *testing.T) { if err := fakeDocker.AssertStopped([]string{containerToKill.ID}); err != nil { t.Errorf("container was not stopped correctly: %v", err) } - - // Verify that the readiness has been removed for the stopped container. - if ready := manager.readinessManager.GetReadiness(containerToKill.ID); ready { - t.Errorf("exepcted container entry ID '%v' to not be found. states: %+v", containerToKill.ID, ready) - } - if ready := manager.readinessManager.GetReadiness(containerToSpare.ID); !ready { - t.Errorf("exepcted container entry ID '%v' to be found. states: %+v", containerToSpare.ID, ready) + // Assert the container has been spared. + if err := fakeDocker.AssertStopped([]string{containerToSpare.ID}); err == nil { + t.Errorf("container unexpectedly stopped: %v", containerToSpare.ID) } } @@ -471,10 +463,6 @@ func TestKillContainerInPodWithPreStop(t *testing.T) { }, }, } - // Set all containers to ready. - for _, c := range fakeDocker.ContainerList { - manager.readinessManager.SetReadiness(c.ID, true) - } if err := manager.KillContainerInPod("", &pod.Spec.Containers[0], pod); err != nil { t.Errorf("unexpected error: %v", err) @@ -510,27 +498,12 @@ func TestKillContainerInPodWithError(t *testing.T) { Names: []string{"/k8s_bar_qux_new_1234_42"}, }, } - containerToKill := &containers[0] - containerToSpare := &containers[1] fakeDocker.ContainerList = containers fakeDocker.Errors["stop"] = fmt.Errorf("sample error") - // Set all containers to ready. - for _, c := range fakeDocker.ContainerList { - manager.readinessManager.SetReadiness(c.ID, true) - } - if err := manager.KillContainerInPod("", &pod.Spec.Containers[0], pod); err == nil { t.Errorf("expected error, found nil") } - - // Verify that the readiness has been removed even though the stop failed. - if ready := manager.readinessManager.GetReadiness(containerToKill.ID); ready { - t.Errorf("exepcted container entry ID '%v' to not be found. states: %+v", containerToKill.ID, ready) - } - if ready := manager.readinessManager.GetReadiness(containerToSpare.ID); !ready { - t.Errorf("exepcted container entry ID '%v' to be found. 
states: %+v", containerToSpare.ID, ready) - } } func TestIsAExitError(t *testing.T) { diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index c9c31e5cb80..2297badb846 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -52,11 +52,13 @@ import ( "k8s.io/kubernetes/pkg/kubelet/envvars" "k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/network" + "k8s.io/kubernetes/pkg/kubelet/prober" "k8s.io/kubernetes/pkg/kubelet/rkt" "k8s.io/kubernetes/pkg/kubelet/status" kubeletTypes "k8s.io/kubernetes/pkg/kubelet/types" kubeletUtil "k8s.io/kubernetes/pkg/kubelet/util" "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/probe" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" @@ -243,7 +245,6 @@ func NewMainKubelet( return nil, fmt.Errorf("failed to initialize disk manager: %v", err) } statusManager := status.NewManager(kubeClient) - readinessManager := kubecontainer.NewReadinessManager() containerRefManager := kubecontainer.NewRefManager() volumeManager := newVolumeManager() @@ -258,7 +259,6 @@ func NewMainKubelet( rootDirectory: rootDirectory, resyncInterval: resyncInterval, containerRefManager: containerRefManager, - readinessManager: readinessManager, httpClient: &http.Client{}, sourcesReady: sourcesReady, registerNode: registerNode, @@ -317,7 +317,7 @@ func NewMainKubelet( klet.containerRuntime = dockertools.NewDockerManager( dockerClient, recorder, - readinessManager, + klet, // prober containerRefManager, machineInfo, podInfraContainerImage, @@ -343,7 +343,7 @@ func NewMainKubelet( klet, recorder, containerRefManager, - readinessManager, + klet, // prober klet.volumeManager) if err != nil { return nil, err @@ -386,6 +386,12 @@ func NewMainKubelet( klet.runner = klet.containerRuntime klet.podManager = newBasicPodManager(klet.kubeClient) + klet.prober = prober.New(klet.runner, containerRefManager, recorder) + klet.probeManager = prober.NewManager( + klet.resyncInterval, + klet.statusManager, + klet.prober) + runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime) if err != nil { return nil, err @@ -486,8 +492,10 @@ type Kubelet struct { // Network plugin. networkPlugin network.NetworkPlugin - // Container readiness state manager. - readinessManager *kubecontainer.ReadinessManager + // Handles container readiness probing + probeManager prober.Manager + // TODO: Move prober ownership to the probeManager once the runtime no longer depends on it. + prober prober.Prober // How long to keep idle streaming command execution/port forwarding // connections open before terminating them @@ -1665,6 +1673,7 @@ func (kl *Kubelet) HandlePodCleanups() error { // Stop the workers for no-longer existing pods. // TODO: is here the best place to forget pod workers? 
kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods) + kl.probeManager.CleanupPods(activePods) runningPods, err := kl.runtimeCache.GetPods() if err != nil { @@ -1993,6 +2002,7 @@ func (kl *Kubelet) HandlePodAdditions(pods []*api.Pod) { } mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) kl.dispatchWork(pod, SyncPodCreate, mirrorPod, start) + kl.probeManager.AddPod(pod) } } @@ -2024,6 +2034,7 @@ func (kl *Kubelet) HandlePodDeletions(pods []*api.Pod) { if err := kl.deletePod(pod.UID); err != nil { glog.V(2).Infof("Failed to delete pod %q, err: %v", kubeletUtil.FormatPodName(pod), err) } + kl.probeManager.RemovePod(pod) } } @@ -2613,15 +2624,8 @@ func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) { // Assume info is ready to process podStatus.Phase = GetPhase(spec, podStatus.ContainerStatuses) - for _, c := range spec.Containers { - for i, st := range podStatus.ContainerStatuses { - if st.Name == c.Name { - ready := st.State.Running != nil && kl.readinessManager.GetReadiness(kubecontainer.TrimRuntimePrefix(st.ContainerID)) - podStatus.ContainerStatuses[i].Ready = ready - break - } - } - } + kl.probeManager.UpdatePodStatus(pod.UID, podStatus) + podStatus.Conditions = append(podStatus.Conditions, getPodReadyCondition(spec, podStatus.ContainerStatuses)...) if !kl.standaloneMode { @@ -2791,6 +2795,16 @@ func (kl *Kubelet) GetRuntime() kubecontainer.Runtime { return kl.containerRuntime } +// Proxy prober calls through the Kubelet to break the circular dependency between the runtime & +// prober. +// TODO: Remove this hack once the runtime no longer depends on the prober. +func (kl *Kubelet) ProbeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) { + return kl.prober.ProbeLiveness(pod, status, container, containerID, createdAt) +} +func (kl *Kubelet) ProbeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string) (probe.Result, error) { + return kl.prober.ProbeReadiness(pod, status, container, containerID) +} + var minRsrc = resource.MustParse("1k") var maxRsrc = resource.MustParse("1P") diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 11f16171d87..152b97c84eb 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -45,6 +45,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/network" + "k8s.io/kubernetes/pkg/kubelet/prober" "k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/types" @@ -105,7 +106,6 @@ func newTestKubelet(t *testing.T) *TestKubelet { kubelet.masterServiceNamespace = api.NamespaceDefault kubelet.serviceLister = testServiceLister{} kubelet.nodeLister = testNodeLister{} - kubelet.readinessManager = kubecontainer.NewReadinessManager() kubelet.recorder = fakeRecorder kubelet.statusManager = status.NewManager(fakeKubeClient) if err := kubelet.setupDataDirs(); err != nil { @@ -130,6 +130,10 @@ func newTestKubelet(t *testing.T) *TestKubelet { runtimeCache: kubelet.runtimeCache, t: t, } + + kubelet.prober = prober.FakeProber{} + kubelet.probeManager = prober.FakeManager{} + kubelet.volumeManager = newVolumeManager() kubelet.containerManager, _ = newContainerManager(fakeContainerMgrMountInt(), mockCadvisor, "", "", "") kubelet.networkConfigured = true diff --git a/pkg/kubelet/network/cni/cni_test.go b/pkg/kubelet/network/cni/cni_test.go index 91bbf879d6c..e973309e4ba 100644 
--- a/pkg/kubelet/network/cni/cni_test.go +++ b/pkg/kubelet/network/cni/cni_test.go @@ -37,6 +37,7 @@ import ( kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/dockertools" "k8s.io/kubernetes/pkg/kubelet/network" + "k8s.io/kubernetes/pkg/kubelet/prober" "k8s.io/kubernetes/pkg/util/sets" ) @@ -143,13 +144,12 @@ func (nh *fakeNetworkHost) GetRuntime() kubecontainer.Runtime { func newTestDockerManager() (*dockertools.DockerManager, *dockertools.FakeDockerClient) { fakeDocker := &dockertools.FakeDockerClient{VersionInfo: docker.Env{"Version=1.1.3", "ApiVersion=1.15"}, Errors: make(map[string]error), RemovedImages: sets.String{}} fakeRecorder := &record.FakeRecorder{} - readinessManager := kubecontainer.NewReadinessManager() containerRefManager := kubecontainer.NewRefManager() networkPlugin, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) dockerManager := dockertools.NewFakeDockerManager( fakeDocker, fakeRecorder, - readinessManager, + prober.FakeProber{}, containerRefManager, &cadvisorApi.MachineInfo{}, dockertools.PodInfraContainerImage, diff --git a/pkg/kubelet/prober/prober_fake.go b/pkg/kubelet/prober/fake_manager.go similarity index 60% rename from pkg/kubelet/prober/prober_fake.go rename to pkg/kubelet/prober/fake_manager.go index ad933ccf9be..65742823cb2 100644 --- a/pkg/kubelet/prober/prober_fake.go +++ b/pkg/kubelet/prober/fake_manager.go @@ -18,14 +18,20 @@ package prober import ( "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/probe" + "k8s.io/kubernetes/pkg/types" ) -var _ Prober = &FakeProber{} +type FakeManager struct{} -type FakeProber struct { -} +var _ Manager = FakeManager{} -func (fp *FakeProber) Probe(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) { - return probe.Success, nil +// Unused methods. +func (_ FakeManager) AddPod(_ *api.Pod) {} +func (_ FakeManager) RemovePod(_ *api.Pod) {} +func (_ FakeManager) CleanupPods(_ []*api.Pod) {} + +func (_ FakeManager) UpdatePodStatus(_ types.UID, podStatus *api.PodStatus) { + for i := range podStatus.ContainerStatuses { + podStatus.ContainerStatuses[i].Ready = true + } } diff --git a/pkg/kubelet/prober/fake_prober.go b/pkg/kubelet/prober/fake_prober.go new file mode 100644 index 00000000000..3fbb46e8224 --- /dev/null +++ b/pkg/kubelet/prober/fake_prober.go @@ -0,0 +1,44 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package prober
+
+import (
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/probe"
+)
+
+var _ Prober = FakeProber{}
+
+type FakeProber struct {
+	Readiness probe.Result
+	Liveness  probe.Result
+	Error     error
+}
+
+func (f FakeProber) ProbeLiveness(_ *api.Pod, _ api.PodStatus, c api.Container, _ string, _ int64) (probe.Result, error) {
+	if c.LivenessProbe == nil {
+		return probe.Success, nil
+	}
+	return f.Liveness, f.Error
+}
+
+func (f FakeProber) ProbeReadiness(_ *api.Pod, _ api.PodStatus, c api.Container, _ string) (probe.Result, error) {
+	if c.ReadinessProbe == nil {
+		return probe.Success, nil
+	}
+	return f.Readiness, f.Error
+}
diff --git a/pkg/kubelet/prober/manager.go b/pkg/kubelet/prober/manager.go
new file mode 100644
index 00000000000..340a5a51fd0
--- /dev/null
+++ b/pkg/kubelet/prober/manager.go
@@ -0,0 +1,167 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package prober
+
+import (
+	"sync"
+	"time"
+
+	"github.com/golang/glog"
+	"k8s.io/kubernetes/pkg/api"
+	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
+	"k8s.io/kubernetes/pkg/kubelet/status"
+	"k8s.io/kubernetes/pkg/types"
+	"k8s.io/kubernetes/pkg/util/sets"
+)
+
+// Manager manages pod probing. It creates a probe "worker" for every container that specifies a
+// probe (AddPod). The worker periodically probes its assigned container and caches the results. The
+// manager uses the cached probe results to set the appropriate Ready state in the PodStatus when
+// requested (UpdatePodStatus). Updating probe parameters is not currently supported.
+// TODO: Move liveness probing out of the runtime, to here.
+type Manager interface {
+	// AddPod creates new probe workers for every container probe. This should be called for every
+	// pod created.
+	AddPod(pod *api.Pod)
+
+	// RemovePod handles cleaning up the removed pod state, including terminating probe workers and
+	// deleting cached results.
+	RemovePod(pod *api.Pod)
+
+	// CleanupPods handles cleaning up pods which should no longer be running.
+	// It takes a list of "active pods" which should not be cleaned up.
+	CleanupPods(activePods []*api.Pod)
+
+	// UpdatePodStatus modifies the given PodStatus with the appropriate Ready state for each
+	// container based on container running status, cached probe results and worker states.
+	UpdatePodStatus(types.UID, *api.PodStatus)
+}
+
+type manager struct {
+	// Caches the results of readiness probes.
+	readinessCache *readinessManager
+
+	// Map of active workers for readiness
+	readinessProbes map[containerPath]*worker
+	// Lock for accessing & mutating readinessProbes
+	workerLock sync.RWMutex
+
+	// The statusManager cache provides pod IP and container IDs for probing.
+	statusManager status.Manager
+
+	// prober executes the probe actions.
+	prober Prober
+
+	// Default period for workers to execute a probe.
+	defaultProbePeriod time.Duration
+}
+
+func NewManager(
+	defaultProbePeriod time.Duration,
+	statusManager status.Manager,
+	prober Prober) Manager {
+	return &manager{
+		defaultProbePeriod: defaultProbePeriod,
+		statusManager:      statusManager,
+		prober:             prober,
+		readinessCache:     newReadinessManager(),
+		readinessProbes:    make(map[containerPath]*worker),
+	}
+}
+
+// Key uniquely identifying containers
+type containerPath struct {
+	podUID        types.UID
+	containerName string
+}
+
+func (m *manager) AddPod(pod *api.Pod) {
+	m.workerLock.Lock()
+	defer m.workerLock.Unlock()
+
+	key := containerPath{podUID: pod.UID}
+	for _, c := range pod.Spec.Containers {
+		key.containerName = c.Name
+		if _, ok := m.readinessProbes[key]; ok {
+			glog.Errorf("Readiness probe already exists! %v - %v",
+				kubecontainer.GetPodFullName(pod), c.Name)
+			return
+		}
+		if c.ReadinessProbe != nil {
+			m.readinessProbes[key] = m.newWorker(pod, c)
+		}
+	}
+}
+
+func (m *manager) RemovePod(pod *api.Pod) {
+	m.workerLock.RLock()
+	defer m.workerLock.RUnlock()
+
+	key := containerPath{podUID: pod.UID}
+	for _, c := range pod.Spec.Containers {
+		key.containerName = c.Name
+		if worker, ok := m.readinessProbes[key]; ok {
+			close(worker.stop)
+		}
+	}
+}
+
+func (m *manager) CleanupPods(activePods []*api.Pod) {
+	desiredPods := make(map[types.UID]sets.Empty)
+	for _, pod := range activePods {
+		desiredPods[pod.UID] = sets.Empty{}
+	}
+
+	m.workerLock.RLock()
+	defer m.workerLock.RUnlock()
+
+	for path, worker := range m.readinessProbes {
+		if _, ok := desiredPods[path.podUID]; !ok {
+			close(worker.stop)
+		}
+	}
+}
+
+func (m *manager) UpdatePodStatus(podUID types.UID, podStatus *api.PodStatus) {
+	for i, c := range podStatus.ContainerStatuses {
+		var ready bool
+		if c.State.Running == nil {
+			ready = false
+		} else if result, ok := m.readinessCache.getReadiness(kubecontainer.TrimRuntimePrefix(c.ContainerID)); ok {
+			ready = result
+		} else {
+			// Check whether there is a probe which hasn't run yet.
+			_, exists := m.getReadinessProbe(podUID, c.Name)
+			ready = !exists
+		}
+		podStatus.ContainerStatuses[i].Ready = ready
+	}
+}
+
+func (m *manager) getReadinessProbe(podUID types.UID, containerName string) (*worker, bool) {
+	m.workerLock.RLock()
+	defer m.workerLock.RUnlock()
+	probe, ok := m.readinessProbes[containerPath{podUID, containerName}]
+	return probe, ok
+}
+
+// Called by the worker after exiting.
+func (m *manager) removeReadinessProbe(podUID types.UID, containerName string) {
+	m.workerLock.Lock()
+	defer m.workerLock.Unlock()
+	delete(m.readinessProbes, containerPath{podUID, containerName})
+}
diff --git a/pkg/kubelet/prober/manager_test.go b/pkg/kubelet/prober/manager_test.go
new file mode 100644
index 00000000000..baada0dc2fe
--- /dev/null
+++ b/pkg/kubelet/prober/manager_test.go
@@ -0,0 +1,280 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package prober + +import ( + "fmt" + "testing" + "time" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/unversioned/testclient" + "k8s.io/kubernetes/pkg/kubelet/status" + "k8s.io/kubernetes/pkg/probe" + "k8s.io/kubernetes/pkg/util/wait" +) + +func TestAddRemovePods(t *testing.T) { + noProbePod := api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: "no_probe_pod", + }, + Spec: api.PodSpec{ + Containers: []api.Container{{ + Name: "no_probe1", + }, { + Name: "no_probe2", + }}, + }, + } + + probePod := api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: "probe_pod", + }, + Spec: api.PodSpec{ + Containers: []api.Container{{ + Name: "no_probe1", + }, { + Name: "prober1", + ReadinessProbe: &api.Probe{}, + }, { + Name: "no_probe2", + }, { + Name: "prober2", + ReadinessProbe: &api.Probe{}, + }}, + }, + } + + m := newTestManager() + if err := expectProbes(m, nil); err != nil { + t.Error(err) + } + + // Adding a pod with no probes should be a no-op. + m.AddPod(&noProbePod) + if err := expectProbes(m, nil); err != nil { + t.Error(err) + } + + // Adding a pod with probes. + m.AddPod(&probePod) + probePaths := []containerPath{{"probe_pod", "prober1"}, {"probe_pod", "prober2"}} + if err := expectProbes(m, probePaths); err != nil { + t.Error(err) + } + + // Removing un-probed pod. + m.RemovePod(&noProbePod) + if err := expectProbes(m, probePaths); err != nil { + t.Error(err) + } + + // Removing probed pod. + m.RemovePod(&probePod) + if err := waitForWorkerExit(m, probePaths); err != nil { + t.Fatal(err) + } + if err := expectProbes(m, nil); err != nil { + t.Error(err) + } + + // Removing already removed pods should be a no-op. + m.RemovePod(&probePod) + if err := expectProbes(m, nil); err != nil { + t.Error(err) + } +} + +func TestCleanupPods(t *testing.T) { + m := newTestManager() + podToCleanup := api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: "pod_cleanup", + }, + Spec: api.PodSpec{ + Containers: []api.Container{{ + Name: "prober1", + ReadinessProbe: &api.Probe{}, + }, { + Name: "prober2", + ReadinessProbe: &api.Probe{}, + }}, + }, + } + podToKeep := api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: "pod_keep", + }, + Spec: api.PodSpec{ + Containers: []api.Container{{ + Name: "prober1", + ReadinessProbe: &api.Probe{}, + }, { + Name: "prober2", + ReadinessProbe: &api.Probe{}, + }}, + }, + } + m.AddPod(&podToCleanup) + m.AddPod(&podToKeep) + + m.CleanupPods([]*api.Pod{&podToKeep}) + + removedProbes := []containerPath{{"pod_cleanup", "prober1"}, {"pod_cleanup", "prober2"}} + expectedProbes := []containerPath{{"pod_keep", "prober1"}, {"pod_keep", "prober2"}} + if err := waitForWorkerExit(m, removedProbes); err != nil { + t.Fatal(err) + } + if err := expectProbes(m, expectedProbes); err != nil { + t.Error(err) + } +} + +func TestUpdatePodStatus(t *testing.T) { + const podUID = "pod_uid" + unprobed := api.ContainerStatus{ + Name: "unprobed_container", + ContainerID: "unprobed_container_id", + State: api.ContainerState{ + Running: &api.ContainerStateRunning{}, + }, + } + probedReady := api.ContainerStatus{ + Name: "probed_container_ready", + ContainerID: "probed_container_ready_id", + State: api.ContainerState{ + Running: &api.ContainerStateRunning{}, + }, + } + probedPending := api.ContainerStatus{ + Name: "probed_container_pending", + ContainerID: "probed_container_pending_id", + State: api.ContainerState{ + Running: &api.ContainerStateRunning{}, + }, + } + probedUnready := api.ContainerStatus{ + Name: "probed_container_unready", + ContainerID: "probed_container_unready_id", 
+ State: api.ContainerState{ + Running: &api.ContainerStateRunning{}, + }, + } + terminated := api.ContainerStatus{ + Name: "terminated_container", + ContainerID: "terminated_container_id", + State: api.ContainerState{ + Terminated: &api.ContainerStateTerminated{}, + }, + } + podStatus := api.PodStatus{ + Phase: api.PodRunning, + ContainerStatuses: []api.ContainerStatus{ + unprobed, probedReady, probedPending, probedUnready, terminated, + }, + } + + m := newTestManager() + // Setup probe "workers" and cached results. + m.readinessProbes = map[containerPath]*worker{ + containerPath{podUID, probedReady.Name}: {}, + containerPath{podUID, probedPending.Name}: {}, + containerPath{podUID, probedUnready.Name}: {}, + containerPath{podUID, terminated.Name}: {}, + } + m.readinessCache.setReadiness(probedReady.ContainerID, true) + m.readinessCache.setReadiness(probedUnready.ContainerID, false) + m.readinessCache.setReadiness(terminated.ContainerID, true) + + m.UpdatePodStatus(podUID, &podStatus) + + expectedReadiness := map[containerPath]bool{ + containerPath{podUID, unprobed.Name}: true, + containerPath{podUID, probedReady.Name}: true, + containerPath{podUID, probedPending.Name}: false, + containerPath{podUID, probedUnready.Name}: false, + containerPath{podUID, terminated.Name}: false, + } + for _, c := range podStatus.ContainerStatuses { + expected, ok := expectedReadiness[containerPath{podUID, c.Name}] + if !ok { + t.Fatalf("Missing expectation for test case: %v", c.Name) + } + if expected != c.Ready { + t.Errorf("Unexpected readiness for container %v: Expected %v but got %v", + c.Name, expected, c.Ready) + } + } +} + +func expectProbes(m *manager, expectedReadinessProbes []containerPath) error { + m.workerLock.RLock() + defer m.workerLock.RUnlock() + + var unexpected []containerPath + missing := make([]containerPath, len(expectedReadinessProbes)) + copy(missing, expectedReadinessProbes) + +outer: + for probePath := range m.readinessProbes { + for i, expectedPath := range missing { + if probePath == expectedPath { + missing = append(missing[:i], missing[i+1:]...) + continue outer + } + } + unexpected = append(unexpected, probePath) + } + + if len(missing) == 0 && len(unexpected) == 0 { + return nil // Yay! + } + + return fmt.Errorf("Unexpected probes: %v; Missing probes: %v;", unexpected, missing) +} + +func newTestManager() *manager { + const probePeriod = 1 + statusManager := status.NewManager(&testclient.Fake{}) + prober := FakeProber{Readiness: probe.Success} + return NewManager(probePeriod, statusManager, prober).(*manager) +} + +// Wait for the given workers to exit & clean up. +func waitForWorkerExit(m *manager, workerPaths []containerPath) error { + const interval = 100 * time.Millisecond + const timeout = 30 * time.Second + + for _, w := range workerPaths { + condition := func() (bool, error) { + _, exists := m.getReadinessProbe(w.podUID, w.containerName) + return !exists, nil + } + if exited, _ := condition(); exited { + continue // Already exited, no need to poll. + } + glog.Infof("Polling %v", w) + if err := wait.Poll(interval, timeout, condition); err != nil { + return err + } + } + + return nil +} diff --git a/pkg/kubelet/prober/prober.go b/pkg/kubelet/prober/prober.go index 01f010095d9..14c6cf09ab6 100644 --- a/pkg/kubelet/prober/prober.go +++ b/pkg/kubelet/prober/prober.go @@ -41,7 +41,8 @@ const maxProbeRetries = 3 // Prober checks the healthiness of a container. 
type Prober interface { - Probe(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) + ProbeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) + ProbeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string) (probe.Result, error) } // Prober helps to check the liveness/readiness of a container. @@ -51,55 +52,30 @@ type prober struct { tcp tcprobe.TCPProber runner kubecontainer.ContainerCommandRunner - readinessManager *kubecontainer.ReadinessManager - refManager *kubecontainer.RefManager - recorder record.EventRecorder + refManager *kubecontainer.RefManager + recorder record.EventRecorder } // NewProber creates a Prober, it takes a command runner and // several container info managers. func New( runner kubecontainer.ContainerCommandRunner, - readinessManager *kubecontainer.ReadinessManager, refManager *kubecontainer.RefManager, recorder record.EventRecorder) Prober { return &prober{ - exec: execprobe.New(), - http: httprobe.New(), - tcp: tcprobe.New(), - runner: runner, - - readinessManager: readinessManager, - refManager: refManager, - recorder: recorder, + exec: execprobe.New(), + http: httprobe.New(), + tcp: tcprobe.New(), + runner: runner, + refManager: refManager, + recorder: recorder, } } -// New prober for use in tests. -func NewTestProber( - exec execprobe.ExecProber, - readinessManager *kubecontainer.ReadinessManager, - refManager *kubecontainer.RefManager, - recorder record.EventRecorder) Prober { - - return &prober{ - exec: exec, - readinessManager: readinessManager, - refManager: refManager, - recorder: recorder, - } -} - -// Probe checks the liveness/readiness of the given container. -func (pb *prober) Probe(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) { - pb.probeReadiness(pod, status, container, containerID, createdAt) - return pb.probeLiveness(pod, status, container, containerID, createdAt) -} - -// probeLiveness probes the liveness of a container. +// ProbeLiveness probes the liveness of a container. // If the initalDelay since container creation on liveness probe has not passed the probe will return probe.Success. -func (pb *prober) probeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) { +func (pb *prober) ProbeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) { var live probe.Result var output string var err error @@ -137,24 +113,20 @@ func (pb *prober) probeLiveness(pod *api.Pod, status api.PodStatus, container ap return probe.Success, nil } -// probeReadiness probes and sets the readiness of a container. -// If the initial delay on the readiness probe has not passed, we set readiness to false. -func (pb *prober) probeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) { +// ProbeReadiness probes and sets the readiness of a container. 
+func (pb *prober) ProbeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string) (probe.Result, error) { var ready probe.Result var output string var err error p := container.ReadinessProbe if p == nil { ready = probe.Success - } else if time.Now().Unix()-createdAt < p.InitialDelaySeconds { - ready = probe.Failure } else { ready, output, err = pb.runProbeWithRetries(p, pod, status, container, containerID, maxProbeRetries) } ctrName := fmt.Sprintf("%s:%s", kubecontainer.GetPodFullName(pod), container.Name) if err != nil || ready == probe.Failure { // Readiness failed in one way or another. - pb.readinessManager.SetReadiness(containerID, false) ref, ok := pb.refManager.GetRef(containerID) if !ok { glog.Warningf("No ref for pod '%v' - '%v'", containerID, container.Name) @@ -164,21 +136,17 @@ func (pb *prober) probeReadiness(pod *api.Pod, status api.PodStatus, container a if ok { pb.recorder.Eventf(ref, "Unhealthy", "Readiness probe errored: %v", err) } - return } else { // ready != probe.Success glog.V(1).Infof("Readiness probe for %q failed (%v): %s", ctrName, ready, output) if ok { pb.recorder.Eventf(ref, "Unhealthy", "Readiness probe failed: %s", output) } - return } - } - if ready == probe.Success { - pb.readinessManager.SetReadiness(containerID, true) + return probe.Failure, err } glog.V(3).Infof("Readiness probe for %q succeeded", ctrName) - + return ready, nil } // runProbeWithRetries tries to probe the container in a finite loop, it returns the last result diff --git a/pkg/kubelet/prober/prober_test.go b/pkg/kubelet/prober/prober_test.go index 01e3ffda49a..9742ecfde59 100644 --- a/pkg/kubelet/prober/prober_test.go +++ b/pkg/kubelet/prober/prober_test.go @@ -179,11 +179,9 @@ func TestGetTCPAddrParts(t *testing.T) { // PLEASE READ THE PROBE DOCS BEFORE CHANGING THIS TEST IF YOU ARE UNSURE HOW PROBES ARE SUPPOSED TO WORK: // (See https://github.com/GoogleCloudPlatform/kubernetes/blob/master/docs/user-guide/pod-states.md#pod-conditions) func TestProbeContainer(t *testing.T) { - readinessManager := kubecontainer.NewReadinessManager() prober := &prober{ - readinessManager: readinessManager, - refManager: kubecontainer.NewRefManager(), - recorder: &record.FakeRecorder{}, + refManager: kubecontainer.NewRefManager(), + recorder: &record.FakeRecorder{}, } containerID := "foobar" createdAt := time.Now().Unix() @@ -191,29 +189,29 @@ func TestProbeContainer(t *testing.T) { tests := []struct { testContainer api.Container expectError bool - expectedResult probe.Result - expectedReadiness bool + expectedLiveness probe.Result + expectedReadiness probe.Result }{ // No probes. { testContainer: api.Container{}, - expectedResult: probe.Success, - expectedReadiness: true, + expectedLiveness: probe.Success, + expectedReadiness: probe.Success, }, // Only LivenessProbe. expectedReadiness should always be true here. 
{ testContainer: api.Container{ LivenessProbe: &api.Probe{InitialDelaySeconds: 100}, }, - expectedResult: probe.Success, - expectedReadiness: true, + expectedLiveness: probe.Success, + expectedReadiness: probe.Success, }, { testContainer: api.Container{ LivenessProbe: &api.Probe{InitialDelaySeconds: -100}, }, - expectedResult: probe.Unknown, - expectedReadiness: true, + expectedLiveness: probe.Unknown, + expectedReadiness: probe.Success, }, { testContainer: api.Container{ @@ -224,8 +222,8 @@ func TestProbeContainer(t *testing.T) { }, }, }, - expectedResult: probe.Failure, - expectedReadiness: true, + expectedLiveness: probe.Failure, + expectedReadiness: probe.Success, }, { testContainer: api.Container{ @@ -236,8 +234,8 @@ func TestProbeContainer(t *testing.T) { }, }, }, - expectedResult: probe.Success, - expectedReadiness: true, + expectedLiveness: probe.Success, + expectedReadiness: probe.Success, }, { testContainer: api.Container{ @@ -248,8 +246,8 @@ func TestProbeContainer(t *testing.T) { }, }, }, - expectedResult: probe.Unknown, - expectedReadiness: true, + expectedLiveness: probe.Unknown, + expectedReadiness: probe.Success, }, { testContainer: api.Container{ @@ -261,23 +259,16 @@ func TestProbeContainer(t *testing.T) { }, }, expectError: true, - expectedResult: probe.Unknown, - expectedReadiness: true, + expectedLiveness: probe.Unknown, + expectedReadiness: probe.Success, }, - // // Only ReadinessProbe. expectedResult should always be probe.Success here. + // // Only ReadinessProbe. expectedLiveness should always be probe.Success here. { testContainer: api.Container{ ReadinessProbe: &api.Probe{InitialDelaySeconds: 100}, }, - expectedResult: probe.Success, - expectedReadiness: false, - }, - { - testContainer: api.Container{ - ReadinessProbe: &api.Probe{InitialDelaySeconds: -100}, - }, - expectedResult: probe.Success, - expectedReadiness: false, + expectedLiveness: probe.Success, + expectedReadiness: probe.Unknown, }, { testContainer: api.Container{ @@ -288,8 +279,8 @@ func TestProbeContainer(t *testing.T) { }, }, }, - expectedResult: probe.Success, - expectedReadiness: true, + expectedLiveness: probe.Success, + expectedReadiness: probe.Success, }, { testContainer: api.Container{ @@ -300,8 +291,8 @@ func TestProbeContainer(t *testing.T) { }, }, }, - expectedResult: probe.Success, - expectedReadiness: true, + expectedLiveness: probe.Success, + expectedReadiness: probe.Success, }, { testContainer: api.Container{ @@ -312,8 +303,8 @@ func TestProbeContainer(t *testing.T) { }, }, }, - expectedResult: probe.Success, - expectedReadiness: true, + expectedLiveness: probe.Success, + expectedReadiness: probe.Success, }, { testContainer: api.Container{ @@ -325,8 +316,8 @@ func TestProbeContainer(t *testing.T) { }, }, expectError: false, - expectedResult: probe.Success, - expectedReadiness: true, + expectedLiveness: probe.Success, + expectedReadiness: probe.Success, }, // Both LivenessProbe and ReadinessProbe. 
{ @@ -334,32 +325,32 @@ func TestProbeContainer(t *testing.T) { LivenessProbe: &api.Probe{InitialDelaySeconds: 100}, ReadinessProbe: &api.Probe{InitialDelaySeconds: 100}, }, - expectedResult: probe.Success, - expectedReadiness: false, + expectedLiveness: probe.Success, + expectedReadiness: probe.Unknown, }, { testContainer: api.Container{ LivenessProbe: &api.Probe{InitialDelaySeconds: 100}, ReadinessProbe: &api.Probe{InitialDelaySeconds: -100}, }, - expectedResult: probe.Success, - expectedReadiness: false, + expectedLiveness: probe.Success, + expectedReadiness: probe.Unknown, }, { testContainer: api.Container{ LivenessProbe: &api.Probe{InitialDelaySeconds: -100}, ReadinessProbe: &api.Probe{InitialDelaySeconds: 100}, }, - expectedResult: probe.Unknown, - expectedReadiness: false, + expectedLiveness: probe.Unknown, + expectedReadiness: probe.Unknown, }, { testContainer: api.Container{ LivenessProbe: &api.Probe{InitialDelaySeconds: -100}, ReadinessProbe: &api.Probe{InitialDelaySeconds: -100}, }, - expectedResult: probe.Unknown, - expectedReadiness: false, + expectedLiveness: probe.Unknown, + expectedReadiness: probe.Unknown, }, { testContainer: api.Container{ @@ -371,8 +362,8 @@ func TestProbeContainer(t *testing.T) { }, ReadinessProbe: &api.Probe{InitialDelaySeconds: -100}, }, - expectedResult: probe.Unknown, - expectedReadiness: false, + expectedLiveness: probe.Unknown, + expectedReadiness: probe.Unknown, }, { testContainer: api.Container{ @@ -384,8 +375,8 @@ func TestProbeContainer(t *testing.T) { }, ReadinessProbe: &api.Probe{InitialDelaySeconds: -100}, }, - expectedResult: probe.Failure, - expectedReadiness: false, + expectedLiveness: probe.Failure, + expectedReadiness: probe.Unknown, }, { testContainer: api.Container{ @@ -402,29 +393,37 @@ func TestProbeContainer(t *testing.T) { }, }, }, - expectedResult: probe.Success, - expectedReadiness: true, + expectedLiveness: probe.Success, + expectedReadiness: probe.Success, }, } for i, test := range tests { if test.expectError { - prober.exec = fakeExecProber{test.expectedResult, errors.New("exec error")} + prober.exec = fakeExecProber{test.expectedLiveness, errors.New("exec error")} } else { - prober.exec = fakeExecProber{test.expectedResult, nil} + prober.exec = fakeExecProber{test.expectedLiveness, nil} } - result, err := prober.Probe(&api.Pod{}, api.PodStatus{}, test.testContainer, containerID, createdAt) + + liveness, err := prober.ProbeLiveness(&api.Pod{}, api.PodStatus{}, test.testContainer, containerID, createdAt) if test.expectError && err == nil { - t.Errorf("[%d] Expected error but no error was returned.", i) + t.Errorf("[%d] Expected liveness probe error but no error was returned.", i) } if !test.expectError && err != nil { - t.Errorf("[%d] Didn't expect error but got: %v", i, err) + t.Errorf("[%d] Didn't expect liveness probe error but got: %v", i, err) } - if test.expectedResult != result { - t.Errorf("[%d] Expected result to be %v but was %v", i, test.expectedResult, result) + if test.expectedLiveness != liveness { + t.Errorf("[%d] Expected liveness result to be %v but was %v", i, test.expectedLiveness, liveness) } - if test.expectedReadiness != readinessManager.GetReadiness(containerID) { - t.Errorf("[%d] Expected readiness to be %v but was %v", i, test.expectedReadiness, readinessManager.GetReadiness(containerID)) + + // TODO: Test readiness errors + prober.exec = fakeExecProber{test.expectedReadiness, nil} + readiness, err := prober.ProbeReadiness(&api.Pod{}, api.PodStatus{}, test.testContainer, containerID) + if err != nil { 
+ t.Errorf("[%d] Unexpected readiness probe error: %v", i, err) + } + if test.expectedReadiness != readiness { + t.Errorf("[%d] Expected readiness result to be %v but was %v", i, test.expectedReadiness, readiness) } } } diff --git a/pkg/kubelet/container/readiness_manager.go b/pkg/kubelet/prober/readiness_manager.go similarity index 63% rename from pkg/kubelet/container/readiness_manager.go rename to pkg/kubelet/prober/readiness_manager.go index ade99c39cb9..6e703fe5dae 100644 --- a/pkg/kubelet/container/readiness_manager.go +++ b/pkg/kubelet/prober/readiness_manager.go @@ -14,45 +14,45 @@ See the License for the specific language governing permissions and limitations under the License. */ -package container +package prober import "sync" -// ReadinessManager maintains the readiness information(probe results) of +// readinessManager maintains the readiness information(probe results) of // containers over time to allow for implementation of health thresholds. // This manager is thread-safe, no locks are necessary for the caller. -type ReadinessManager struct { +type readinessManager struct { // guards states sync.RWMutex // TODO(yifan): To use strong type. states map[string]bool } -// NewReadinessManager creates ane returns a readiness manager with empty +// newReadinessManager creates ane returns a readiness manager with empty // contents. -func NewReadinessManager() *ReadinessManager { - return &ReadinessManager{states: make(map[string]bool)} +func newReadinessManager() *readinessManager { + return &readinessManager{states: make(map[string]bool)} } -// GetReadiness returns the readiness value for the container with the given ID. +// getReadiness returns the readiness value for the container with the given ID. // If the readiness value is found, returns it. // If the readiness is not found, returns false. -func (r *ReadinessManager) GetReadiness(id string) bool { +func (r *readinessManager) getReadiness(id string) (ready bool, found bool) { r.RLock() defer r.RUnlock() state, found := r.states[id] - return state && found + return state, found } -// SetReadiness sets the readiness value for the container with the given ID. -func (r *ReadinessManager) SetReadiness(id string, value bool) { +// setReadiness sets the readiness value for the container with the given ID. +func (r *readinessManager) setReadiness(id string, value bool) { r.Lock() defer r.Unlock() r.states[id] = value } -// RemoveReadiness clears the readiness value for the container with the given ID. -func (r *ReadinessManager) RemoveReadiness(id string) { +// removeReadiness clears the readiness value for the container with the given ID. +func (r *readinessManager) removeReadiness(id string) { r.Lock() defer r.Unlock() delete(r.states, id) diff --git a/pkg/kubelet/prober/worker.go b/pkg/kubelet/prober/worker.go new file mode 100644 index 00000000000..c3d526aa04b --- /dev/null +++ b/pkg/kubelet/prober/worker.go @@ -0,0 +1,153 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package prober
+
+import (
+	"time"
+
+	"github.com/golang/glog"
+	"k8s.io/kubernetes/pkg/api"
+	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
+	kubeutil "k8s.io/kubernetes/pkg/kubelet/util"
+	"k8s.io/kubernetes/pkg/probe"
+	"k8s.io/kubernetes/pkg/types"
+	"k8s.io/kubernetes/pkg/util"
+)
+
+// worker handles the periodic probing of its assigned container. Each worker has a goroutine
+// associated with it which runs the probe loop until the container permanently terminates, or the
+// stop channel is closed. The worker uses the probe Manager's statusManager to get up-to-date
+// container IDs.
+// TODO: Handle liveness probing
+type worker struct {
+	// Channel for stopping the probe; it should be closed to trigger a stop.
+	stop chan struct{}
+
+	// The pod containing this probe (read-only)
+	pod *api.Pod
+
+	// The container to probe (read-only)
+	container api.Container
+
+	// Describes the probe configuration (read-only)
+	spec *api.Probe
+
+	// The last known container ID for this worker.
+	containerID types.UID
+}
+
+// Creates and starts a new probe worker.
+func (m *manager) newWorker(
+	pod *api.Pod,
+	container api.Container) *worker {
+
+	w := &worker{
+		stop:      make(chan struct{}),
+		pod:       pod,
+		container: container,
+		spec:      container.ReadinessProbe,
+	}
+
+	// Start the worker thread.
+	go run(m, w)
+
+	return w
+}
+
+// run periodically probes the container.
+func run(m *manager, w *worker) {
+	probeTicker := time.NewTicker(m.defaultProbePeriod)
+
+	defer func() {
+		// Clean up.
+		probeTicker.Stop()
+		if w.containerID != "" {
+			m.readinessCache.removeReadiness(string(w.containerID))
+		}
+
+		m.removeReadinessProbe(w.pod.UID, w.container.Name)
+	}()
+
+probeLoop:
+	for doProbe(m, w) {
+		// Wait for next probe tick.
+		select {
+		case <-w.stop:
+			break probeLoop
+		case <-probeTicker.C:
+			// continue
+		}
+	}
+}
+
+// doProbe probes the container once and records the result.
+// Returns whether the worker should continue.
+func doProbe(m *manager, w *worker) (keepGoing bool) {
+	defer util.HandleCrash(func(_ interface{}) { keepGoing = true })
+
+	status, ok := m.statusManager.GetPodStatus(w.pod.UID)
+	if !ok {
+		// Either the pod has not been created yet, or it was already deleted.
+		glog.V(3).Infof("No status for pod: %v", kubeutil.FormatPodName(w.pod))
+		return true
+	}
+
+	// Worker should terminate if pod is terminated.
+	if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded {
+		glog.V(3).Infof("Pod %v %v, exiting probe worker",
+			kubeutil.FormatPodName(w.pod), status.Phase)
+		return false
+	}
+
+	c, ok := api.GetContainerStatus(status.ContainerStatuses, w.container.Name)
+	if !ok {
+		// Either the container has not been created yet, or it was deleted.
+		glog.V(3).Infof("Non-existent container probed: %v - %v",
+			kubeutil.FormatPodName(w.pod), w.container.Name)
+		return true // Wait for more information.
+	}
+
+	if w.containerID != types.UID(c.ContainerID) {
+		if w.containerID != "" {
+			m.readinessCache.removeReadiness(string(w.containerID))
+		}
+		w.containerID = types.UID(kubecontainer.TrimRuntimePrefix(c.ContainerID))
+	}
+
+	if c.State.Running == nil {
+		glog.V(3).Infof("Non-running container probed: %v - %v",
+			kubeutil.FormatPodName(w.pod), w.container.Name)
+		m.readinessCache.setReadiness(string(w.containerID), false)
+		// Abort if the container will not be restarted.
+ return c.State.Terminated == nil || + w.pod.Spec.RestartPolicy != api.RestartPolicyNever + } + + if int64(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds { + // Readiness defaults to false during the initial delay. + m.readinessCache.setReadiness(string(w.containerID), false) + return true + } + + // TODO: Move error handling out of prober. + result, _ := m.prober.ProbeReadiness(w.pod, status, w.container, string(w.containerID)) + if result != probe.Unknown { + m.readinessCache.setReadiness(string(w.containerID), result != probe.Failure) + } + + return true +} diff --git a/pkg/kubelet/prober/worker_test.go b/pkg/kubelet/prober/worker_test.go new file mode 100644 index 00000000000..fc17575531f --- /dev/null +++ b/pkg/kubelet/prober/worker_test.go @@ -0,0 +1,240 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package prober + +import ( + "testing" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/probe" +) + +const ( + containerID = "cOnTaInEr_Id" + containerName = "cOnTaInEr_NaMe" + podUID = "pOd_UiD" +) + +func TestDoProbe(t *testing.T) { + m := newTestManager() + + // Test statuses. + runningStatus := getRunningStatus() + pendingStatus := getRunningStatus() + pendingStatus.ContainerStatuses[0].State.Running = nil + terminatedStatus := getRunningStatus() + terminatedStatus.ContainerStatuses[0].State.Running = nil + terminatedStatus.ContainerStatuses[0].State.Terminated = &api.ContainerStateTerminated{ + StartedAt: unversioned.Now(), + } + otherStatus := getRunningStatus() + otherStatus.ContainerStatuses[0].Name = "otherContainer" + failedStatus := getRunningStatus() + failedStatus.Phase = api.PodFailed + + tests := []struct { + probe api.Probe + podStatus *api.PodStatus + + expectContinue bool + expectReadySet bool + expectedReadiness bool + }{ + { // No status. + expectContinue: true, + }, + { // Pod failed + podStatus: &failedStatus, + }, + { // No container status + podStatus: &otherStatus, + expectContinue: true, + }, + { // Container waiting + podStatus: &pendingStatus, + expectContinue: true, + expectReadySet: true, + }, + { // Container terminated + podStatus: &terminatedStatus, + expectReadySet: true, + }, + { // Probe successful. 
+ podStatus: &runningStatus, + expectContinue: true, + expectReadySet: true, + expectedReadiness: true, + }, + { // Initial delay passed + podStatus: &runningStatus, + probe: api.Probe{ + InitialDelaySeconds: -100, + }, + expectContinue: true, + expectReadySet: true, + expectedReadiness: true, + }, + } + + for i, test := range tests { + w := newTestWorker(test.probe) + if test.podStatus != nil { + m.statusManager.SetPodStatus(w.pod, *test.podStatus) + } + if c := doProbe(m, w); c != test.expectContinue { + t.Errorf("[%d] Expected continue to be %v but got %v", i, test.expectContinue, c) + } + ready, ok := m.readinessCache.getReadiness(containerID) + if ok != test.expectReadySet { + t.Errorf("[%d] Expected to have readiness: %v but got %v", i, test.expectReadySet, ok) + } + if ready != test.expectedReadiness { + t.Errorf("[%d] Expected readiness: %v but got %v", i, test.expectedReadiness, ready) + } + + // Clean up. + m.statusManager.DeletePodStatus(podUID) + m.readinessCache.removeReadiness(containerID) + } +} + +func TestInitialDelay(t *testing.T) { + m := newTestManager() + w := newTestWorker(api.Probe{ + InitialDelaySeconds: 10, + }) + m.statusManager.SetPodStatus(w.pod, getRunningStatus()) + + if !doProbe(m, w) { + t.Errorf("Expected to continue, but did not") + } + + ready, ok := m.readinessCache.getReadiness(containerID) + if !ok { + t.Errorf("Expected readiness to be false, but was not set") + } else if ready { + t.Errorf("Expected readiness to be false, but was true") + } + + // 100 seconds later... + laterStatus := getRunningStatus() + laterStatus.ContainerStatuses[0].State.Running.StartedAt.Time = + time.Now().Add(-100 * time.Second) + m.statusManager.SetPodStatus(w.pod, laterStatus) + + // Second call should succeed (already waited). + if !doProbe(m, w) { + t.Errorf("Expected to continue, but did not") + } + + ready, ok = m.readinessCache.getReadiness(containerID) + if !ok { + t.Errorf("Expected readiness to be true, but was not set") + } else if !ready { + t.Errorf("Expected readiness to be true, but was false") + } +} + +func TestCleanUp(t *testing.T) { + m := newTestManager() + pod := getTestPod(api.Probe{}) + m.statusManager.SetPodStatus(&pod, getRunningStatus()) + m.readinessCache.setReadiness(containerID, true) + w := m.newWorker(&pod, pod.Spec.Containers[0]) + m.readinessProbes[containerPath{podUID, containerName}] = w + + if ready, _ := m.readinessCache.getReadiness(containerID); !ready { + t.Fatal("Expected readiness to be true.") + } + + close(w.stop) + if err := waitForWorkerExit(m, []containerPath{{podUID, containerName}}); err != nil { + t.Fatal(err) + } + + if _, ok := m.readinessCache.getReadiness(containerID); ok { + t.Error("Expected readiness to be cleared.") + } + if _, ok := m.readinessProbes[containerPath{podUID, containerName}]; ok { + t.Error("Expected worker to be cleared.") + } +} + +func TestHandleCrash(t *testing.T) { + m := newTestManager() + m.prober = CrashingProber{} + w := newTestWorker(api.Probe{}) + m.statusManager.SetPodStatus(w.pod, getRunningStatus()) + + // doProbe should recover from the crash, and keep going. 
+ if !doProbe(m, w) { + t.Error("Expected to keep going, but terminated.") + } + if _, ok := m.readinessCache.getReadiness(containerID); ok { + t.Error("Expected readiness to be unchanged from crash.") + } +} + +func newTestWorker(probeSpec api.Probe) *worker { + pod := getTestPod(probeSpec) + return &worker{ + stop: make(chan struct{}), + pod: &pod, + container: pod.Spec.Containers[0], + spec: &probeSpec, + } +} + +func getRunningStatus() api.PodStatus { + containerStatus := api.ContainerStatus{ + Name: containerName, + ContainerID: containerID, + } + containerStatus.State.Running = &api.ContainerStateRunning{unversioned.Now()} + podStatus := api.PodStatus{ + Phase: api.PodRunning, + ContainerStatuses: []api.ContainerStatus{containerStatus}, + } + return podStatus +} + +func getTestPod(probeSpec api.Probe) api.Pod { + container := api.Container{ + Name: containerName, + ReadinessProbe: &probeSpec, + } + pod := api.Pod{ + Spec: api.PodSpec{ + Containers: []api.Container{container}, + RestartPolicy: api.RestartPolicyNever, + }, + } + pod.UID = podUID + return pod +} + +type CrashingProber struct{} + +func (f CrashingProber) ProbeLiveness(_ *api.Pod, _ api.PodStatus, c api.Container, _ string, _ int64) (probe.Result, error) { + panic("Intentional ProbeLiveness crash.") +} + +func (f CrashingProber) ProbeReadiness(_ *api.Pod, _ api.PodStatus, c api.Container, _ string) (probe.Result, error) { + panic("Intentional ProbeReadiness crash.") +} diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index df2755222aa..87b66aab910 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -94,7 +94,6 @@ type runtime struct { generator kubecontainer.RunContainerOptionsGenerator recorder record.EventRecorder prober prober.Prober - readinessManager *kubecontainer.ReadinessManager volumeGetter volumeGetter imagePuller kubecontainer.ImagePuller } @@ -113,7 +112,7 @@ func New(config *Config, generator kubecontainer.RunContainerOptionsGenerator, recorder record.EventRecorder, containerRefManager *kubecontainer.RefManager, - readinessManager *kubecontainer.ReadinessManager, + prober prober.Prober, volumeGetter volumeGetter) (kubecontainer.Runtime, error) { systemdVersion, err := getSystemdVersion() @@ -151,10 +150,9 @@ func New(config *Config, containerRefManager: containerRefManager, generator: generator, recorder: recorder, - readinessManager: readinessManager, + prober: prober, volumeGetter: volumeGetter, } - rkt.prober = prober.New(rkt, readinessManager, containerRefManager, recorder) rkt.imagePuller = kubecontainer.NewImagePuller(recorder, rkt) // Test the rkt version. @@ -996,7 +994,7 @@ func (r *runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus c := runningPod.FindContainerByName(container.Name) if c == nil { - if kubecontainer.ShouldContainerBeRestarted(&container, pod, &podStatus, r.readinessManager) { + if kubecontainer.ShouldContainerBeRestarted(&container, pod, &podStatus) { glog.V(3).Infof("Container %+v is dead, but RestartPolicy says that we should restart it.", container) // TODO(yifan): Containers in one pod are fate-sharing at this moment, see: // https://github.com/appc/spec/issues/276. @@ -1016,7 +1014,7 @@ func (r *runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus break } - result, err := r.prober.Probe(pod, podStatus, container, string(c.ID), c.Created) + result, err := r.prober.ProbeLiveness(pod, podStatus, container, string(c.ID), c.Created) // TODO(vmarmol): examine this logic. 
if err == nil && result != probe.Success { glog.Infof("Pod %q container %q is unhealthy (probe result: %v), it will be killed and re-created.", podFullName, container.Name, result) diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go index 5d36d708ddd..6be1c2aae2a 100644 --- a/pkg/kubelet/runonce_test.go +++ b/pkg/kubelet/runonce_test.go @@ -44,7 +44,6 @@ func TestRunOnce(t *testing.T) { nodeLister: testNodeLister{}, statusManager: status.NewManager(nil), containerRefManager: kubecontainer.NewRefManager(), - readinessManager: kubecontainer.NewReadinessManager(), podManager: podManager, os: kubecontainer.FakeOS{}, volumeManager: newVolumeManager(), diff --git a/test/e2e/container_probe.go b/test/e2e/container_probe.go index 4a0ebc1061b..d55aeb704d0 100644 --- a/test/e2e/container_probe.go +++ b/test/e2e/container_probe.go @@ -118,6 +118,9 @@ func makePodSpec(readinessProbe, livenessProbe *api.Probe) *api.Pod { Image: "gcr.io/google_containers/test-webserver", LivenessProbe: livenessProbe, ReadinessProbe: readinessProbe, + }, { + Name: "test-noprobe", + Image: "gcr.io/google_containers/pause", }, }, },
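For reference when reviewing, here is a minimal sketch of how the new prober.Manager is wired and driven, mirroring the kubelet wiring and newTestManager above. The fake client, the FakeProber, the 10-second period, and the placeholder pod are illustrative stand-ins rather than part of the patch:

package main

import (
	"time"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/client/unversioned/testclient"
	"k8s.io/kubernetes/pkg/kubelet/prober"
	"k8s.io/kubernetes/pkg/kubelet/status"
	"k8s.io/kubernetes/pkg/probe"
)

func main() {
	// The status manager supplies pod phase and container IDs to the probe workers.
	statusManager := status.NewManager(&testclient.Fake{})

	// Construct the manager the way NewMainKubelet does: a default probe period
	// (the kubelet passes its resync interval) plus a Prober implementation.
	m := prober.NewManager(10*time.Second, statusManager, prober.FakeProber{Readiness: probe.Success})

	pod := &api.Pod{} // placeholder; real pods carry ReadinessProbes on their containers
	m.AddPod(pod)     // starts one worker per container readiness probe

	// When generating pod status, the kubelet asks the manager to fill in Ready.
	podStatus := api.PodStatus{}
	m.UpdatePodStatus(pod.UID, &podStatus)

	// On pod deletion the workers are stopped and cached results dropped.
	m.RemovePod(pod)
}

The readiness workers created by AddPod poll the status manager on the probe period and cache results, which UpdatePodStatus then folds into ContainerStatuses[i].Ready.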