diff --git a/pkg/kubelet/prober/manager.go b/pkg/kubelet/prober/manager.go index 84c87b4dc2a..a39089a3f5d 100644 --- a/pkg/kubelet/prober/manager.go +++ b/pkg/kubelet/prober/manager.go @@ -23,6 +23,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/sets" @@ -53,7 +54,7 @@ type Manager interface { type manager struct { // Caches the results of readiness probes. - readinessCache *readinessManager + readinessCache results.Manager // Map of active workers for readiness readinessProbes map[containerPath]*worker @@ -78,7 +79,7 @@ func NewManager( defaultProbePeriod: defaultProbePeriod, statusManager: statusManager, prober: prober, - readinessCache: newReadinessManager(), + readinessCache: results.NewManager(), readinessProbes: make(map[containerPath]*worker), } } @@ -141,9 +142,9 @@ func (m *manager) UpdatePodStatus(podUID types.UID, podStatus *api.PodStatus) { var ready bool if c.State.Running == nil { ready = false - } else if result, ok := m.readinessCache.getReadiness( + } else if result, ok := m.readinessCache.Get( kubecontainer.ParseContainerID(c.ContainerID)); ok { - ready = result + ready = result == results.Success } else { // The check whether there is a probe which hasn't run yet. _, exists := m.getReadinessProbe(podUID, c.Name) diff --git a/pkg/kubelet/prober/manager_test.go b/pkg/kubelet/prober/manager_test.go index a3771eb5d61..9caf1cc37d0 100644 --- a/pkg/kubelet/prober/manager_test.go +++ b/pkg/kubelet/prober/manager_test.go @@ -25,6 +25,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/unversioned/testclient" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/kubernetes/pkg/probe" "k8s.io/kubernetes/pkg/util/wait" @@ -201,9 +202,9 @@ func TestUpdatePodStatus(t *testing.T) { containerPath{podUID, terminated.Name}: {}, } - m.readinessCache.setReadiness(kubecontainer.ParseContainerID(probedReady.ContainerID), true) - m.readinessCache.setReadiness(kubecontainer.ParseContainerID(probedUnready.ContainerID), false) - m.readinessCache.setReadiness(kubecontainer.ParseContainerID(terminated.ContainerID), true) + m.readinessCache.Set(kubecontainer.ParseContainerID(probedReady.ContainerID), results.Success) + m.readinessCache.Set(kubecontainer.ParseContainerID(probedUnready.ContainerID), results.Failure) + m.readinessCache.Set(kubecontainer.ParseContainerID(terminated.ContainerID), results.Success) m.UpdatePodStatus(podUID, &podStatus) diff --git a/pkg/kubelet/prober/readiness_manager.go b/pkg/kubelet/prober/readiness_manager.go deleted file mode 100644 index 665032b644b..00000000000 --- a/pkg/kubelet/prober/readiness_manager.go +++ /dev/null @@ -1,62 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package prober - -import ( - "sync" - - kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" -) - -// readinessManager maintains the readiness information(probe results) of -// containers over time to allow for implementation of health thresholds. -// This manager is thread-safe, no locks are necessary for the caller. -type readinessManager struct { - // guards states - sync.RWMutex - states map[kubecontainer.ContainerID]bool -} - -// newReadinessManager creates ane returns a readiness manager with empty -// contents. -func newReadinessManager() *readinessManager { - return &readinessManager{states: make(map[kubecontainer.ContainerID]bool)} -} - -// getReadiness returns the readiness value for the container with the given ID. -// If the readiness value is found, returns it. -// If the readiness is not found, returns false. -func (r *readinessManager) getReadiness(id kubecontainer.ContainerID) (ready bool, found bool) { - r.RLock() - defer r.RUnlock() - state, found := r.states[id] - return state, found -} - -// setReadiness sets the readiness value for the container with the given ID. -func (r *readinessManager) setReadiness(id kubecontainer.ContainerID, value bool) { - r.Lock() - defer r.Unlock() - r.states[id] = value -} - -// removeReadiness clears the readiness value for the container with the given ID. -func (r *readinessManager) removeReadiness(id kubecontainer.ContainerID) { - r.Lock() - defer r.Unlock() - delete(r.states, id) -} diff --git a/pkg/kubelet/prober/results/results_manager.go b/pkg/kubelet/prober/results/results_manager.go new file mode 100644 index 00000000000..208d3d5ffd4 --- /dev/null +++ b/pkg/kubelet/prober/results/results_manager.go @@ -0,0 +1,89 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package results + +import ( + "sync" + + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" +) + +// Manager provides a probe results cache. +type Manager interface { + // Get returns the cached result for the container with the given ID. + Get(id kubecontainer.ContainerID) (Result, bool) + // Set sets the cached result for the container with the given ID. + Set(id kubecontainer.ContainerID, result Result) + // Remove clears the cached result for the container with the given ID. + Remove(id kubecontainer.ContainerID) +} + +// Result is the type for probe results. +type Result bool + +const ( + Success Result = true + Failure Result = false +) + +func (r Result) String() string { + switch r { + case Success: + return "Success" + case Failure: + return "Failure" + default: + return "UNKNOWN" + } +} + +// Manager implementation. +type manager struct { + // guards the cache + sync.RWMutex + // map of container ID -> probe Result + cache map[kubecontainer.ContainerID]Result +} + +var _ Manager = &manager{} + +// NewManager creates ane returns an empty results manager. +func NewManager() Manager { + return &manager{cache: make(map[kubecontainer.ContainerID]Result)} +} + +func (m *manager) Get(id kubecontainer.ContainerID) (Result, bool) { + m.RLock() + defer m.RUnlock() + result, found := m.cache[id] + return result, found +} + +func (m *manager) Set(id kubecontainer.ContainerID, result Result) { + m.Lock() + defer m.Unlock() + prev, exists := m.cache[id] + if !exists || prev != result { + m.cache[id] = result + } +} + +func (m *manager) Remove(id kubecontainer.ContainerID) { + m.Lock() + defer m.Unlock() + delete(m.cache, id) +} diff --git a/pkg/kubelet/prober/results/results_manager_test.go b/pkg/kubelet/prober/results/results_manager_test.go new file mode 100644 index 00000000000..d815c1a84b8 --- /dev/null +++ b/pkg/kubelet/prober/results/results_manager_test.go @@ -0,0 +1,43 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package results + +import ( + "testing" + + "github.com/stretchr/testify/assert" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" +) + +func TestCacheOperations(t *testing.T) { + m := NewManager() + + unsetID := kubecontainer.ContainerID{"test", "unset"} + setID := kubecontainer.ContainerID{"test", "set"} + + _, found := m.Get(unsetID) + assert.False(t, found, "unset result found") + + m.Set(setID, Success) + result, found := m.Get(setID) + assert.True(t, result == Success, "set result") + assert.True(t, found, "set result found") + + m.Remove(setID) + _, found = m.Get(setID) + assert.False(t, found, "removed result found") +} diff --git a/pkg/kubelet/prober/worker.go b/pkg/kubelet/prober/worker.go index 20ca15aef2f..cca6e77e7e7 100644 --- a/pkg/kubelet/prober/worker.go +++ b/pkg/kubelet/prober/worker.go @@ -22,6 +22,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/prober/results" kubeutil "k8s.io/kubernetes/pkg/kubelet/util" "k8s.io/kubernetes/pkg/probe" "k8s.io/kubernetes/pkg/util" @@ -75,7 +76,7 @@ func run(m *manager, w *worker) { // Clean up. probeTicker.Stop() if !w.containerID.IsEmpty() { - m.readinessCache.removeReadiness(w.containerID) + m.readinessCache.Remove(w.containerID) } m.removeReadinessProbe(w.pod.UID, w.container.Name) @@ -122,7 +123,7 @@ func doProbe(m *manager, w *worker) (keepGoing bool) { if w.containerID.String() != c.ContainerID { if !w.containerID.IsEmpty() { - m.readinessCache.removeReadiness(w.containerID) + m.readinessCache.Remove(w.containerID) } w.containerID = kubecontainer.ParseContainerID(c.ContainerID) } @@ -130,7 +131,7 @@ func doProbe(m *manager, w *worker) (keepGoing bool) { if c.State.Running == nil { glog.V(3).Infof("Non-running container probed: %v - %v", kubeutil.FormatPodName(w.pod), w.container.Name) - m.readinessCache.setReadiness(w.containerID, false) + m.readinessCache.Set(w.containerID, results.Failure) // Abort if the container will not be restarted. return c.State.Terminated == nil || w.pod.Spec.RestartPolicy != api.RestartPolicyNever @@ -138,14 +139,14 @@ func doProbe(m *manager, w *worker) (keepGoing bool) { if int64(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds { // Readiness defaults to false during the initial delay. - m.readinessCache.setReadiness(w.containerID, false) + m.readinessCache.Set(w.containerID, results.Failure) return true } // TODO: Move error handling out of prober. result, _ := m.prober.ProbeReadiness(w.pod, status, w.container, w.containerID) if result != probe.Unknown { - m.readinessCache.setReadiness(w.containerID, result != probe.Failure) + m.readinessCache.Set(w.containerID, result != probe.Failure) } return true diff --git a/pkg/kubelet/prober/worker_test.go b/pkg/kubelet/prober/worker_test.go index 1d2408ab1fe..3a7a6262f73 100644 --- a/pkg/kubelet/prober/worker_test.go +++ b/pkg/kubelet/prober/worker_test.go @@ -23,6 +23,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/probe" ) @@ -56,7 +57,7 @@ func TestDoProbe(t *testing.T) { expectContinue bool expectReadySet bool - expectedReadiness bool + expectedReadiness results.Result }{ { // No status. expectContinue: true, @@ -81,7 +82,7 @@ func TestDoProbe(t *testing.T) { podStatus: &runningStatus, expectContinue: true, expectReadySet: true, - expectedReadiness: true, + expectedReadiness: results.Success, }, { // Initial delay passed podStatus: &runningStatus, @@ -90,7 +91,7 @@ func TestDoProbe(t *testing.T) { }, expectContinue: true, expectReadySet: true, - expectedReadiness: true, + expectedReadiness: results.Success, }, } @@ -102,7 +103,7 @@ func TestDoProbe(t *testing.T) { if c := doProbe(m, w); c != test.expectContinue { t.Errorf("[%d] Expected continue to be %v but got %v", i, test.expectContinue, c) } - ready, ok := m.readinessCache.getReadiness(containerID) + ready, ok := m.readinessCache.Get(containerID) if ok != test.expectReadySet { t.Errorf("[%d] Expected to have readiness: %v but got %v", i, test.expectReadySet, ok) } @@ -112,7 +113,7 @@ func TestDoProbe(t *testing.T) { // Clean up. m.statusManager.DeletePodStatus(podUID) - m.readinessCache.removeReadiness(containerID) + m.readinessCache.Remove(containerID) } } @@ -127,7 +128,7 @@ func TestInitialDelay(t *testing.T) { t.Errorf("Expected to continue, but did not") } - ready, ok := m.readinessCache.getReadiness(containerID) + ready, ok := m.readinessCache.Get(containerID) if !ok { t.Errorf("Expected readiness to be false, but was not set") } else if ready { @@ -145,7 +146,7 @@ func TestInitialDelay(t *testing.T) { t.Errorf("Expected to continue, but did not") } - ready, ok = m.readinessCache.getReadiness(containerID) + ready, ok = m.readinessCache.Get(containerID) if !ok { t.Errorf("Expected readiness to be true, but was not set") } else if !ready { @@ -157,11 +158,11 @@ func TestCleanUp(t *testing.T) { m := newTestManager() pod := getTestPod(api.Probe{}) m.statusManager.SetPodStatus(&pod, getRunningStatus()) - m.readinessCache.setReadiness(containerID, true) + m.readinessCache.Set(containerID, results.Success) w := m.newWorker(&pod, pod.Spec.Containers[0]) m.readinessProbes[containerPath{podUID, containerName}] = w - if ready, _ := m.readinessCache.getReadiness(containerID); !ready { + if ready, _ := m.readinessCache.Get(containerID); !ready { t.Fatal("Expected readiness to be true.") } @@ -170,7 +171,7 @@ func TestCleanUp(t *testing.T) { t.Fatal(err) } - if _, ok := m.readinessCache.getReadiness(containerID); ok { + if _, ok := m.readinessCache.Get(containerID); ok { t.Error("Expected readiness to be cleared.") } if _, ok := m.readinessProbes[containerPath{podUID, containerName}]; ok { @@ -188,7 +189,7 @@ func TestHandleCrash(t *testing.T) { if !doProbe(m, w) { t.Error("Expected to keep going, but terminated.") } - if _, ok := m.readinessCache.getReadiness(containerID); ok { + if _, ok := m.readinessCache.Get(containerID); ok { t.Error("Expected readiness to be unchanged from crash.") } }