Generalize readinessManager to handle liveness

Tim St. Clair 2015-10-07 16:40:30 -07:00
parent aa307da594
commit 9b8bc50357
7 changed files with 159 additions and 85 deletions
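The generalization matters because the cache itself has nothing readiness-specific left in it: any probe type can record a results.Result per container ID. A minimal sketch of where this is headed, assuming only the results API introduced below (a separate liveness cache is not actually wired up in this commit):

package main

import (
	"fmt"

	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/prober/results"
)

func main() {
	// One cache per probe kind; both are plain results.Manager values.
	readiness := results.NewManager()
	liveness := results.NewManager() // hypothetical second instance

	id := kubecontainer.ContainerID{Type: "docker", ID: "abc123"}

	readiness.Set(id, results.Failure) // container not ready yet...
	liveness.Set(id, results.Success)  // ...but its liveness probe passes

	if r, ok := readiness.Get(id); ok {
		fmt.Println("readiness:", r) // prints "readiness: Failure"
	}
}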

View File

@@ -23,6 +23,7 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/sets"
@@ -53,7 +54,7 @@ type Manager interface {
type manager struct {
// Caches the results of readiness probes.
readinessCache *readinessManager
readinessCache results.Manager
// Map of active workers for readiness
readinessProbes map[containerPath]*worker
@@ -78,7 +79,7 @@ func NewManager(
defaultProbePeriod: defaultProbePeriod,
statusManager: statusManager,
prober: prober,
readinessCache: newReadinessManager(),
readinessCache: results.NewManager(),
readinessProbes: make(map[containerPath]*worker),
}
}
@@ -141,9 +142,9 @@ func (m *manager) UpdatePodStatus(podUID types.UID, podStatus *api.PodStatus) {
var ready bool
if c.State.Running == nil {
ready = false
} else if result, ok := m.readinessCache.getReadiness(
} else if result, ok := m.readinessCache.Get(
kubecontainer.ParseContainerID(c.ContainerID)); ok {
ready = result
ready = result == results.Success
} else {
// Check whether there is a probe which hasn't run yet.
_, exists := m.getReadinessProbe(podUID, c.Name)

View File

@@ -25,6 +25,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/probe"
"k8s.io/kubernetes/pkg/util/wait"
@@ -201,9 +202,9 @@ func TestUpdatePodStatus(t *testing.T) {
containerPath{podUID, terminated.Name}: {},
}
m.readinessCache.setReadiness(kubecontainer.ParseContainerID(probedReady.ContainerID), true)
m.readinessCache.setReadiness(kubecontainer.ParseContainerID(probedUnready.ContainerID), false)
m.readinessCache.setReadiness(kubecontainer.ParseContainerID(terminated.ContainerID), true)
m.readinessCache.Set(kubecontainer.ParseContainerID(probedReady.ContainerID), results.Success)
m.readinessCache.Set(kubecontainer.ParseContainerID(probedUnready.ContainerID), results.Failure)
m.readinessCache.Set(kubecontainer.ParseContainerID(terminated.ContainerID), results.Success)
m.UpdatePodStatus(podUID, &podStatus)

View File

@@ -1,62 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package prober
import (
"sync"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
// readinessManager maintains the readiness information (probe results) of
// containers over time to allow for implementation of health thresholds.
// This manager is thread-safe; no locks are necessary for the caller.
type readinessManager struct {
// guards states
sync.RWMutex
states map[kubecontainer.ContainerID]bool
}
// newReadinessManager creates and returns a readiness manager with empty
// contents.
func newReadinessManager() *readinessManager {
return &readinessManager{states: make(map[kubecontainer.ContainerID]bool)}
}
// getReadiness returns the readiness value for the container with the given
// ID, along with whether a value was found. If no value is recorded, it
// returns false.
func (r *readinessManager) getReadiness(id kubecontainer.ContainerID) (ready bool, found bool) {
r.RLock()
defer r.RUnlock()
state, found := r.states[id]
return state, found
}
// setReadiness sets the readiness value for the container with the given ID.
func (r *readinessManager) setReadiness(id kubecontainer.ContainerID, value bool) {
r.Lock()
defer r.Unlock()
r.states[id] = value
}
// removeReadiness clears the readiness value for the container with the given ID.
func (r *readinessManager) removeReadiness(id kubecontainer.ContainerID) {
r.Lock()
defer r.Unlock()
delete(r.states, id)
}

View File

@@ -0,0 +1,89 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package results
import (
"sync"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
// Manager provides a probe results cache.
type Manager interface {
// Get returns the cached result for the container with the given ID.
Get(id kubecontainer.ContainerID) (Result, bool)
// Set sets the cached result for the container with the given ID.
Set(id kubecontainer.ContainerID, result Result)
// Remove clears the cached result for the container with the given ID.
Remove(id kubecontainer.ContainerID)
}
// Result is the type for probe results.
type Result bool
const (
Success Result = true
Failure Result = false
)
func (r Result) String() string {
switch r {
case Success:
return "Success"
case Failure:
return "Failure"
default:
return "UNKNOWN"
}
}
// Manager implementation.
type manager struct {
// guards the cache
sync.RWMutex
// map of container ID -> probe Result
cache map[kubecontainer.ContainerID]Result
}
var _ Manager = &manager{}
// NewManager creates and returns an empty results manager.
func NewManager() Manager {
return &manager{cache: make(map[kubecontainer.ContainerID]Result)}
}
func (m *manager) Get(id kubecontainer.ContainerID) (Result, bool) {
m.RLock()
defer m.RUnlock()
result, found := m.cache[id]
return result, found
}
func (m *manager) Set(id kubecontainer.ContainerID, result Result) {
m.Lock()
defer m.Unlock()
// Write only when the cached value actually changes.
prev, exists := m.cache[id]
if !exists || prev != result {
m.cache[id] = result
}
}
func (m *manager) Remove(id kubecontainer.ContainerID) {
m.Lock()
defer m.Unlock()
delete(m.cache, id)
}
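Since Result is backed by bool and Get also reports whether an entry exists, callers can collapse a lookup into a single readiness check. A small sketch mirroring the UpdatePodStatus logic in the first file of this commit (containerReady is a hypothetical helper, not part of the change):

package example

import (
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/prober/results"
)

// containerReady treats a container as ready only when a probe result
// has been cached for it and that result is Success.
func containerReady(cache results.Manager, id kubecontainer.ContainerID) bool {
	result, ok := cache.Get(id)
	return ok && result == results.Success
}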

View File

@@ -0,0 +1,43 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package results
import (
"testing"
"github.com/stretchr/testify/assert"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
func TestCacheOperations(t *testing.T) {
m := NewManager()
unsetID := kubecontainer.ContainerID{Type: "test", ID: "unset"}
setID := kubecontainer.ContainerID{Type: "test", ID: "set"}
_, found := m.Get(unsetID)
assert.False(t, found, "unset result found")
m.Set(setID, Success)
result, found := m.Get(setID)
assert.True(t, result == Success, "set result")
assert.True(t, found, "set result found")
m.Remove(setID)
_, found = m.Get(setID)
assert.False(t, found, "removed result found")
}

View File

@@ -22,6 +22,7 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/prober/results"
kubeutil "k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/probe"
"k8s.io/kubernetes/pkg/util"
@@ -75,7 +76,7 @@ func run(m *manager, w *worker) {
// Clean up.
probeTicker.Stop()
if !w.containerID.IsEmpty() {
m.readinessCache.removeReadiness(w.containerID)
m.readinessCache.Remove(w.containerID)
}
m.removeReadinessProbe(w.pod.UID, w.container.Name)
@@ -122,7 +123,7 @@ func doProbe(m *manager, w *worker) (keepGoing bool) {
if w.containerID.String() != c.ContainerID {
if !w.containerID.IsEmpty() {
m.readinessCache.removeReadiness(w.containerID)
m.readinessCache.Remove(w.containerID)
}
w.containerID = kubecontainer.ParseContainerID(c.ContainerID)
}
@@ -130,7 +131,7 @@
if c.State.Running == nil {
glog.V(3).Infof("Non-running container probed: %v - %v",
kubeutil.FormatPodName(w.pod), w.container.Name)
m.readinessCache.setReadiness(w.containerID, false)
m.readinessCache.Set(w.containerID, results.Failure)
// Abort if the container will not be restarted.
return c.State.Terminated == nil ||
w.pod.Spec.RestartPolicy != api.RestartPolicyNever
@@ -138,14 +139,14 @@
if int64(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
// Readiness defaults to false during the initial delay.
m.readinessCache.setReadiness(w.containerID, false)
m.readinessCache.Set(w.containerID, results.Failure)
return true
}
// TODO: Move error handling out of prober.
result, _ := m.prober.ProbeReadiness(w.pod, status, w.container, w.containerID)
if result != probe.Unknown {
m.readinessCache.setReadiness(w.containerID, result != probe.Failure)
m.readinessCache.Set(w.containerID, result != probe.Failure)
}
return true

View File

@@ -23,6 +23,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/probe"
)
@@ -56,7 +57,7 @@ func TestDoProbe(t *testing.T) {
expectContinue bool
expectReadySet bool
expectedReadiness bool
expectedReadiness results.Result
}{
{ // No status.
expectContinue: true,
@@ -81,7 +82,7 @@
podStatus: &runningStatus,
expectContinue: true,
expectReadySet: true,
expectedReadiness: true,
expectedReadiness: results.Success,
},
{ // Initial delay passed
podStatus: &runningStatus,
@ -90,7 +91,7 @@ func TestDoProbe(t *testing.T) {
},
expectContinue: true,
expectReadySet: true,
expectedReadiness: true,
expectedReadiness: results.Success,
},
}
@@ -102,7 +103,7 @@
if c := doProbe(m, w); c != test.expectContinue {
t.Errorf("[%d] Expected continue to be %v but got %v", i, test.expectContinue, c)
}
ready, ok := m.readinessCache.getReadiness(containerID)
ready, ok := m.readinessCache.Get(containerID)
if ok != test.expectReadySet {
t.Errorf("[%d] Expected to have readiness: %v but got %v", i, test.expectReadySet, ok)
}
@@ -112,7 +113,7 @@
// Clean up.
m.statusManager.DeletePodStatus(podUID)
m.readinessCache.removeReadiness(containerID)
m.readinessCache.Remove(containerID)
}
}
@@ -127,7 +128,7 @@ func TestInitialDelay(t *testing.T) {
t.Errorf("Expected to continue, but did not")
}
ready, ok := m.readinessCache.getReadiness(containerID)
ready, ok := m.readinessCache.Get(containerID)
if !ok {
t.Errorf("Expected readiness to be false, but was not set")
} else if ready {
@@ -145,7 +146,7 @@
t.Errorf("Expected to continue, but did not")
}
ready, ok = m.readinessCache.getReadiness(containerID)
ready, ok = m.readinessCache.Get(containerID)
if !ok {
t.Errorf("Expected readiness to be true, but was not set")
} else if !ready {
@@ -157,11 +158,11 @@ func TestCleanUp(t *testing.T) {
m := newTestManager()
pod := getTestPod(api.Probe{})
m.statusManager.SetPodStatus(&pod, getRunningStatus())
m.readinessCache.setReadiness(containerID, true)
m.readinessCache.Set(containerID, results.Success)
w := m.newWorker(&pod, pod.Spec.Containers[0])
m.readinessProbes[containerPath{podUID, containerName}] = w
if ready, _ := m.readinessCache.getReadiness(containerID); !ready {
if ready, _ := m.readinessCache.Get(containerID); !ready {
t.Fatal("Expected readiness to be true.")
}
@@ -170,7 +171,7 @@
t.Fatal(err)
}
if _, ok := m.readinessCache.getReadiness(containerID); ok {
if _, ok := m.readinessCache.Get(containerID); ok {
t.Error("Expected readiness to be cleared.")
}
if _, ok := m.readinessProbes[containerPath{podUID, containerName}]; ok {
@@ -188,7 +189,7 @@ func TestHandleCrash(t *testing.T) {
if !doProbe(m, w) {
t.Error("Expected to keep going, but terminated.")
}
if _, ok := m.readinessCache.getReadiness(containerID); ok {
if _, ok := m.readinessCache.Get(containerID); ok {
t.Error("Expected readiness to be unchanged from crash.")
}
}