diff --git a/pkg/kubelet/dockertools/manager_test.go b/pkg/kubelet/dockertools/manager_test.go index d9eab5b73b4..afed48f834c 100644 --- a/pkg/kubelet/dockertools/manager_test.go +++ b/pkg/kubelet/dockertools/manager_test.go @@ -867,7 +867,7 @@ func TestSyncPodsUnhealthy(t *testing.T) { ID: infraContainerID, Name: "/k8s_POD." + strconv.FormatUint(generatePodInfraContainerHash(pod), 16) + "_foo_new_12345678_42", }}) - dm.livenessManager.Set(kubecontainer.DockerID(unhealthyContainerID).ContainerID(), proberesults.Failure, nil) + dm.livenessManager.Set(kubecontainer.DockerID(unhealthyContainerID).ContainerID(), proberesults.Failure, pod) runSyncPod(t, dm, fakeDocker, pod, nil, false) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index be75dd49ba4..9a98d4ac199 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -2349,11 +2349,7 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler if update.Result == proberesults.Failure { // We should not use the pod from livenessManager, because it is never updated after // initialization. - // TODO(random-liu): This is just a quick fix. We should: - // * Just pass pod UID in probe updates to make this less confusing. - // * Maybe probe manager should rely on pod manager, or at least the pod in probe manager - // should be updated. - pod, ok := kl.podManager.GetPodByUID(update.Pod.UID) + pod, ok := kl.podManager.GetPodByUID(update.PodUID) if !ok { // If the pod no longer exists, ignore the update. glog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update) diff --git a/pkg/kubelet/prober/manager.go b/pkg/kubelet/prober/manager.go index 2282d8e6285..9e46f0be3f7 100644 --- a/pkg/kubelet/prober/manager.go +++ b/pkg/kubelet/prober/manager.go @@ -33,7 +33,7 @@ import ( // Manager manages pod probing. It creates a probe "worker" for every container that specifies a // probe (AddPod). The worker periodically probes its assigned container and caches the results. The -// manager usse the cached probe results to set the appropriate Ready state in the PodStatus when +// manager use the cached probe results to set the appropriate Ready state in the PodStatus when // requested (UpdatePodStatus). Updating probe parameters is not currently supported. // TODO: Move liveness probing out of the runtime, to here. type Manager interface { @@ -234,5 +234,5 @@ func (m *manager) updateReadiness() { update := <-m.readinessManager.Updates() ready := update.Result == results.Success - m.statusManager.SetContainerReadiness(update.Pod, update.ContainerID, ready) + m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready) } diff --git a/pkg/kubelet/prober/manager_test.go b/pkg/kubelet/prober/manager_test.go index ac64843f0ff..1cf6e5c9442 100644 --- a/pkg/kubelet/prober/manager_test.go +++ b/pkg/kubelet/prober/manager_test.go @@ -280,7 +280,8 @@ func TestUpdatePodStatus(t *testing.T) { } func TestUpdateReadiness(t *testing.T) { - testPod := getTestPod(readiness, api.Probe{}) + testPod := getTestPod() + setTestProbe(testPod, readiness, api.Probe{}) m := newTestManager() defer cleanup(t, m) @@ -297,9 +298,9 @@ func TestUpdateReadiness(t *testing.T) { exec.set(probe.Success, nil) m.prober.exec = &exec - m.statusManager.SetPodStatus(&testPod, getTestRunningStatus()) + m.statusManager.SetPodStatus(testPod, getTestRunningStatus()) - m.AddPod(&testPod) + m.AddPod(testPod) probePaths := []probeKey{{testPodUID, testContainerName, readiness}} if err := expectProbes(m, probePaths); err != nil { t.Error(err) diff --git a/pkg/kubelet/prober/results/results_manager.go b/pkg/kubelet/prober/results/results_manager.go index 3e4c3683512..9f9b1938d6f 100644 --- a/pkg/kubelet/prober/results/results_manager.go +++ b/pkg/kubelet/prober/results/results_manager.go @@ -21,6 +21,7 @@ import ( "k8s.io/kubernetes/pkg/api" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/types" ) // Manager provides a probe results cache and channel of updates. @@ -61,7 +62,7 @@ func (r Result) String() string { type Update struct { ContainerID kubecontainer.ContainerID Result Result - Pod *api.Pod + PodUID types.UID } // Manager implementation. @@ -93,7 +94,7 @@ func (m *manager) Get(id kubecontainer.ContainerID) (Result, bool) { func (m *manager) Set(id kubecontainer.ContainerID, result Result, pod *api.Pod) { if m.setInternal(id, result) { - m.updates <- Update{id, result, pod} + m.updates <- Update{id, result, pod.UID} } } diff --git a/pkg/kubelet/prober/results/results_manager_test.go b/pkg/kubelet/prober/results/results_manager_test.go index d42aebb6e9a..89f1da9f092 100644 --- a/pkg/kubelet/prober/results/results_manager_test.go +++ b/pkg/kubelet/prober/results/results_manager_test.go @@ -35,7 +35,7 @@ func TestCacheOperations(t *testing.T) { _, found := m.Get(unsetID) assert.False(t, found, "unset result found") - m.Set(setID, Success, nil) + m.Set(setID, Success, &api.Pod{}) result, found := m.Get(setID) assert.True(t, result == Success, "set result") assert.True(t, found, "set result found") @@ -77,10 +77,10 @@ func TestUpdates(t *testing.T) { // New result should always push an update. m.Set(fooID, Success, pod) - expectUpdate(Update{fooID, Success, pod}, "new success") + expectUpdate(Update{fooID, Success, pod.UID}, "new success") m.Set(barID, Failure, pod) - expectUpdate(Update{barID, Failure, pod}, "new failure") + expectUpdate(Update{barID, Failure, pod.UID}, "new failure") // Unchanged results should not send an update. m.Set(fooID, Success, pod) @@ -91,8 +91,8 @@ func TestUpdates(t *testing.T) { // Changed results should send an update. m.Set(fooID, Failure, pod) - expectUpdate(Update{fooID, Failure, pod}, "changed foo") + expectUpdate(Update{fooID, Failure, pod.UID}, "changed foo") m.Set(barID, Success, pod) - expectUpdate(Update{barID, Success, pod}, "changed bar") + expectUpdate(Update{barID, Success, pod.UID}, "changed bar") } diff --git a/pkg/kubelet/prober/testing.go b/pkg/kubelet/prober/testing.go index 9ddb808903a..3bd9a1a7c29 100644 --- a/pkg/kubelet/prober/testing.go +++ b/pkg/kubelet/prober/testing.go @@ -52,11 +52,22 @@ func getTestRunningStatus() api.PodStatus { return podStatus } -func getTestPod(probeType probeType, probeSpec api.Probe) api.Pod { +func getTestPod() *api.Pod { container := api.Container{ Name: testContainerName, } + pod := api.Pod{ + Spec: api.PodSpec{ + Containers: []api.Container{container}, + RestartPolicy: api.RestartPolicyNever, + }, + } + pod.Name = "testPod" + pod.UID = testPodUID + return &pod +} +func setTestProbe(pod *api.Pod, probeType probeType, probeSpec api.Probe) { // All tests rely on the fake exec prober. probeSpec.Handler = api.Handler{ Exec: &api.ExecAction{}, @@ -78,26 +89,20 @@ func getTestPod(probeType probeType, probeSpec api.Probe) api.Pod { switch probeType { case readiness: - container.ReadinessProbe = &probeSpec + pod.Spec.Containers[0].ReadinessProbe = &probeSpec case liveness: - container.LivenessProbe = &probeSpec + pod.Spec.Containers[0].LivenessProbe = &probeSpec } - pod := api.Pod{ - Spec: api.PodSpec{ - Containers: []api.Container{container}, - RestartPolicy: api.RestartPolicyNever, - }, - } - pod.Name = "testPod" - pod.UID = testPodUID - return pod } func newTestManager() *manager { refManager := kubecontainer.NewRefManager() refManager.SetRef(testContainerID, &api.ObjectReference{}) // Suppress prober warnings. + podManager := kubepod.NewBasicPodManager(nil) + // Add test pod to pod manager, so that status manager can get the pod from pod manager if needed. + podManager.AddPod(getTestPod()) m := NewManager( - status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil)), + status.NewManager(&fake.Clientset{}, podManager), results.NewManager(), nil, // runner refManager, @@ -109,8 +114,9 @@ func newTestManager() *manager { } func newTestWorker(m *manager, probeType probeType, probeSpec api.Probe) *worker { - pod := getTestPod(probeType, probeSpec) - return newWorker(m, probeType, &pod, pod.Spec.Containers[0]) + pod := getTestPod() + setTestProbe(pod, probeType, probeSpec) + return newWorker(m, probeType, pod, pod.Spec.Containers[0]) } type fakeExecProber struct { diff --git a/pkg/kubelet/status/manager.go b/pkg/kubelet/status/manager.go index d991f6f619e..8dbba9b9e18 100644 --- a/pkg/kubelet/status/manager.go +++ b/pkg/kubelet/status/manager.go @@ -81,7 +81,7 @@ type Manager interface { // SetContainerReadiness updates the cached container status with the given readiness, and // triggers a status update. - SetContainerReadiness(pod *api.Pod, containerID kubecontainer.ContainerID, ready bool) + SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) // TerminatePods resets the container status for the provided pods to terminated and triggers // a status update. This function may not enqueue all the provided pods, in which case it will @@ -150,10 +150,16 @@ func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) { m.updateStatusInternal(pod, status) } -func (m *manager) SetContainerReadiness(pod *api.Pod, containerID kubecontainer.ContainerID, ready bool) { +func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) { m.podStatusesLock.Lock() defer m.podStatusesLock.Unlock() + pod, ok := m.podManager.GetPodByUID(podUID) + if !ok { + glog.V(4).Infof("Pod %q has been deleted, no need to update readiness", string(podUID)) + return + } + oldStatus, found := m.podStatuses[pod.UID] if !found { glog.Warningf("Container readiness changed before pod has synced: %q - %q", diff --git a/pkg/kubelet/status/manager_test.go b/pkg/kubelet/status/manager_test.go index 2efd7e9c89b..41a9b8692b7 100644 --- a/pkg/kubelet/status/manager_test.go +++ b/pkg/kubelet/status/manager_test.go @@ -578,9 +578,11 @@ func TestSetContainerReadiness(t *testing.T) { } m := newTestManager(&fake.Clientset{}) + // Add test pod because the container spec has been changed. + m.podManager.AddPod(pod) t.Log("Setting readiness before status should fail.") - m.SetContainerReadiness(pod, cID1, true) + m.SetContainerReadiness(pod.UID, cID1, true) verifyUpdates(t, m, 0) if status, ok := m.GetPodStatus(pod.UID); ok { t.Errorf("Unexpected PodStatus: %+v", status) @@ -593,25 +595,25 @@ func TestSetContainerReadiness(t *testing.T) { verifyReadiness("initial", &status, false, false, false) t.Log("Setting unchanged readiness should do nothing.") - m.SetContainerReadiness(pod, cID1, false) + m.SetContainerReadiness(pod.UID, cID1, false) verifyUpdates(t, m, 0) status = expectPodStatus(t, m, pod) verifyReadiness("unchanged", &status, false, false, false) t.Log("Setting container readiness should generate update but not pod readiness.") - m.SetContainerReadiness(pod, cID1, true) + m.SetContainerReadiness(pod.UID, cID1, true) verifyUpdates(t, m, 1) status = expectPodStatus(t, m, pod) verifyReadiness("c1 ready", &status, true, false, false) t.Log("Setting both containers to ready should update pod readiness.") - m.SetContainerReadiness(pod, cID2, true) + m.SetContainerReadiness(pod.UID, cID2, true) verifyUpdates(t, m, 1) status = expectPodStatus(t, m, pod) verifyReadiness("all ready", &status, true, true, true) t.Log("Setting non-existant container readiness should fail.") - m.SetContainerReadiness(pod, kubecontainer.ContainerID{"test", "foo"}, true) + m.SetContainerReadiness(pod.UID, kubecontainer.ContainerID{"test", "foo"}, true) verifyUpdates(t, m, 0) status = expectPodStatus(t, m, pod) verifyReadiness("ignore non-existant", &status, true, true, true)