diff --git a/pkg/kubelet/container/container_reference_manager.go b/pkg/kubelet/container/container_reference_manager.go index c6ba09b8f06..99aa35b6029 100644 --- a/pkg/kubelet/container/container_reference_manager.go +++ b/pkg/kubelet/container/container_reference_manager.go @@ -36,9 +36,7 @@ type RefManager struct { // NewRefManager creates and returns a container reference manager // with empty contents. func NewRefManager() *RefManager { - c := RefManager{} - c.containerIDToRef = make(map[string]*api.ObjectReference) - return &c + return &RefManager{containerIDToRef: make(map[string]*api.ObjectReference)} } // SetRef stores a reference to a pod's container, associating it with the given container ID. diff --git a/pkg/kubelet/container/readiness_manager.go b/pkg/kubelet/container/readiness_manager.go new file mode 100644 index 00000000000..03826821318 --- /dev/null +++ b/pkg/kubelet/container/readiness_manager.go @@ -0,0 +1,59 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package container + +import "sync" + +// ReadinessManager maintains the readiness information(probe results) of +// containers over time to allow for implementation of health thresholds. +// This manager is thread-safe, no locks are necessary for the caller. +type ReadinessManager struct { + // guards states + sync.RWMutex + // TODO(yifan): To use strong type. + states map[string]bool +} + +// NewReadinessManager creates ane returns a readiness manager with empty +// contents. +func NewReadinessManager() *ReadinessManager { + return &ReadinessManager{states: make(map[string]bool)} +} + +// GetReadiness returns the readiness value for the container with the given ID. +// If the readiness value is found, returns it. +// If the readiness is not found, returns false. +func (r *ReadinessManager) GetReadiness(id string) bool { + r.RLock() + defer r.RUnlock() + state, found := r.states[id] + return state && found +} + +// SetReadiness sets the readiness value for the container with the given ID. +func (r *ReadinessManager) SetReadiness(id string, value bool) { + r.Lock() + defer r.Unlock() + r.states[id] = value +} + +// RemoveReadiness clears the readiness value for the container with the given ID. +func (r *ReadinessManager) RemoveReadiness(id string) { + r.Lock() + defer r.Unlock() + delete(r.states, id) +} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 6469b5547e4..aec6b1ea8a4 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -206,25 +206,25 @@ func NewMainKubelet( statusManager := newStatusManager(kubeClient) klet := &Kubelet{ - hostname: hostname, - dockerClient: dockerClient, - kubeClient: kubeClient, - rootDirectory: rootDirectory, - resyncInterval: resyncInterval, - podInfraContainerImage: podInfraContainerImage, - containerRefManager: kubecontainer.NewRefManager(), - runner: dockertools.NewDockerContainerCommandRunner(dockerClient), - httpClient: &http.Client{}, - pullQPS: pullQPS, - pullBurst: pullBurst, - sourcesReady: sourcesReady, - clusterDomain: clusterDomain, - clusterDNS: clusterDNS, - serviceLister: serviceLister, - nodeLister: nodeLister, - masterServiceNamespace: masterServiceNamespace, - prober: newProbeHolder(), - readiness: newReadinessStates(), + hostname: hostname, + dockerClient: dockerClient, + kubeClient: kubeClient, + rootDirectory: rootDirectory, + resyncInterval: resyncInterval, + podInfraContainerImage: podInfraContainerImage, + containerRefManager: kubecontainer.NewRefManager(), + readinessManager: kubecontainer.NewReadinessManager(), + runner: dockertools.NewDockerContainerCommandRunner(dockerClient), + httpClient: &http.Client{}, + pullQPS: pullQPS, + pullBurst: pullBurst, + sourcesReady: sourcesReady, + clusterDomain: clusterDomain, + clusterDNS: clusterDNS, + serviceLister: serviceLister, + nodeLister: nodeLister, + masterServiceNamespace: masterServiceNamespace, + prober: newProbeHolder(), streamingConnectionIdleTimeout: streamingConnectionIdleTimeout, recorder: recorder, cadvisor: cadvisorInterface, @@ -326,8 +326,8 @@ type Kubelet struct { // Probe runner holder prober probeHolder - // Container readiness state holder - readiness *readinessStates + // Container readiness state manager. + readinessManager *kubecontainer.ReadinessManager // How long to keep idle streaming command execution/port forwarding // connections open before terminating them @@ -818,7 +818,7 @@ func (kl *Kubelet) killContainer(c *kubecontainer.Container) error { func (kl *Kubelet) killContainerByID(ID string) error { glog.V(2).Infof("Killing container with id %q", ID) - kl.readiness.remove(ID) + kl.readinessManager.RemoveReadiness(ID) err := kl.dockerClient.StopContainer(ID, 10) ref, ok := kl.containerRefManager.GetRef(ID) @@ -986,7 +986,7 @@ func (kl *Kubelet) shouldContainerBeRestarted(container *api.Container, pod *api } // set dead containers to unready state for _, c := range recentContainers { - kl.readiness.remove(c.ID) + kl.readinessManager.RemoveReadiness(c.ID) } if len(recentContainers) > 0 { @@ -1915,7 +1915,8 @@ func (kl *Kubelet) generatePodStatusByPod(pod *api.Pod) (api.PodStatus, error) { for _, c := range spec.Containers { for i, st := range podStatus.ContainerStatuses { if st.Name == c.Name { - podStatus.ContainerStatuses[i].Ready = kl.readiness.IsReady(st) + ready := st.State.Running != nil && kl.readinessManager.GetReadiness(strings.TrimPrefix(st.ContainerID, "docker://")) + podStatus.ContainerStatuses[i].Ready = ready break } } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index db44e1fa727..4b43df8bee4 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -101,7 +101,7 @@ func newTestKubelet(t *testing.T) *TestKubelet { kubelet.masterServiceNamespace = api.NamespaceDefault kubelet.serviceLister = testServiceLister{} kubelet.nodeLister = testNodeLister{} - kubelet.readiness = newReadinessStates() + kubelet.readinessManager = kubecontainer.NewReadinessManager() kubelet.recorder = fakeRecorder kubelet.statusManager = newStatusManager(fakeKubeClient) if err := kubelet.setupDataDirs(); err != nil { @@ -369,7 +369,7 @@ func TestKillContainerWithError(t *testing.T) { testKubelet := newTestKubelet(t) kubelet := testKubelet.kubelet for _, c := range fakeDocker.ContainerList { - kubelet.readiness.set(c.ID, true) + kubelet.readinessManager.SetReadiness(c.ID, true) } kubelet.dockerClient = fakeDocker c := apiContainerToContainer(fakeDocker.ContainerList[0]) @@ -380,11 +380,13 @@ func TestKillContainerWithError(t *testing.T) { verifyCalls(t, fakeDocker, []string{"stop"}) killedContainer := containers[0] liveContainer := containers[1] - if _, found := kubelet.readiness.states[killedContainer.ID]; found { - t.Errorf("exepcted container entry ID '%v' to not be found. states: %+v", killedContainer.ID, kubelet.readiness.states) + ready := kubelet.readinessManager.GetReadiness(killedContainer.ID) + if ready { + t.Errorf("exepcted container entry ID '%v' to not be found. states: %+v", killedContainer.ID, ready) } - if _, found := kubelet.readiness.states[liveContainer.ID]; !found { - t.Errorf("exepcted container entry ID '%v' to be found. states: %+v", liveContainer.ID, kubelet.readiness.states) + ready = kubelet.readinessManager.GetReadiness(liveContainer.ID) + if !ready { + t.Errorf("exepcted container entry ID '%v' to be found. states: %+v", liveContainer.ID, ready) } } @@ -407,7 +409,7 @@ func TestKillContainer(t *testing.T) { Name: "foobar", } for _, c := range fakeDocker.ContainerList { - kubelet.readiness.set(c.ID, true) + kubelet.readinessManager.SetReadiness(c.ID, true) } c := apiContainerToContainer(fakeDocker.ContainerList[0]) @@ -418,11 +420,13 @@ func TestKillContainer(t *testing.T) { verifyCalls(t, fakeDocker, []string{"stop"}) killedContainer := containers[0] liveContainer := containers[1] - if _, found := kubelet.readiness.states[killedContainer.ID]; found { - t.Errorf("exepcted container entry ID '%v' to not be found. states: %+v", killedContainer.ID, kubelet.readiness.states) + ready := kubelet.readinessManager.GetReadiness(killedContainer.ID) + if ready { + t.Errorf("exepcted container entry ID '%v' to not be found. states: %+v", killedContainer.ID, ready) } - if _, found := kubelet.readiness.states[liveContainer.ID]; !found { - t.Errorf("exepcted container entry ID '%v' to be found. states: %+v", liveContainer.ID, kubelet.readiness.states) + ready = kubelet.readinessManager.GetReadiness(liveContainer.ID) + if !ready { + t.Errorf("exepcted container entry ID '%v' to be found. states: %+v", liveContainer.ID, ready) } } diff --git a/pkg/kubelet/probe.go b/pkg/kubelet/probe.go index 99c77920ad6..837d8b2e2fc 100644 --- a/pkg/kubelet/probe.go +++ b/pkg/kubelet/probe.go @@ -19,8 +19,6 @@ package kubelet import ( "fmt" "strconv" - "strings" - "sync" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -45,12 +43,12 @@ func (kl *Kubelet) probeContainer(pod *api.Pod, status api.PodStatus, container live, err := kl.probeContainerLiveness(pod, status, container, createdAt) if err != nil { glog.V(1).Infof("Liveness probe errored: %v", err) - kl.readiness.set(containerID, false) + kl.readinessManager.SetReadiness(containerID, false) return probe.Unknown, err } if live != probe.Success { glog.V(1).Infof("Liveness probe unsuccessful: %v", live) - kl.readiness.set(containerID, false) + kl.readinessManager.SetReadiness(containerID, false) return live, nil } @@ -58,12 +56,12 @@ func (kl *Kubelet) probeContainer(pod *api.Pod, status api.PodStatus, container ready, err := kl.probeContainerReadiness(pod, status, container, createdAt) if err == nil && ready == probe.Success { glog.V(3).Infof("Readiness probe successful: %v", ready) - kl.readiness.set(containerID, true) + kl.readinessManager.SetReadiness(containerID, true) return probe.Success, nil } glog.V(1).Infof("Readiness probe failed/errored: %v, %v", ready, err) - kl.readiness.set(containerID, false) + kl.readinessManager.SetReadiness(containerID, false) ref, ok := kl.containerRefManager.GetRef(containerID) if !ok { @@ -204,44 +202,6 @@ func (eic execInContainer) SetDir(dir string) { //unimplemented } -// This will eventually maintain info about probe results over time -// to allow for implementation of health thresholds -func newReadinessStates() *readinessStates { - return &readinessStates{states: make(map[string]bool)} -} - -type readinessStates struct { - // guards states - sync.RWMutex - states map[string]bool -} - -func (r *readinessStates) IsReady(c api.ContainerStatus) bool { - if c.State.Running == nil { - return false - } - return r.get(strings.TrimPrefix(c.ContainerID, "docker://")) -} - -func (r *readinessStates) get(key string) bool { - r.RLock() - defer r.RUnlock() - state, found := r.states[key] - return state && found -} - -func (r *readinessStates) set(key string, value bool) { - r.Lock() - defer r.Unlock() - r.states[key] = value -} - -func (r *readinessStates) remove(key string) { - r.Lock() - defer r.Unlock() - delete(r.states, key) -} - func newProbeHolder() probeHolder { return probeHolder{ exec: execprobe.New(), diff --git a/pkg/kubelet/probe_test.go b/pkg/kubelet/probe_test.go index 5f12124d805..85084fd7da6 100644 --- a/pkg/kubelet/probe_test.go +++ b/pkg/kubelet/probe_test.go @@ -147,7 +147,7 @@ func (p fakeExecProber) Probe(_ exec.Cmd) (probe.Result, error) { func makeTestKubelet(result probe.Result, err error) *Kubelet { return &Kubelet{ - readiness: newReadinessStates(), + readinessManager: kubecontainer.NewReadinessManager(), prober: probeHolder{ exec: fakeExecProber{ result: result, @@ -412,8 +412,8 @@ func TestProbeContainer(t *testing.T) { if test.expectedResult != result { t.Errorf("Expected result was %v but probeContainer() returned %v", test.expectedResult, result) } - if test.expectedReadiness != kl.readiness.get(dc.ID) { - t.Errorf("Expected readiness was %v but probeContainer() set %v", test.expectedReadiness, kl.readiness.get(dc.ID)) + if test.expectedReadiness != kl.readinessManager.GetReadiness(dc.ID) { + t.Errorf("Expected readiness was %v but probeContainer() set %v", test.expectedReadiness, kl.readinessManager.GetReadiness(dc.ID)) } } }