From 1f91fffb575b4e8155145a68cd3a23d806655353 Mon Sep 17 00:00:00 2001
From: "Tim St. Clair"
Date: Fri, 11 Sep 2015 12:22:01 -0700
Subject: [PATCH] Move kubelet.statusManager to status.Manager

This refactor is in preparation for moving more state handling to the
status manager. It will become the canonical cache for the latest
information on running containers and probe status, as part of the
prober refactoring.
---
 pkg/kubelet/kubelet.go                       |   5 +-
 pkg/kubelet/kubelet_test.go                  |   3 +-
 pkg/kubelet/runonce_test.go                  |   3 +-
 .../{status_manager.go => status/manager.go} | 107 +++++++++++-------
 .../manager_test.go}                         |  24 ++--
 5 files changed, 84 insertions(+), 58 deletions(-)
 rename pkg/kubelet/{status_manager.go => status/manager.go} (71%)
 rename pkg/kubelet/{status_manager_test.go => status/manager_test.go} (92%)

diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index c0dbacbd59e..e4a1429b792 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -52,6 +52,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
 	"k8s.io/kubernetes/pkg/kubelet/network"
 	"k8s.io/kubernetes/pkg/kubelet/rkt"
+	"k8s.io/kubernetes/pkg/kubelet/status"
 	kubeletTypes "k8s.io/kubernetes/pkg/kubelet/types"
 	kubeletUtil "k8s.io/kubernetes/pkg/kubelet/util"
 	"k8s.io/kubernetes/pkg/labels"
@@ -244,7 +245,7 @@ func NewMainKubelet(
 	if err != nil {
 		return nil, fmt.Errorf("failed to initialize disk manager: %v", err)
 	}
-	statusManager := newStatusManager(kubeClient)
+	statusManager := status.NewManager(kubeClient)
 	readinessManager := kubecontainer.NewReadinessManager()
 	containerRefManager := kubecontainer.NewRefManager()
@@ -503,7 +504,7 @@ type Kubelet struct {
 	machineInfo *cadvisorApi.MachineInfo

 	// Syncs pods statuses with apiserver; also used as a cache of statuses.
-	statusManager *statusManager
+	statusManager status.Manager

 	// Manager for the volume maps for the pods.
 	volumeManager *volumeManager
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index 8d7f4233fa7..8da898b9e52 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -44,6 +44,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/container"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/network"
+	"k8s.io/kubernetes/pkg/kubelet/status"
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/types"
 	"k8s.io/kubernetes/pkg/util"
@@ -105,7 +106,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
 	kubelet.nodeLister = testNodeLister{}
 	kubelet.readinessManager = kubecontainer.NewReadinessManager()
 	kubelet.recorder = fakeRecorder
-	kubelet.statusManager = newStatusManager(fakeKubeClient)
+	kubelet.statusManager = status.NewManager(fakeKubeClient)
 	if err := kubelet.setupDataDirs(); err != nil {
 		t.Fatalf("can't initialize kubelet data dirs: %v", err)
 	}
diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go
index 8de1627af05..3166a486db1 100644
--- a/pkg/kubelet/runonce_test.go
+++ b/pkg/kubelet/runonce_test.go
@@ -30,6 +30,7 @@ import (
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/dockertools"
 	"k8s.io/kubernetes/pkg/kubelet/network"
+	"k8s.io/kubernetes/pkg/kubelet/status"
 )

 type listContainersResult struct {
@@ -83,7 +84,7 @@ func TestRunOnce(t *testing.T) {
 		recorder:            &record.FakeRecorder{},
 		cadvisor:            cadvisor,
 		nodeLister:          testNodeLister{},
-		statusManager:       newStatusManager(nil),
+		statusManager:       status.NewManager(nil),
 		containerRefManager: kubecontainer.NewRefManager(),
 		readinessManager:    kubecontainer.NewReadinessManager(),
 		podManager:          podManager,
diff --git a/pkg/kubelet/status_manager.go b/pkg/kubelet/status/manager.go
similarity index 71%
rename from pkg/kubelet/status_manager.go
rename to pkg/kubelet/status/manager.go
index c52ccddb584..e47bd11f560 100644
--- a/pkg/kubelet/status_manager.go
+++ b/pkg/kubelet/status/manager.go
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */

-package kubelet
+package status

 import (
 	"fmt"
@@ -39,16 +39,42 @@ type podStatusSyncRequest struct {

 // Updates pod statuses in apiserver. Writes only when new status has changed.
 // All methods are thread-safe.
-type statusManager struct {
+type manager struct {
 	kubeClient client.Interface
 	// Map from pod full name to sync status of the corresponding pod.
-	podStatusesLock  sync.RWMutex
 	podStatuses      map[types.UID]api.PodStatus
+	podStatusesLock  sync.RWMutex
 	podStatusChannel chan podStatusSyncRequest
 }

-func newStatusManager(kubeClient client.Interface) *statusManager {
-	return &statusManager{
+// status.Manager is the source of truth for kubelet pod status, and should be kept up-to-date with
+// the latest api.PodStatus. It also syncs updates back to the API server.
+type Manager interface {
+	// Start the API server status sync loop.
+	Start()
+
+	// GetPodStatus returns the cached status for the provided pod UID, as well as whether it
+	// was a cache hit.
+	GetPodStatus(uid types.UID) (api.PodStatus, bool)
+
+	// SetPodStatus updates the cached status for the given pod, and triggers a status update.
+	SetPodStatus(pod *api.Pod, status api.PodStatus)
+
+	// TerminatePods resets the container status for the provided pods to terminated and triggers
+	// a status update. This function may not enqueue all the provided pods, in which case it will
+	// return false.
+	TerminatePods(pods []*api.Pod) bool
+
+	// DeletePodStatus simply removes the given pod from the status cache.
+	DeletePodStatus(uid types.UID)
+
+	// RemoveOrphanedStatuses scans the status cache and removes any entries for pods not included in
+	// the provided podUIDs.
+	RemoveOrphanedStatuses(podUIDs map[types.UID]bool)
+}
+
+func NewManager(kubeClient client.Interface) Manager {
+	return &manager{
 		kubeClient:       kubeClient,
 		podStatuses:      make(map[types.UID]api.PodStatus),
 		podStatusChannel: make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
@@ -65,35 +91,35 @@ func isStatusEqual(oldStatus, status *api.PodStatus) bool {
 	return reflect.DeepEqual(status, oldStatus)
 }

-func (s *statusManager) Start() {
+func (m *manager) Start() {
 	// Don't start the status manager if we don't have a client. This will happen
 	// on the master, where the kubelet is responsible for bootstrapping the pods
 	// of the master components.
-	if s.kubeClient == nil {
+	if m.kubeClient == nil {
 		glog.Infof("Kubernetes client is nil, not starting status manager.")
 		return
 	}
 	// syncBatch blocks when no updates are available, we can run it in a tight loop.
 	glog.Info("Starting to sync pod status with apiserver")
 	go util.Until(func() {
-		err := s.syncBatch()
+		err := m.syncBatch()
 		if err != nil {
 			glog.Warningf("Failed to updated pod status: %v", err)
 		}
 	}, 0, util.NeverStop)
 }

-func (s *statusManager) GetPodStatus(uid types.UID) (api.PodStatus, bool) {
-	s.podStatusesLock.RLock()
-	defer s.podStatusesLock.RUnlock()
-	status, ok := s.podStatuses[uid]
+func (m *manager) GetPodStatus(uid types.UID) (api.PodStatus, bool) {
+	m.podStatusesLock.RLock()
+	defer m.podStatusesLock.RUnlock()
+	status, ok := m.podStatuses[uid]
 	return status, ok
 }

-func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
-	s.podStatusesLock.Lock()
-	defer s.podStatusesLock.Unlock()
-	oldStatus, found := s.podStatuses[pod.UID]
+func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
+	m.podStatusesLock.Lock()
+	defer m.podStatusesLock.Unlock()
+	oldStatus, found := m.podStatuses[pod.UID]

 	// ensure that the start time does not change across updates.
 	if found && oldStatus.StartTime != nil {
@@ -102,7 +128,7 @@ func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {

 	// if the status has no start time, we need to set an initial time
 	// TODO(yujuhong): Consider setting StartTime when generating the pod
-	// status instead, which would allow statusManager to become a simple cache
+	// status instead, which would allow manager to become a simple cache
 	// again.
 	if status.StartTime.IsZero() {
 		if pod.Status.StartTime.IsZero() {
@@ -123,20 +149,17 @@ func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
 	// workers and/or the kubelet but dropping the lock before sending the
 	// status down the channel feels like an easy way to get a bullet in foot.
 	if !found || !isStatusEqual(&oldStatus, &status) || pod.DeletionTimestamp != nil {
-		s.podStatuses[pod.UID] = status
-		s.podStatusChannel <- podStatusSyncRequest{pod, status}
+		m.podStatuses[pod.UID] = status
+		m.podStatusChannel <- podStatusSyncRequest{pod, status}
 	} else {
 		glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", kubeletUtil.FormatPodName(pod), status)
 	}
 }

-// TerminatePods resets the container status for the provided pods to terminated and triggers
-// a status update. This function may not enqueue all the provided pods, in which case it will
-// return false
-func (s *statusManager) TerminatePods(pods []*api.Pod) bool {
+func (m *manager) TerminatePods(pods []*api.Pod) bool {
 	sent := true
-	s.podStatusesLock.Lock()
-	defer s.podStatusesLock.Unlock()
+	m.podStatusesLock.Lock()
+	defer m.podStatusesLock.Unlock()
 	for _, pod := range pods {
 		for i := range pod.Status.ContainerStatuses {
 			pod.Status.ContainerStatuses[i].State = api.ContainerState{
@@ -144,7 +167,7 @@ func (s *statusManager) TerminatePods(pods []*api.Pod) bool {
 			}
 		}
 		select {
-		case s.podStatusChannel <- podStatusSyncRequest{pod, pod.Status}:
+		case m.podStatusChannel <- podStatusSyncRequest{pod, pod.Status}:
 		default:
 			sent = false
 			glog.V(4).Infof("Termination notice for %q was dropped because the status channel is full", kubeletUtil.FormatPodName(pod))
@@ -153,27 +176,27 @@ func (s *statusManager) TerminatePods(pods []*api.Pod) bool {
 	return sent
 }

-func (s *statusManager) DeletePodStatus(uid types.UID) {
-	s.podStatusesLock.Lock()
-	defer s.podStatusesLock.Unlock()
-	delete(s.podStatuses, uid)
+func (m *manager) DeletePodStatus(uid types.UID) {
+	m.podStatusesLock.Lock()
+	defer m.podStatusesLock.Unlock()
+	delete(m.podStatuses, uid)
 }

 // TODO(filipg): It'd be cleaner if we can do this without signal from user.
-func (s *statusManager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
-	s.podStatusesLock.Lock()
-	defer s.podStatusesLock.Unlock()
-	for key := range s.podStatuses {
+func (m *manager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
+	m.podStatusesLock.Lock()
+	defer m.podStatusesLock.Unlock()
+	for key := range m.podStatuses {
 		if _, ok := podUIDs[key]; !ok {
 			glog.V(5).Infof("Removing %q from status map.", key)
-			delete(s.podStatuses, key)
+			delete(m.podStatuses, key)
 		}
 	}
 }

 // syncBatch syncs pods statuses with the apiserver.
-func (s *statusManager) syncBatch() error {
-	syncRequest := <-s.podStatusChannel
+func (m *manager) syncBatch() error {
+	syncRequest := <-m.podStatusChannel
 	pod := syncRequest.pod
 	status := syncRequest.status

@@ -182,7 +205,7 @@ func (s *statusManager) syncBatch() error {
 		ObjectMeta: pod.ObjectMeta,
 	}
 	// TODO: make me easier to express from client code
-	statusPod, err = s.kubeClient.Pods(statusPod.Namespace).Get(statusPod.Name)
+	statusPod, err = m.kubeClient.Pods(statusPod.Namespace).Get(statusPod.Name)
 	if errors.IsNotFound(err) {
 		glog.V(3).Infof("Pod %q was deleted on the server", pod.Name)
 		return nil
@@ -194,7 +217,7 @@ func (s *statusManager) syncBatch() error {
 	}
 	statusPod.Status = status
 	// TODO: handle conflict as a retry, make that easier too.
-	statusPod, err = s.kubeClient.Pods(pod.Namespace).UpdateStatus(statusPod)
+	statusPod, err = m.kubeClient.Pods(pod.Namespace).UpdateStatus(statusPod)
 	if err == nil {
 		glog.V(3).Infof("Status for pod %q updated successfully", kubeletUtil.FormatPodName(pod))
@@ -205,9 +228,9 @@ func (s *statusManager) syncBatch() error {
 		glog.V(3).Infof("Pod %q is terminated, but some pods are still running", pod.Name)
 		return nil
 	}
-	if err := s.kubeClient.Pods(statusPod.Namespace).Delete(statusPod.Name, api.NewDeleteOptions(0)); err == nil {
+	if err := m.kubeClient.Pods(statusPod.Namespace).Delete(statusPod.Name, api.NewDeleteOptions(0)); err == nil {
 		glog.V(3).Infof("Pod %q fully terminated and removed from etcd", statusPod.Name)
-		s.DeletePodStatus(pod.UID)
+		m.DeletePodStatus(pod.UID)
 		return nil
 	}
 }
@@ -220,7 +243,7 @@ func (s *statusManager) syncBatch() error {
 	// is full, and the pod worker holding the lock is waiting on this method
 	// to clear the channel. Even if this delete never runs subsequent container
 	// changes on the node should trigger updates.
-	go s.DeletePodStatus(pod.UID)
+	go m.DeletePodStatus(pod.UID)
 	return fmt.Errorf("error updating status for pod %q: %v", kubeletUtil.FormatPodName(pod), err)
 }
diff --git a/pkg/kubelet/status_manager_test.go b/pkg/kubelet/status/manager_test.go
similarity index 92%
rename from pkg/kubelet/status_manager_test.go
rename to pkg/kubelet/status/manager_test.go
index e04f24b80f0..2a8f7e5a8cd 100644
--- a/pkg/kubelet/status_manager_test.go
+++ b/pkg/kubelet/status/manager_test.go
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */

-package kubelet
+package status

 import (
 	"fmt"
@@ -37,8 +37,8 @@ var testPod *api.Pod = &api.Pod{
 	},
 }

-func newTestStatusManager() *statusManager {
-	return newStatusManager(&testclient.Fake{})
+func newTestManager() *manager {
+	return NewManager(&testclient.Fake{}).(*manager)
 }

 func generateRandomMessage() string {
@@ -66,7 +66,7 @@ func verifyActions(t *testing.T, kubeClient client.Interface, expectedActions []
 	}
 }

-func verifyUpdates(t *testing.T, manager *statusManager, expectedUpdates int) {
+func verifyUpdates(t *testing.T, manager *manager, expectedUpdates int) {
 	// Consume all updates in the channel.
 	numUpdates := 0
 	for {
@@ -89,7 +89,7 @@ func verifyUpdates(t *testing.T, manager *statusManager, expectedUpdates int) {
 }

 func TestNewStatus(t *testing.T) {
-	syncer := newTestStatusManager()
+	syncer := newTestManager()
 	syncer.SetPodStatus(testPod, getRandomPodStatus())
 	verifyUpdates(t, syncer, 1)

@@ -100,7 +100,7 @@ func TestNewStatus(t *testing.T) {
 }

 func TestNewStatusPreservesPodStartTime(t *testing.T) {
-	syncer := newTestStatusManager()
+	syncer := newTestManager()
 	pod := &api.Pod{
 		ObjectMeta: api.ObjectMeta{
 			UID: "12345678",
@@ -121,14 +121,14 @@ func TestNewStatusPreservesPodStartTime(t *testing.T) {
 }

 func TestChangedStatus(t *testing.T) {
-	syncer := newTestStatusManager()
+	syncer := newTestManager()
 	syncer.SetPodStatus(testPod, getRandomPodStatus())
 	syncer.SetPodStatus(testPod, getRandomPodStatus())
 	verifyUpdates(t, syncer, 2)
 }

 func TestChangedStatusKeepsStartTime(t *testing.T) {
-	syncer := newTestStatusManager()
+	syncer := newTestManager()
 	now := util.Now()
 	firstStatus := getRandomPodStatus()
 	firstStatus.StartTime = &now
@@ -145,7 +145,7 @@ func TestChangedStatusKeepsStartTime(t *testing.T) {
 }

 func TestUnchangedStatus(t *testing.T) {
-	syncer := newTestStatusManager()
+	syncer := newTestManager()
 	podStatus := getRandomPodStatus()
 	syncer.SetPodStatus(testPod, podStatus)
 	syncer.SetPodStatus(testPod, podStatus)
@@ -153,7 +153,7 @@ func TestUnchangedStatus(t *testing.T) {
 }

 func TestSyncBatchIgnoresNotFound(t *testing.T) {
-	syncer := newTestStatusManager()
+	syncer := newTestManager()
 	syncer.SetPodStatus(testPod, getRandomPodStatus())
 	err := syncer.syncBatch()
 	if err != nil {
@@ -165,7 +165,7 @@ func TestSyncBatchIgnoresNotFound(t *testing.T) {
 }

 func TestSyncBatch(t *testing.T) {
-	syncer := newTestStatusManager()
+	syncer := newTestManager()
 	syncer.kubeClient = testclient.NewSimpleFake(testPod)
 	syncer.SetPodStatus(testPod, getRandomPodStatus())
 	err := syncer.syncBatch()
@@ -180,7 +180,7 @@ func TestSyncBatch(t *testing.T) {
 }

 func TestSyncBatchChecksMismatchedUID(t *testing.T) {
-	syncer := newTestStatusManager()
+	syncer := newTestManager()
 	testPod.UID = "first"
 	differentPod := *testPod
 	differentPod.UID = "second"
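
Note for reviewers: below is a minimal sketch of how a caller can drive the extracted status.Manager interface, using only what this patch exports (status.NewManager and the Manager methods). The recordStatus helper and the example pod are hypothetical and are not part of this change; they just exercise the cache paths introduced here.

package main

import (
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/kubelet/status"
)

// recordStatus stands in for a kubelet component that owns a status.Manager.
// It caches a status for a pod and then reads the cached copy back.
func recordStatus(m status.Manager, pod *api.Pod, podStatus api.PodStatus) {
	// SetPodStatus caches the status and queues an apiserver update only when
	// the status differs from the previously cached value.
	m.SetPodStatus(pod, podStatus)

	// GetPodStatus reports whether the UID was found in the cache.
	if cached, ok := m.GetPodStatus(pod.UID); ok {
		_ = cached // latest cached api.PodStatus for this pod
	}
}

func main() {
	// A nil client mirrors the master bootstrap path: Start() logs and skips
	// the apiserver sync loop, but the local cache still works.
	m := status.NewManager(nil)
	m.Start()

	pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "example-pod", UID: "example-uid"}}
	recordStatus(m, pod, api.PodStatus{Phase: api.PodRunning})
}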