diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 4cd74d0c475..2c7a72d9988 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -322,8 +322,7 @@ func NewMainKubelet(
 	procFs := procfs.NewProcFs()
 	imageBackOff := util.NewBackOff(resyncInterval, MaxContainerBackOff)
 
-	readinessManager := proberesults.NewManager()
-	klet.livenessManager = proberesults.NewManagerWithUpdates()
+	klet.livenessManager = proberesults.NewManager()
 
 	// Initialize the runtime.
 	switch containerRuntime {
@@ -419,7 +418,6 @@ func NewMainKubelet(
 
 	klet.probeManager = prober.NewManager(
 		klet.statusManager,
-		readinessManager,
 		klet.livenessManager,
 		klet.runner,
 		containerRefManager,
diff --git a/pkg/kubelet/prober/manager.go b/pkg/kubelet/prober/manager.go
index bfe90874e74..98cd088ee23 100644
--- a/pkg/kubelet/prober/manager.go
+++ b/pkg/kubelet/prober/manager.go
@@ -27,6 +27,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/status"
 	kubeutil "k8s.io/kubernetes/pkg/kubelet/util"
 	"k8s.io/kubernetes/pkg/types"
+	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/pkg/util/sets"
 )
 
@@ -74,19 +75,24 @@ type manager struct {
 
 func NewManager(
 	statusManager status.Manager,
-	readinessManager results.Manager,
 	livenessManager results.Manager,
 	runner kubecontainer.ContainerCommandRunner,
 	refManager *kubecontainer.RefManager,
 	recorder record.EventRecorder) Manager {
 	prober := newProber(runner, refManager, recorder)
-	return &manager{
+	readinessManager := results.NewManager()
+	m := &manager{
 		statusManager:    statusManager,
 		prober:           prober,
 		readinessManager: readinessManager,
 		livenessManager:  livenessManager,
 		workers:          make(map[probeKey]*worker),
 	}
+
+	// Start syncing readiness.
+	go util.Forever(m.updateReadiness, 0)
+
+	return m
 }
 
 // Key uniquely identifying container probes
@@ -211,3 +217,10 @@ func (m *manager) removeWorker(podUID types.UID, containerName string, probeType
 	defer m.workerLock.Unlock()
 	delete(m.workers, probeKey{podUID, containerName, probeType})
 }
+
+func (m *manager) updateReadiness() {
+	update := <-m.readinessManager.Updates()
+
+	ready := update.Result == results.Success
+	m.statusManager.SetContainerReadiness(update.Pod, update.ContainerID, ready)
+}
diff --git a/pkg/kubelet/prober/manager_test.go b/pkg/kubelet/prober/manager_test.go
index ce0d3cee573..de9ccd1cc78 100644
--- a/pkg/kubelet/prober/manager_test.go
+++ b/pkg/kubelet/prober/manager_test.go
@@ -23,17 +23,17 @@ import (
 
 	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/api"
-	"k8s.io/kubernetes/pkg/client/record"
-	"k8s.io/kubernetes/pkg/client/unversioned/testclient"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
-	kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
 	"k8s.io/kubernetes/pkg/kubelet/prober/results"
-	"k8s.io/kubernetes/pkg/kubelet/status"
 	"k8s.io/kubernetes/pkg/probe"
 	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/pkg/util/wait"
 )
 
+func init() {
+	util.ReallyCrash = true
+}
+
 var defaultProbe *api.Probe = &api.Probe{
 	Handler: api.Handler{
 		Exec: &api.ExecAction{},
@@ -172,7 +172,6 @@ func TestCleanupPods(t *testing.T) {
 }
 
 func TestUpdatePodStatus(t *testing.T) {
-	const podUID = "pod_uid"
 	unprobed := api.ContainerStatus{
 		Name:        "unprobed_container",
 		ContainerID: "test://unprobed_container_id",
@@ -218,27 +217,27 @@ func TestUpdatePodStatus(t *testing.T) {
 	m := newTestManager()
 	// Setup probe "workers" and cached results.
 	m.workers = map[probeKey]*worker{
-		probeKey{podUID, unprobed.Name, liveness}:       {},
-		probeKey{podUID, probedReady.Name, readiness}:   {},
-		probeKey{podUID, probedPending.Name, readiness}: {},
-		probeKey{podUID, probedUnready.Name, readiness}: {},
-		probeKey{podUID, terminated.Name, readiness}:    {},
+		probeKey{testPodUID, unprobed.Name, liveness}:       {},
+		probeKey{testPodUID, probedReady.Name, readiness}:   {},
+		probeKey{testPodUID, probedPending.Name, readiness}: {},
+		probeKey{testPodUID, probedUnready.Name, readiness}: {},
+		probeKey{testPodUID, terminated.Name, readiness}:    {},
 	}
-	m.readinessManager.Set(kubecontainer.ParseContainerID(probedReady.ContainerID), results.Success, nil)
-	m.readinessManager.Set(kubecontainer.ParseContainerID(probedUnready.ContainerID), results.Failure, nil)
-	m.readinessManager.Set(kubecontainer.ParseContainerID(terminated.ContainerID), results.Success, nil)
+	m.readinessManager.Set(kubecontainer.ParseContainerID(probedReady.ContainerID), results.Success, &api.Pod{})
+	m.readinessManager.Set(kubecontainer.ParseContainerID(probedUnready.ContainerID), results.Failure, &api.Pod{})
+	m.readinessManager.Set(kubecontainer.ParseContainerID(terminated.ContainerID), results.Success, &api.Pod{})
 
-	m.UpdatePodStatus(podUID, &podStatus)
+	m.UpdatePodStatus(testPodUID, &podStatus)
 
 	expectedReadiness := map[probeKey]bool{
-		probeKey{podUID, unprobed.Name, readiness}:      true,
-		probeKey{podUID, probedReady.Name, readiness}:   true,
-		probeKey{podUID, probedPending.Name, readiness}: false,
-		probeKey{podUID, probedUnready.Name, readiness}: false,
-		probeKey{podUID, terminated.Name, readiness}:    false,
+		probeKey{testPodUID, unprobed.Name, readiness}:      true,
+		probeKey{testPodUID, probedReady.Name, readiness}:   true,
+		probeKey{testPodUID, probedPending.Name, readiness}: false,
+		probeKey{testPodUID, probedUnready.Name, readiness}: false,
+		probeKey{testPodUID, terminated.Name, readiness}:    false,
 	}
 	for _, c := range podStatus.ContainerStatuses {
-		expected, ok := expectedReadiness[probeKey{podUID, c.Name, readiness}]
+		expected, ok := expectedReadiness[probeKey{testPodUID, c.Name, readiness}]
 		if !ok {
 			t.Fatalf("Missing expectation for test case: %v", c.Name)
 		}
@@ -249,6 +248,31 @@ func TestUpdatePodStatus(t *testing.T) {
 	}
 }
 
+func TestUpdateReadiness(t *testing.T) {
+	testPod := getTestPod(readiness, api.Probe{})
+	m := newTestManager()
+	m.statusManager.SetPodStatus(&testPod, getTestRunningStatus())
+
+	m.AddPod(&testPod)
+	probePaths := []probeKey{{testPodUID, testContainerName, readiness}}
+	if err := expectProbes(m, probePaths); err != nil {
+		t.Error(err)
+	}
+
+	// Wait for ready status.
+	if err := waitForReadyStatus(m, true); err != nil {
+		t.Error(err)
+	}
+
+	// Prober fails.
+	m.prober.exec = fakeExecProber{probe.Failure, nil}
+
+	// Wait for failed status.
+	if err := waitForReadyStatus(m, false); err != nil {
+		t.Error(err)
+	}
+}
+
 func expectProbes(m *manager, expectedProbes []probeKey) error {
 	m.workerLock.RLock()
 	defer m.workerLock.RUnlock()
@@ -275,24 +299,10 @@ outer:
 	return fmt.Errorf("Unexpected probes: %v; Missing probes: %v;", unexpected, missing)
 }
 
-func newTestManager() *manager {
-	m := NewManager(
-		status.NewManager(&testclient.Fake{}, kubepod.NewBasicPodManager(nil)),
-		results.NewManager(),
-		results.NewManager(),
-		nil, // runner
-		kubecontainer.NewRefManager(),
-		&record.FakeRecorder{},
-	).(*manager)
-	// Don't actually execute probes.
-	m.prober.exec = fakeExecProber{probe.Success, nil}
-	return m
-}
+const interval = 100 * time.Millisecond
 
 // Wait for the given workers to exit & clean up.
 func waitForWorkerExit(m *manager, workerPaths []probeKey) error {
-	const interval = 100 * time.Millisecond
-
 	for _, w := range workerPaths {
 		condition := func() (bool, error) {
 			_, exists := m.getWorker(w.podUID, w.containerName, w.probeType)
@@ -309,3 +319,27 @@ func waitForWorkerExit(m *manager, workerPaths []probeKey) error {
 
 	return nil
 }
+
+// Wait for the pod status to reflect the given ready state.
+func waitForReadyStatus(m *manager, ready bool) error {
+	condition := func() (bool, error) {
+		status, ok := m.statusManager.GetPodStatus(testPodUID)
+		if !ok {
+			return false, fmt.Errorf("status not found: %q", testPodUID)
+		}
+		if len(status.ContainerStatuses) != 1 {
+			return false, fmt.Errorf("expected single container, found %d", len(status.ContainerStatuses))
+		}
+		if status.ContainerStatuses[0].ContainerID != testContainerID.String() {
+			return false, fmt.Errorf("expected container %q, found %q",
+				testContainerID, status.ContainerStatuses[0].ContainerID)
+		}
+		return status.ContainerStatuses[0].Ready == ready, nil
+	}
+	glog.Infof("Polling for ready state %v", ready)
+	if err := wait.Poll(interval, util.ForeverTestTimeout, condition); err != nil {
+		return err
+	}
+
+	return nil
+}
diff --git a/pkg/kubelet/prober/prober_test.go b/pkg/kubelet/prober/prober_test.go
index 4d16d6bfef3..0c2d99aace9 100644
--- a/pkg/kubelet/prober/prober_test.go
+++ b/pkg/kubelet/prober/prober_test.go
@@ -27,7 +27,6 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/prober/results"
 	"k8s.io/kubernetes/pkg/probe"
 	"k8s.io/kubernetes/pkg/util"
-	"k8s.io/kubernetes/pkg/util/exec"
 )
 
 func TestFormatURL(t *testing.T) {
@@ -246,12 +245,3 @@ func TestProbe(t *testing.T) {
 		}
 	}
 }
-
-type fakeExecProber struct {
-	result probe.Result
-	err    error
-}
-
-func (p fakeExecProber) Probe(_ exec.Cmd) (probe.Result, string, error) {
-	return p.result, "", p.err
-}
diff --git a/pkg/kubelet/prober/results/results_manager.go b/pkg/kubelet/prober/results/results_manager.go
index eb55f71e85d..3e4c3683512 100644
--- a/pkg/kubelet/prober/results/results_manager.go
+++ b/pkg/kubelet/prober/results/results_manager.go
@@ -70,7 +70,7 @@ type manager struct {
 	sync.RWMutex
 	// map of container ID -> probe Result
 	cache map[kubecontainer.ContainerID]Result
-	// channel of updates (may be nil)
+	// channel of updates
 	updates chan Update
 }
 
@@ -78,15 +78,10 @@ var _ Manager = &manager{}
 
 // NewManager creates ane returns an empty results manager.
 func NewManager() Manager {
-	m := &manager{cache: make(map[kubecontainer.ContainerID]Result)}
-	return m
-}
-
-// NewManager creates ane returns an empty results manager.
-func NewManagerWithUpdates() Manager {
-	m := NewManager().(*manager)
-	m.updates = make(chan Update, 20)
-	return m
+	return &manager{
+		cache:   make(map[kubecontainer.ContainerID]Result),
+		updates: make(chan Update, 20),
+	}
 }
 
 func (m *manager) Get(id kubecontainer.ContainerID) (Result, bool) {
@@ -98,7 +93,7 @@ func (m *manager) Get(id kubecontainer.ContainerID) (Result, bool) {
 
 func (m *manager) Set(id kubecontainer.ContainerID, result Result, pod *api.Pod) {
 	if m.setInternal(id, result) {
-		m.pushUpdate(Update{id, result, pod})
+		m.updates <- Update{id, result, pod}
 	}
 }
 
@@ -123,10 +118,3 @@ func (m *manager) Remove(id kubecontainer.ContainerID) {
 func (m *manager) Updates() <-chan Update {
 	return m.updates
 }
-
-// pushUpdates sends an update on the updates channel if it is initialized.
-func (m *manager) pushUpdate(update Update) {
-	if m.updates != nil {
-		m.updates <- update
-	}
-}
diff --git a/pkg/kubelet/prober/results/results_manager_test.go b/pkg/kubelet/prober/results/results_manager_test.go
index 24b131958b7..64e815b8fb4 100644
--- a/pkg/kubelet/prober/results/results_manager_test.go
+++ b/pkg/kubelet/prober/results/results_manager_test.go
@@ -46,7 +46,7 @@ func TestCacheOperations(t *testing.T) {
 }
 
 func TestUpdates(t *testing.T) {
-	m := NewManagerWithUpdates()
+	m := NewManager()
 
 	pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "test-pod"}}
 	fooID := kubecontainer.ContainerID{"test", "foo"}
diff --git a/pkg/kubelet/prober/testing.go b/pkg/kubelet/prober/testing.go
new file mode 100644
index 00000000000..fbdc77ffbda
--- /dev/null
+++ b/pkg/kubelet/prober/testing.go
@@ -0,0 +1,122 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package prober
+
+import (
+	"reflect"
+
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/api/unversioned"
+	"k8s.io/kubernetes/pkg/client/record"
+	"k8s.io/kubernetes/pkg/client/unversioned/testclient"
+	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
+	kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
+	"k8s.io/kubernetes/pkg/kubelet/prober/results"
+	"k8s.io/kubernetes/pkg/kubelet/status"
+	"k8s.io/kubernetes/pkg/probe"
+	"k8s.io/kubernetes/pkg/util/exec"
+)
+
+const (
+	testContainerName = "cOnTaInEr_NaMe"
+	testPodUID        = "pOd_UiD"
+)
+
+var testContainerID = kubecontainer.ContainerID{"test", "cOnTaInEr_Id"}
+
+func getTestRunningStatus() api.PodStatus {
+	containerStatus := api.ContainerStatus{
+		Name:        testContainerName,
+		ContainerID: testContainerID.String(),
+	}
+	containerStatus.State.Running = &api.ContainerStateRunning{unversioned.Now()}
+	podStatus := api.PodStatus{
+		Phase:             api.PodRunning,
+		ContainerStatuses: []api.ContainerStatus{containerStatus},
+	}
+	return podStatus
+}
+
+func getTestPod(probeType probeType, probeSpec api.Probe) api.Pod {
+	container := api.Container{
+		Name: testContainerName,
+	}
+
+	// All tests rely on the fake exec prober.
+	probeSpec.Handler = api.Handler{
+		Exec: &api.ExecAction{},
+	}
+
+	// Apply test defaults, overridden for test speed.
+	defaults := map[string]int64{
+		"TimeoutSeconds":   1,
+		"PeriodSeconds":    1,
+		"SuccessThreshold": 1,
+		"FailureThreshold": 1,
+	}
+	for field, value := range defaults {
+		f := reflect.ValueOf(&probeSpec).Elem().FieldByName(field)
+		if f.Int() == 0 {
+			f.SetInt(value)
+		}
+	}
+
+	switch probeType {
+	case readiness:
+		container.ReadinessProbe = &probeSpec
+	case liveness:
+		container.LivenessProbe = &probeSpec
+	}
+	pod := api.Pod{
+		Spec: api.PodSpec{
+			Containers:    []api.Container{container},
+			RestartPolicy: api.RestartPolicyNever,
+		},
+	}
+	pod.Name = "testPod"
+	pod.UID = testPodUID
+	return pod
+}
+
+func newTestManager() *manager {
+	refManager := kubecontainer.NewRefManager()
+	refManager.SetRef(testContainerID, &api.ObjectReference{}) // Suppress prober warnings.
+	m := NewManager(
+		status.NewManager(&testclient.Fake{}, kubepod.NewBasicPodManager(nil)),
+		results.NewManager(),
+		nil, // runner
+		refManager,
+		&record.FakeRecorder{},
+	).(*manager)
+	// Don't actually execute probes.
+	m.prober.exec = fakeExecProber{probe.Success, nil}
+	return m
+}
+
+func newTestWorker(m *manager, probeType probeType, probeSpec api.Probe) *worker {
+	pod := getTestPod(probeType, probeSpec)
+	return newWorker(m, probeType, &pod, pod.Spec.Containers[0])
+}
+
+type fakeExecProber struct {
+	result probe.Result
+	err    error
+}
+
+func (p fakeExecProber) Probe(_ exec.Cmd) (probe.Result, string, error) {
+	return p.result, "", p.err
+}
diff --git a/pkg/kubelet/prober/worker_test.go b/pkg/kubelet/prober/worker_test.go
index 88556fe068d..cd6b4236239 100644
--- a/pkg/kubelet/prober/worker_test.go
+++ b/pkg/kubelet/prober/worker_test.go
@@ -18,7 +18,6 @@ package prober
 
 import (
 	"fmt"
-	"reflect"
 	"testing"
 	"time"
 
@@ -36,28 +35,25 @@ import (
 	"k8s.io/kubernetes/pkg/util/wait"
 )
 
-const (
-	containerName = "cOnTaInEr_NaMe"
-	podUID        = "pOd_UiD"
-)
-
-var containerID = kubecontainer.ContainerID{"test", "cOnTaInEr_Id"}
+func init() {
+	util.ReallyCrash = true
+}
 
 func TestDoProbe(t *testing.T) {
 	m := newTestManager()
 
 	// Test statuses.
-	runningStatus := getRunningStatus()
-	pendingStatus := getRunningStatus()
+	runningStatus := getTestRunningStatus()
+	pendingStatus := getTestRunningStatus()
 	pendingStatus.ContainerStatuses[0].State.Running = nil
-	terminatedStatus := getRunningStatus()
+	terminatedStatus := getTestRunningStatus()
 	terminatedStatus.ContainerStatuses[0].State.Running = nil
 	terminatedStatus.ContainerStatuses[0].State.Terminated = &api.ContainerStateTerminated{
 		StartedAt: unversioned.Now(),
 	}
-	otherStatus := getRunningStatus()
+	otherStatus := getTestRunningStatus()
 	otherStatus.ContainerStatuses[0].Name = "otherContainer"
-	failedStatus := getRunningStatus()
+	failedStatus := getTestRunningStatus()
 	failedStatus.Phase = api.PodFailed
 
 	tests := []struct {
@@ -112,7 +108,7 @@ func TestDoProbe(t *testing.T) {
 			if c := w.doProbe(); c != test.expectContinue {
 				t.Errorf("[%s-%d] Expected continue to be %v but got %v", probeType, i, test.expectContinue, c)
 			}
-			result, ok := resultsManager(m, probeType).Get(containerID)
+			result, ok := resultsManager(m, probeType).Get(testContainerID)
 			if ok != test.expectSet {
 				t.Errorf("[%s-%d] Expected to have result: %v but got %v", probeType, i, test.expectSet, ok)
 			}
@@ -122,7 +118,7 @@ func TestDoProbe(t *testing.T) {
 
 			// Clean up.
 			m.statusManager = status.NewManager(&testclient.Fake{}, kubepod.NewBasicPodManager(nil))
-			resultsManager(m, probeType).Remove(containerID)
+			resultsManager(m, probeType).Remove(testContainerID)
 		}
 	}
 }
@@ -134,13 +130,13 @@ func TestInitialDelay(t *testing.T) {
 		w := newTestWorker(m, probeType, api.Probe{
 			InitialDelaySeconds: 10,
 		})
-		m.statusManager.SetPodStatus(w.pod, getRunningStatus())
+		m.statusManager.SetPodStatus(w.pod, getTestRunningStatus())
 
 		expectContinue(t, w, w.doProbe(), "during initial delay")
 		expectResult(t, w, results.Result(probeType == liveness), "during initial delay")
 
 		// 100 seconds later...
-		laterStatus := getRunningStatus()
+		laterStatus := getTestRunningStatus()
 		laterStatus.ContainerStatuses[0].State.Running.StartedAt.Time = time.Now().Add(-100 * time.Second)
 		m.statusManager.SetPodStatus(w.pod, laterStatus)
@@ -153,8 +149,8 @@ func TestInitialDelay(t *testing.T) {
 
 func TestFailureThreshold(t *testing.T) {
 	m := newTestManager()
-	w := newTestWorker(m, readiness, api.Probe{})
-	m.statusManager.SetPodStatus(w.pod, getRunningStatus())
+	w := newTestWorker(m, readiness, api.Probe{SuccessThreshold: 1, FailureThreshold: 3})
+	m.statusManager.SetPodStatus(w.pod, getTestRunningStatus())
 
 	for i := 0; i < 2; i++ {
 		// First probe should succeed.
@@ -171,7 +167,7 @@ func TestFailureThreshold(t *testing.T) {
 
 		// Next 2 probes should still be "success".
 		for j := 0; j < 2; j++ {
-			msg := fmt.Sprintf("%d failure (%d)", j+1, i)
+			msg := fmt.Sprintf("%d failing (%d)", j+1, i)
 			expectContinue(t, w, w.doProbe(), msg)
 			expectResult(t, w, results.Success, msg)
 		}
@@ -188,10 +184,10 @@ func TestFailureThreshold(t *testing.T) {
 func TestSuccessThreshold(t *testing.T) {
 	m := newTestManager()
 	w := newTestWorker(m, readiness, api.Probe{SuccessThreshold: 3, FailureThreshold: 1})
-	m.statusManager.SetPodStatus(w.pod, getRunningStatus())
+	m.statusManager.SetPodStatus(w.pod, getTestRunningStatus())
 
 	// Start out failure.
-	w.resultsManager.Set(containerID, results.Failure, nil)
+	w.resultsManager.Set(testContainerID, results.Failure, &api.Pod{})
 
 	for i := 0; i < 2; i++ {
 		// Probe defaults to Failure.
@@ -223,15 +219,15 @@ func TestCleanUp(t *testing.T) {
 	m := newTestManager()
 
 	for _, probeType := range [...]probeType{liveness, readiness} {
-		key := probeKey{podUID, containerName, probeType}
+		key := probeKey{testPodUID, testContainerName, probeType}
 		w := newTestWorker(m, probeType, api.Probe{})
-		m.statusManager.SetPodStatus(w.pod, getRunningStatus())
+		m.statusManager.SetPodStatus(w.pod, getTestRunningStatus())
 		go w.run()
 		m.workers[key] = w
 
 		// Wait for worker to run.
 		condition := func() (bool, error) {
-			ready, _ := resultsManager(m, probeType).Get(containerID)
+			ready, _ := resultsManager(m, probeType).Get(testContainerID)
 			return ready == results.Success, nil
 		}
 		if ready, _ := condition(); !ready {
@@ -245,7 +241,7 @@ func TestCleanUp(t *testing.T) {
 			t.Fatalf("[%s] error waiting for worker exit: %v", probeType, err)
 		}
 
-		if _, ok := resultsManager(m, probeType).Get(containerID); ok {
+		if _, ok := resultsManager(m, probeType).Get(testContainerID); ok {
 			t.Errorf("[%s] Expected result to be cleared.", probeType)
 		}
 		if _, ok := m.workers[key]; ok {
@@ -255,9 +251,11 @@ func TestCleanUp(t *testing.T) {
 }
 
 func TestHandleCrash(t *testing.T) {
+	util.ReallyCrash = false // Test that we *don't* really crash.
+
 	m := newTestManager()
 	w := newTestWorker(m, readiness, api.Probe{})
-	m.statusManager.SetPodStatus(w.pod, getRunningStatus())
+	m.statusManager.SetPodStatus(w.pod, getTestRunningStatus())
 
 	expectContinue(t, w, w.doProbe(), "Initial successful probe.")
 	expectResult(t, w, results.Success, "Initial successful probe.")
@@ -274,64 +272,8 @@ func TestHandleCrash(t *testing.T) {
 	expectResult(t, w, results.Success, "Crashing probe unchanged.")
 }
 
-func newTestWorker(m *manager, probeType probeType, probeSpec api.Probe) *worker {
-	// All tests rely on the fake exec prober.
-	probeSpec.Handler = api.Handler{
-		Exec: &api.ExecAction{},
-	}
-	// Apply default values.
-	defaults := map[string]int64{
-		"TimeoutSeconds":   1,
-		"PeriodSeconds":    10,
-		"SuccessThreshold": 1,
-		"FailureThreshold": 3,
-	}
-	for field, value := range defaults {
-		f := reflect.ValueOf(&probeSpec).Elem().FieldByName(field)
-		if f.Int() == 0 {
-			f.SetInt(value)
-		}
-	}
-
-	pod := getTestPod(probeType, probeSpec)
-	return newWorker(m, probeType, &pod, pod.Spec.Containers[0])
-}
-
-func getRunningStatus() api.PodStatus {
-	containerStatus := api.ContainerStatus{
-		Name:        containerName,
-		ContainerID: containerID.String(),
-	}
-	containerStatus.State.Running = &api.ContainerStateRunning{unversioned.Now()}
-	podStatus := api.PodStatus{
-		Phase:             api.PodRunning,
-		ContainerStatuses: []api.ContainerStatus{containerStatus},
-	}
-	return podStatus
-}
-
-func getTestPod(probeType probeType, probeSpec api.Probe) api.Pod {
-	container := api.Container{
-		Name: containerName,
-	}
-	switch probeType {
-	case readiness:
-		container.ReadinessProbe = &probeSpec
-	case liveness:
-		container.LivenessProbe = &probeSpec
-	}
-	pod := api.Pod{
-		Spec: api.PodSpec{
-			Containers:    []api.Container{container},
-			RestartPolicy: api.RestartPolicyNever,
-		},
-	}
-	pod.UID = podUID
-	return pod
-}
-
 func expectResult(t *testing.T, w *worker, expectedResult results.Result, msg string) {
-	result, ok := resultsManager(w.probeManager, w.probeType).Get(containerID)
+	result, ok := resultsManager(w.probeManager, w.probeType).Get(testContainerID)
 	if !ok {
 		t.Errorf("[%s - %s] Expected result to be set, but was not set", w.probeType, msg)
 	} else if result != expectedResult {
diff --git a/pkg/kubelet/status/manager.go b/pkg/kubelet/status/manager.go
index 210a3113ff3..182508d5b16 100644
--- a/pkg/kubelet/status/manager.go
+++ b/pkg/kubelet/status/manager.go
@@ -27,6 +27,7 @@ import (
 	"k8s.io/kubernetes/pkg/api/errors"
 	"k8s.io/kubernetes/pkg/api/unversioned"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
+	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
 	kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
@@ -77,6 +78,10 @@ type Manager interface {
 	// SetPodStatus caches updates the cached status for the given pod, and triggers a status update.
 	SetPodStatus(pod *api.Pod, status api.PodStatus)
 
+	// SetContainerReadiness updates the cached container status with the given readiness, and
+	// triggers a status update.
+	SetContainerReadiness(pod *api.Pod, containerID kubecontainer.ContainerID, ready bool)
+
 	// TerminatePods resets the container status for the provided pods to terminated and triggers
 	// a status update. This function may not enqueue all the provided pods, in which case it will
 	// return false
@@ -171,19 +176,50 @@ func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
 		status.StartTime = &now
 	}
 
-	newStatus := m.updateStatusInternal(pod, status)
-	if newStatus != nil {
-		select {
-		case m.podStatusChannel <- podStatusSyncRequest{pod.UID, *newStatus}:
-		default:
-			// Let the periodic syncBatch handle the update if the channel is full.
-			// We can't block, since we hold the mutex lock.
-		}
-	}
+	m.updateStatusInternal(pod, status)
+}
+
+func (m *manager) SetContainerReadiness(pod *api.Pod, containerID kubecontainer.ContainerID, ready bool) {
+	m.podStatusesLock.Lock()
+	defer m.podStatusesLock.Unlock()
+
+	oldStatus, found := m.podStatuses[pod.UID]
+	if !found {
+		glog.Warningf("Container readiness changed before pod has synced: %q - %q",
+			kubeletutil.FormatPodName(pod), containerID.String())
+		return
+	}
+	status := oldStatus.status
+
+	// Find the container to update.
+	containerIndex := -1
+	for i, c := range status.ContainerStatuses {
+		if c.ContainerID == containerID.String() {
+			containerIndex = i
+			break
+		}
+	}
+	if containerIndex == -1 {
+		glog.Warningf("Container readiness changed for unknown container: %q - %q",
+			kubeletutil.FormatPodName(pod), containerID.String())
+		return
+	}
+
+	if status.ContainerStatuses[containerIndex].Ready == ready {
+		glog.V(4).Infof("Container readiness unchanged (%v): %q - %q", ready,
+			kubeletutil.FormatPodName(pod), containerID.String())
+		return
+	}
+
+	// Make sure we're not updating the cached version.
+	status.ContainerStatuses = make([]api.ContainerStatus, len(status.ContainerStatuses))
+	copy(status.ContainerStatuses, oldStatus.status.ContainerStatuses)
+	status.ContainerStatuses[containerIndex].Ready = ready
+	m.updateStatusInternal(pod, status)
 }
 
 func (m *manager) TerminatePods(pods []*api.Pod) bool {
-	sent := true
+	allSent := true
 	m.podStatusesLock.Lock()
 	defer m.podStatusesLock.Unlock()
 	for _, pod := range pods {
@@ -192,39 +228,41 @@ func (m *manager) TerminatePods(pods []*api.Pod) bool {
 				Terminated: &api.ContainerStateTerminated{},
 			}
 		}
-		newStatus := m.updateStatusInternal(pod, pod.Status)
-		if newStatus != nil {
-			select {
-			case m.podStatusChannel <- podStatusSyncRequest{pod.UID, *newStatus}:
-			default:
-				sent = false
-				glog.V(4).Infof("Termination notice for %q was dropped because the status channel is full", kubeletutil.FormatPodName(pod))
-			}
-		} else {
-			sent = false
+		if sent := m.updateStatusInternal(pod, pod.Status); !sent {
+			glog.V(4).Infof("Termination notice for %q was dropped because the status channel is full", kubeletutil.FormatPodName(pod))
+			allSent = false
 		}
 	}
-	return sent
+	return allSent
 }
 
-// updateStatusInternal updates the internal status cache, and returns a versioned status if an
-// update is necessary. This method IS NOT THREAD SAFE and must be called from a locked function.
-func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) *versionedPodStatus {
+// updateStatusInternal updates the internal status cache, and queues an update to the api server if
+// necessary. Returns whether an update was triggered.
+// This method IS NOT THREAD SAFE and must be called from a locked function.
+func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) bool {
 	// The intent here is to prevent concurrent updates to a pod's status from
 	// clobbering each other so the phase of a pod progresses monotonically.
 	oldStatus, found := m.podStatuses[pod.UID]
-	if !found || !isStatusEqual(&oldStatus.status, &status) || pod.DeletionTimestamp != nil {
-		newStatus := versionedPodStatus{
-			status:       status,
-			version:      oldStatus.version + 1,
-			podName:      pod.Name,
-			podNamespace: pod.Namespace,
-		}
-		m.podStatuses[pod.UID] = newStatus
-		return &newStatus
-	} else {
+	if found && isStatusEqual(&oldStatus.status, &status) && pod.DeletionTimestamp == nil {
 		glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", kubeletutil.FormatPodName(pod), status)
-		return nil // No new status.
-	}
+		return false // No new status.
+	}
+
+	newStatus := versionedPodStatus{
+		status:       status,
+		version:      oldStatus.version + 1,
+		podName:      pod.Name,
+		podNamespace: pod.Namespace,
+	}
+	m.podStatuses[pod.UID] = newStatus
+
+	select {
+	case m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}:
+		return true
+	default:
+		// Let the periodic syncBatch handle the update if the channel is full.
+		// We can't block, since we hold the mutex lock.
+		return false
+	}
 }
 
diff --git a/pkg/kubelet/status/manager_test.go b/pkg/kubelet/status/manager_test.go
index a7db67fe9c3..2c3021c4a4a 100644
--- a/pkg/kubelet/status/manager_test.go
+++ b/pkg/kubelet/status/manager_test.go
@@ -30,6 +30,7 @@ import (
 	"k8s.io/kubernetes/pkg/api/unversioned"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
 	"k8s.io/kubernetes/pkg/client/unversioned/testclient"
+	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
 	"k8s.io/kubernetes/pkg/runtime"
@@ -499,3 +500,37 @@ func TestStaticPodStatus(t *testing.T) {
 	_, found := m.GetPodStatus(otherPod.UID)
 	assert.False(t, found, "otherPod status should have been deleted")
 }
+
+func TestSetContainerReadiness(t *testing.T) {
+	containerID := kubecontainer.ContainerID{"test", "cOnTaInEr_Id"}
+	containerStatus := api.ContainerStatus{
+		Name:        "cOnTaInEr_NaMe",
+		ContainerID: containerID.String(),
+		Ready:       false,
+	}
+	status := api.PodStatus{
+		ContainerStatuses: []api.ContainerStatus{containerStatus},
+	}
+
+	m := newTestManager(&testclient.Fake{})
+
+	t.Log("Setting readiness before status should fail.")
+	m.SetContainerReadiness(testPod, containerID, true)
+	verifyUpdates(t, m, 0)
+
+	t.Log("Setting initial status.")
+	m.SetPodStatus(testPod, status)
+	verifyUpdates(t, m, 1)
+
+	t.Log("Setting unchanged readiness should do nothing.")
+	m.SetContainerReadiness(testPod, containerID, false)
+	verifyUpdates(t, m, 0)
+
+	t.Log("Setting different readiness should generate update.")
+	m.SetContainerReadiness(testPod, containerID, true)
+	verifyUpdates(t, m, 1)
+
+	t.Log("Setting non-existent container readiness should fail.")
+	m.SetContainerReadiness(testPod, kubecontainer.ContainerID{"test", "foo"}, true)
+	verifyUpdates(t, m, 0)
+}