Change pod in probe update to pod uid.

This commit is contained in:
Lantao Liu 2016-02-12 05:02:31 +00:00
parent 1bbd3de599
commit 77b6f14f86
9 changed files with 52 additions and 40 deletions

View File

@ -867,7 +867,7 @@ func TestSyncPodsUnhealthy(t *testing.T) {
ID: infraContainerID, ID: infraContainerID,
Name: "/k8s_POD." + strconv.FormatUint(generatePodInfraContainerHash(pod), 16) + "_foo_new_12345678_42", Name: "/k8s_POD." + strconv.FormatUint(generatePodInfraContainerHash(pod), 16) + "_foo_new_12345678_42",
}}) }})
dm.livenessManager.Set(kubecontainer.DockerID(unhealthyContainerID).ContainerID(), proberesults.Failure, nil) dm.livenessManager.Set(kubecontainer.DockerID(unhealthyContainerID).ContainerID(), proberesults.Failure, pod)
runSyncPod(t, dm, fakeDocker, pod, nil, false) runSyncPod(t, dm, fakeDocker, pod, nil, false)

View File

@ -2371,11 +2371,7 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
if update.Result == proberesults.Failure { if update.Result == proberesults.Failure {
// We should not use the pod from livenessManager, because it is never updated after // We should not use the pod from livenessManager, because it is never updated after
// initialization. // initialization.
// TODO(random-liu): This is just a quick fix. We should: pod, ok := kl.podManager.GetPodByUID(update.PodUID)
// * Just pass pod UID in probe updates to make this less confusing.
// * Maybe probe manager should rely on pod manager, or at least the pod in probe manager
// should be updated.
pod, ok := kl.podManager.GetPodByUID(update.Pod.UID)
if !ok { if !ok {
// If the pod no longer exists, ignore the update. // If the pod no longer exists, ignore the update.
glog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update) glog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)

View File

@ -33,7 +33,7 @@ import (
// Manager manages pod probing. It creates a probe "worker" for every container that specifies a // Manager manages pod probing. It creates a probe "worker" for every container that specifies a
// probe (AddPod). The worker periodically probes its assigned container and caches the results. The // probe (AddPod). The worker periodically probes its assigned container and caches the results. The
// manager usse the cached probe results to set the appropriate Ready state in the PodStatus when // manager use the cached probe results to set the appropriate Ready state in the PodStatus when
// requested (UpdatePodStatus). Updating probe parameters is not currently supported. // requested (UpdatePodStatus). Updating probe parameters is not currently supported.
// TODO: Move liveness probing out of the runtime, to here. // TODO: Move liveness probing out of the runtime, to here.
type Manager interface { type Manager interface {
@ -234,5 +234,5 @@ func (m *manager) updateReadiness() {
update := <-m.readinessManager.Updates() update := <-m.readinessManager.Updates()
ready := update.Result == results.Success ready := update.Result == results.Success
m.statusManager.SetContainerReadiness(update.Pod, update.ContainerID, ready) m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
} }

View File

@ -280,7 +280,8 @@ func TestUpdatePodStatus(t *testing.T) {
} }
func TestUpdateReadiness(t *testing.T) { func TestUpdateReadiness(t *testing.T) {
testPod := getTestPod(readiness, api.Probe{}) testPod := getTestPod()
setTestProbe(testPod, readiness, api.Probe{})
m := newTestManager() m := newTestManager()
defer cleanup(t, m) defer cleanup(t, m)
@ -297,9 +298,9 @@ func TestUpdateReadiness(t *testing.T) {
exec.set(probe.Success, nil) exec.set(probe.Success, nil)
m.prober.exec = &exec m.prober.exec = &exec
m.statusManager.SetPodStatus(&testPod, getTestRunningStatus()) m.statusManager.SetPodStatus(testPod, getTestRunningStatus())
m.AddPod(&testPod) m.AddPod(testPod)
probePaths := []probeKey{{testPodUID, testContainerName, readiness}} probePaths := []probeKey{{testPodUID, testContainerName, readiness}}
if err := expectProbes(m, probePaths); err != nil { if err := expectProbes(m, probePaths); err != nil {
t.Error(err) t.Error(err)

View File

@ -21,6 +21,7 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/types"
) )
// Manager provides a probe results cache and channel of updates. // Manager provides a probe results cache and channel of updates.
@ -61,7 +62,7 @@ func (r Result) String() string {
type Update struct { type Update struct {
ContainerID kubecontainer.ContainerID ContainerID kubecontainer.ContainerID
Result Result Result Result
Pod *api.Pod PodUID types.UID
} }
// Manager implementation. // Manager implementation.
@ -93,7 +94,7 @@ func (m *manager) Get(id kubecontainer.ContainerID) (Result, bool) {
func (m *manager) Set(id kubecontainer.ContainerID, result Result, pod *api.Pod) { func (m *manager) Set(id kubecontainer.ContainerID, result Result, pod *api.Pod) {
if m.setInternal(id, result) { if m.setInternal(id, result) {
m.updates <- Update{id, result, pod} m.updates <- Update{id, result, pod.UID}
} }
} }

View File

@ -35,7 +35,7 @@ func TestCacheOperations(t *testing.T) {
_, found := m.Get(unsetID) _, found := m.Get(unsetID)
assert.False(t, found, "unset result found") assert.False(t, found, "unset result found")
m.Set(setID, Success, nil) m.Set(setID, Success, &api.Pod{})
result, found := m.Get(setID) result, found := m.Get(setID)
assert.True(t, result == Success, "set result") assert.True(t, result == Success, "set result")
assert.True(t, found, "set result found") assert.True(t, found, "set result found")
@ -77,10 +77,10 @@ func TestUpdates(t *testing.T) {
// New result should always push an update. // New result should always push an update.
m.Set(fooID, Success, pod) m.Set(fooID, Success, pod)
expectUpdate(Update{fooID, Success, pod}, "new success") expectUpdate(Update{fooID, Success, pod.UID}, "new success")
m.Set(barID, Failure, pod) m.Set(barID, Failure, pod)
expectUpdate(Update{barID, Failure, pod}, "new failure") expectUpdate(Update{barID, Failure, pod.UID}, "new failure")
// Unchanged results should not send an update. // Unchanged results should not send an update.
m.Set(fooID, Success, pod) m.Set(fooID, Success, pod)
@ -91,8 +91,8 @@ func TestUpdates(t *testing.T) {
// Changed results should send an update. // Changed results should send an update.
m.Set(fooID, Failure, pod) m.Set(fooID, Failure, pod)
expectUpdate(Update{fooID, Failure, pod}, "changed foo") expectUpdate(Update{fooID, Failure, pod.UID}, "changed foo")
m.Set(barID, Success, pod) m.Set(barID, Success, pod)
expectUpdate(Update{barID, Success, pod}, "changed bar") expectUpdate(Update{barID, Success, pod.UID}, "changed bar")
} }

View File

@ -52,11 +52,22 @@ func getTestRunningStatus() api.PodStatus {
return podStatus return podStatus
} }
func getTestPod(probeType probeType, probeSpec api.Probe) api.Pod { func getTestPod() *api.Pod {
container := api.Container{ container := api.Container{
Name: testContainerName, Name: testContainerName,
} }
pod := api.Pod{
Spec: api.PodSpec{
Containers: []api.Container{container},
RestartPolicy: api.RestartPolicyNever,
},
}
pod.Name = "testPod"
pod.UID = testPodUID
return &pod
}
func setTestProbe(pod *api.Pod, probeType probeType, probeSpec api.Probe) {
// All tests rely on the fake exec prober. // All tests rely on the fake exec prober.
probeSpec.Handler = api.Handler{ probeSpec.Handler = api.Handler{
Exec: &api.ExecAction{}, Exec: &api.ExecAction{},
@ -78,26 +89,20 @@ func getTestPod(probeType probeType, probeSpec api.Probe) api.Pod {
switch probeType { switch probeType {
case readiness: case readiness:
container.ReadinessProbe = &probeSpec pod.Spec.Containers[0].ReadinessProbe = &probeSpec
case liveness: case liveness:
container.LivenessProbe = &probeSpec pod.Spec.Containers[0].LivenessProbe = &probeSpec
} }
pod := api.Pod{
Spec: api.PodSpec{
Containers: []api.Container{container},
RestartPolicy: api.RestartPolicyNever,
},
}
pod.Name = "testPod"
pod.UID = testPodUID
return pod
} }
func newTestManager() *manager { func newTestManager() *manager {
refManager := kubecontainer.NewRefManager() refManager := kubecontainer.NewRefManager()
refManager.SetRef(testContainerID, &api.ObjectReference{}) // Suppress prober warnings. refManager.SetRef(testContainerID, &api.ObjectReference{}) // Suppress prober warnings.
podManager := kubepod.NewBasicPodManager(nil)
// Add test pod to pod manager, so that status manager can get the pod from pod manager if needed.
podManager.AddPod(getTestPod())
m := NewManager( m := NewManager(
status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil)), status.NewManager(&fake.Clientset{}, podManager),
results.NewManager(), results.NewManager(),
nil, // runner nil, // runner
refManager, refManager,
@ -109,8 +114,9 @@ func newTestManager() *manager {
} }
func newTestWorker(m *manager, probeType probeType, probeSpec api.Probe) *worker { func newTestWorker(m *manager, probeType probeType, probeSpec api.Probe) *worker {
pod := getTestPod(probeType, probeSpec) pod := getTestPod()
return newWorker(m, probeType, &pod, pod.Spec.Containers[0]) setTestProbe(pod, probeType, probeSpec)
return newWorker(m, probeType, pod, pod.Spec.Containers[0])
} }
type fakeExecProber struct { type fakeExecProber struct {

View File

@ -81,7 +81,7 @@ type Manager interface {
// SetContainerReadiness updates the cached container status with the given readiness, and // SetContainerReadiness updates the cached container status with the given readiness, and
// triggers a status update. // triggers a status update.
SetContainerReadiness(pod *api.Pod, containerID kubecontainer.ContainerID, ready bool) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)
// TerminatePods resets the container status for the provided pods to terminated and triggers // TerminatePods resets the container status for the provided pods to terminated and triggers
// a status update. This function may not enqueue all the provided pods, in which case it will // a status update. This function may not enqueue all the provided pods, in which case it will
@ -150,10 +150,16 @@ func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
m.updateStatusInternal(pod, status) m.updateStatusInternal(pod, status)
} }
func (m *manager) SetContainerReadiness(pod *api.Pod, containerID kubecontainer.ContainerID, ready bool) { func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {
m.podStatusesLock.Lock() m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock() defer m.podStatusesLock.Unlock()
pod, ok := m.podManager.GetPodByUID(podUID)
if !ok {
glog.V(4).Infof("Pod %q has been deleted, no need to update readiness", string(podUID))
return
}
oldStatus, found := m.podStatuses[pod.UID] oldStatus, found := m.podStatuses[pod.UID]
if !found { if !found {
glog.Warningf("Container readiness changed before pod has synced: %q - %q", glog.Warningf("Container readiness changed before pod has synced: %q - %q",

View File

@ -578,9 +578,11 @@ func TestSetContainerReadiness(t *testing.T) {
} }
m := newTestManager(&fake.Clientset{}) m := newTestManager(&fake.Clientset{})
// Add test pod because the container spec has been changed.
m.podManager.AddPod(pod)
t.Log("Setting readiness before status should fail.") t.Log("Setting readiness before status should fail.")
m.SetContainerReadiness(pod, cID1, true) m.SetContainerReadiness(pod.UID, cID1, true)
verifyUpdates(t, m, 0) verifyUpdates(t, m, 0)
if status, ok := m.GetPodStatus(pod.UID); ok { if status, ok := m.GetPodStatus(pod.UID); ok {
t.Errorf("Unexpected PodStatus: %+v", status) t.Errorf("Unexpected PodStatus: %+v", status)
@ -593,25 +595,25 @@ func TestSetContainerReadiness(t *testing.T) {
verifyReadiness("initial", &status, false, false, false) verifyReadiness("initial", &status, false, false, false)
t.Log("Setting unchanged readiness should do nothing.") t.Log("Setting unchanged readiness should do nothing.")
m.SetContainerReadiness(pod, cID1, false) m.SetContainerReadiness(pod.UID, cID1, false)
verifyUpdates(t, m, 0) verifyUpdates(t, m, 0)
status = expectPodStatus(t, m, pod) status = expectPodStatus(t, m, pod)
verifyReadiness("unchanged", &status, false, false, false) verifyReadiness("unchanged", &status, false, false, false)
t.Log("Setting container readiness should generate update but not pod readiness.") t.Log("Setting container readiness should generate update but not pod readiness.")
m.SetContainerReadiness(pod, cID1, true) m.SetContainerReadiness(pod.UID, cID1, true)
verifyUpdates(t, m, 1) verifyUpdates(t, m, 1)
status = expectPodStatus(t, m, pod) status = expectPodStatus(t, m, pod)
verifyReadiness("c1 ready", &status, true, false, false) verifyReadiness("c1 ready", &status, true, false, false)
t.Log("Setting both containers to ready should update pod readiness.") t.Log("Setting both containers to ready should update pod readiness.")
m.SetContainerReadiness(pod, cID2, true) m.SetContainerReadiness(pod.UID, cID2, true)
verifyUpdates(t, m, 1) verifyUpdates(t, m, 1)
status = expectPodStatus(t, m, pod) status = expectPodStatus(t, m, pod)
verifyReadiness("all ready", &status, true, true, true) verifyReadiness("all ready", &status, true, true, true)
t.Log("Setting non-existant container readiness should fail.") t.Log("Setting non-existant container readiness should fail.")
m.SetContainerReadiness(pod, kubecontainer.ContainerID{"test", "foo"}, true) m.SetContainerReadiness(pod.UID, kubecontainer.ContainerID{"test", "foo"}, true)
verifyUpdates(t, m, 0) verifyUpdates(t, m, 0)
status = expectPodStatus(t, m, pod) status = expectPodStatus(t, m, pod)
verifyReadiness("ignore non-existant", &status, true, true, true) verifyReadiness("ignore non-existant", &status, true, true, true)