diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 38458fd1324..03a69a7535b 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -52,6 +52,7 @@ import (
     "k8s.io/kubernetes/pkg/kubelet/metrics"
     "k8s.io/kubernetes/pkg/kubelet/network"
     "k8s.io/kubernetes/pkg/kubelet/rkt"
+    "k8s.io/kubernetes/pkg/kubelet/status"
     kubeletTypes "k8s.io/kubernetes/pkg/kubelet/types"
     kubeletUtil "k8s.io/kubernetes/pkg/kubelet/util"
     "k8s.io/kubernetes/pkg/labels"
@@ -241,7 +242,7 @@ func NewMainKubelet(
     if err != nil {
         return nil, fmt.Errorf("failed to initialize disk manager: %v", err)
     }
-    statusManager := newStatusManager(kubeClient)
+    statusManager := status.NewManager(kubeClient)
     readinessManager := kubecontainer.NewReadinessManager()
     containerRefManager := kubecontainer.NewRefManager()
 
@@ -501,7 +502,7 @@ type Kubelet struct {
     machineInfo *cadvisorApi.MachineInfo
 
     // Syncs pods statuses with apiserver; also used as a cache of statuses.
-    statusManager *statusManager
+    statusManager status.Manager
 
     // Manager for the volume maps for the pods.
     volumeManager *volumeManager
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index b9a3c767839..339b055eed7 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -44,6 +44,7 @@ import (
     "k8s.io/kubernetes/pkg/kubelet/container"
     kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
     "k8s.io/kubernetes/pkg/kubelet/network"
+    "k8s.io/kubernetes/pkg/kubelet/status"
     "k8s.io/kubernetes/pkg/runtime"
     "k8s.io/kubernetes/pkg/types"
     "k8s.io/kubernetes/pkg/util"
@@ -105,7 +106,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
     kubelet.nodeLister = testNodeLister{}
     kubelet.readinessManager = kubecontainer.NewReadinessManager()
     kubelet.recorder = fakeRecorder
-    kubelet.statusManager = newStatusManager(fakeKubeClient)
+    kubelet.statusManager = status.NewManager(fakeKubeClient)
     if err := kubelet.setupDataDirs(); err != nil {
         t.Fatalf("can't initialize kubelet data dirs: %v", err)
     }
diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go
index 4d6435cd332..f236ebfa6cd 100644
--- a/pkg/kubelet/runonce_test.go
+++ b/pkg/kubelet/runonce_test.go
@@ -30,6 +30,7 @@ import (
     kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
     "k8s.io/kubernetes/pkg/kubelet/dockertools"
     "k8s.io/kubernetes/pkg/kubelet/network"
+    "k8s.io/kubernetes/pkg/kubelet/status"
 )
 
 type listContainersResult struct {
@@ -83,7 +84,7 @@ func TestRunOnce(t *testing.T) {
         recorder:            &record.FakeRecorder{},
         cadvisor:            cadvisor,
         nodeLister:          testNodeLister{},
-        statusManager:       newStatusManager(nil),
+        statusManager:       status.NewManager(nil),
         containerRefManager: kubecontainer.NewRefManager(),
         readinessManager:    kubecontainer.NewReadinessManager(),
         podManager:          podManager,
diff --git a/pkg/kubelet/status_manager.go b/pkg/kubelet/status/manager.go
similarity index 71%
rename from pkg/kubelet/status_manager.go
rename to pkg/kubelet/status/manager.go
index c52ccddb584..e47bd11f560 100644
--- a/pkg/kubelet/status_manager.go
+++ b/pkg/kubelet/status/manager.go
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package kubelet
+package status
 
 import (
     "fmt"
@@ -39,16 +39,42 @@ type podStatusSyncRequest struct {
 
 // Updates pod statuses in apiserver. Writes only when new status has changed.
 // All methods are thread-safe.
-type statusManager struct {
+type manager struct {
     kubeClient client.Interface
     // Map from pod full name to sync status of the corresponding pod.
-    podStatusesLock  sync.RWMutex
     podStatuses      map[types.UID]api.PodStatus
+    podStatusesLock  sync.RWMutex
     podStatusChannel chan podStatusSyncRequest
 }
 
-func newStatusManager(kubeClient client.Interface) *statusManager {
-    return &statusManager{
+// status.Manager is the Source of truth for kubelet pod status, and should be kept up-to-date with
+// the latest api.PodStatus. It also syncs updates back to the API server.
+type Manager interface {
+    // Start the API server status sync loop.
+    Start()
+
+    // GetPodStatus returns the cached status for the provided pod UID, as well as whether it
+    // was a cache hit.
+    GetPodStatus(uid types.UID) (api.PodStatus, bool)
+
+    // SetPodStatus caches updates the cached status for the given pod, and triggers a status update.
+    SetPodStatus(pod *api.Pod, status api.PodStatus)
+
+    // TerminatePods resets the container status for the provided pods to terminated and triggers
+    // a status update. This function may not enqueue all the provided pods, in which case it will
+    // return false
+    TerminatePods(pods []*api.Pod) bool
+
+    // DeletePodStatus simply removes the given pod from the status cache.
+    DeletePodStatus(uid types.UID)
+
+    // RemoveOrphanedStatuses scans the status cache and removes any entries for pods not included in
+    // the provided podUIDs.
+    RemoveOrphanedStatuses(podUIDs map[types.UID]bool)
+}
+
+func NewManager(kubeClient client.Interface) Manager {
+    return &manager{
         kubeClient:       kubeClient,
         podStatuses:      make(map[types.UID]api.PodStatus),
         podStatusChannel: make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
@@ -65,35 +91,35 @@ func isStatusEqual(oldStatus, status *api.PodStatus) bool {
     return reflect.DeepEqual(status, oldStatus)
 }
 
-func (s *statusManager) Start() {
+func (m *manager) Start() {
     // Don't start the status manager if we don't have a client. This will happen
     // on the master, where the kubelet is responsible for bootstrapping the pods
     // of the master components.
-    if s.kubeClient == nil {
+    if m.kubeClient == nil {
         glog.Infof("Kubernetes client is nil, not starting status manager.")
         return
     }
     // syncBatch blocks when no updates are available, we can run it in a tight loop.
     glog.Info("Starting to sync pod status with apiserver")
     go util.Until(func() {
-        err := s.syncBatch()
+        err := m.syncBatch()
         if err != nil {
             glog.Warningf("Failed to updated pod status: %v", err)
         }
     }, 0, util.NeverStop)
 }
 
-func (s *statusManager) GetPodStatus(uid types.UID) (api.PodStatus, bool) {
-    s.podStatusesLock.RLock()
-    defer s.podStatusesLock.RUnlock()
-    status, ok := s.podStatuses[uid]
+func (m *manager) GetPodStatus(uid types.UID) (api.PodStatus, bool) {
+    m.podStatusesLock.RLock()
+    defer m.podStatusesLock.RUnlock()
+    status, ok := m.podStatuses[uid]
     return status, ok
 }
 
-func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
-    s.podStatusesLock.Lock()
-    defer s.podStatusesLock.Unlock()
-    oldStatus, found := s.podStatuses[pod.UID]
+func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
+    m.podStatusesLock.Lock()
+    defer m.podStatusesLock.Unlock()
+    oldStatus, found := m.podStatuses[pod.UID]
 
     // ensure that the start time does not change across updates.
     if found && oldStatus.StartTime != nil {
@@ -102,7 +128,7 @@ func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
 
     // if the status has no start time, we need to set an initial time
     // TODO(yujuhong): Consider setting StartTime when generating the pod
-    // status instead, which would allow statusManager to become a simple cache
+    // status instead, which would allow manager to become a simple cache
     // again.
     if status.StartTime.IsZero() {
         if pod.Status.StartTime.IsZero() {
@@ -123,20 +149,17 @@ func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
     // workers and/or the kubelet but dropping the lock before sending the
     // status down the channel feels like an easy way to get a bullet in foot.
     if !found || !isStatusEqual(&oldStatus, &status) || pod.DeletionTimestamp != nil {
-        s.podStatuses[pod.UID] = status
-        s.podStatusChannel <- podStatusSyncRequest{pod, status}
+        m.podStatuses[pod.UID] = status
+        m.podStatusChannel <- podStatusSyncRequest{pod, status}
     } else {
         glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", kubeletUtil.FormatPodName(pod), status)
     }
 }
 
-// TerminatePods resets the container status for the provided pods to terminated and triggers
-// a status update. This function may not enqueue all the provided pods, in which case it will
-// return false
-func (s *statusManager) TerminatePods(pods []*api.Pod) bool {
+func (m *manager) TerminatePods(pods []*api.Pod) bool {
     sent := true
-    s.podStatusesLock.Lock()
-    defer s.podStatusesLock.Unlock()
+    m.podStatusesLock.Lock()
+    defer m.podStatusesLock.Unlock()
     for _, pod := range pods {
         for i := range pod.Status.ContainerStatuses {
             pod.Status.ContainerStatuses[i].State = api.ContainerState{
@@ -144,7 +167,7 @@ func (s *statusManager) TerminatePods(pods []*api.Pod) bool {
             }
         }
         select {
-        case s.podStatusChannel <- podStatusSyncRequest{pod, pod.Status}:
+        case m.podStatusChannel <- podStatusSyncRequest{pod, pod.Status}:
         default:
             sent = false
             glog.V(4).Infof("Termination notice for %q was dropped because the status channel is full", kubeletUtil.FormatPodName(pod))
@@ -153,27 +176,27 @@ func (s *statusManager) TerminatePods(pods []*api.Pod) bool {
     return sent
 }
 
-func (s *statusManager) DeletePodStatus(uid types.UID) {
-    s.podStatusesLock.Lock()
-    defer s.podStatusesLock.Unlock()
-    delete(s.podStatuses, uid)
+func (m *manager) DeletePodStatus(uid types.UID) {
+    m.podStatusesLock.Lock()
+    defer m.podStatusesLock.Unlock()
+    delete(m.podStatuses, uid)
 }
 
 // TODO(filipg): It'd be cleaner if we can do this without signal from user.
-func (s *statusManager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
-    s.podStatusesLock.Lock()
-    defer s.podStatusesLock.Unlock()
-    for key := range s.podStatuses {
+func (m *manager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
+    m.podStatusesLock.Lock()
+    defer m.podStatusesLock.Unlock()
+    for key := range m.podStatuses {
         if _, ok := podUIDs[key]; !ok {
             glog.V(5).Infof("Removing %q from status map.", key)
-            delete(s.podStatuses, key)
+            delete(m.podStatuses, key)
         }
     }
 }
 
 // syncBatch syncs pods statuses with the apiserver.
-func (s *statusManager) syncBatch() error {
-    syncRequest := <-s.podStatusChannel
+func (m *manager) syncBatch() error {
+    syncRequest := <-m.podStatusChannel
     pod := syncRequest.pod
     status := syncRequest.status
 
@@ -182,7 +205,7 @@ func (s *statusManager) syncBatch() error {
         ObjectMeta: pod.ObjectMeta,
     }
     // TODO: make me easier to express from client code
-    statusPod, err = s.kubeClient.Pods(statusPod.Namespace).Get(statusPod.Name)
+    statusPod, err = m.kubeClient.Pods(statusPod.Namespace).Get(statusPod.Name)
     if errors.IsNotFound(err) {
         glog.V(3).Infof("Pod %q was deleted on the server", pod.Name)
         return nil
@@ -194,7 +217,7 @@ func (s *statusManager) syncBatch() error {
     }
     statusPod.Status = status
     // TODO: handle conflict as a retry, make that easier too.
-    statusPod, err = s.kubeClient.Pods(pod.Namespace).UpdateStatus(statusPod)
+    statusPod, err = m.kubeClient.Pods(pod.Namespace).UpdateStatus(statusPod)
     if err == nil {
         glog.V(3).Infof("Status for pod %q updated successfully", kubeletUtil.FormatPodName(pod))
 
@@ -205,9 +228,9 @@ func (s *statusManager) syncBatch() error {
             glog.V(3).Infof("Pod %q is terminated, but some pods are still running", pod.Name)
             return nil
         }
-        if err := s.kubeClient.Pods(statusPod.Namespace).Delete(statusPod.Name, api.NewDeleteOptions(0)); err == nil {
+        if err := m.kubeClient.Pods(statusPod.Namespace).Delete(statusPod.Name, api.NewDeleteOptions(0)); err == nil {
            glog.V(3).Infof("Pod %q fully terminated and removed from etcd", statusPod.Name)
-            s.DeletePodStatus(pod.UID)
+            m.DeletePodStatus(pod.UID)
             return nil
         }
     }
@@ -220,7 +243,7 @@ func (s *statusManager) syncBatch() error {
     // is full, and the pod worker holding the lock is waiting on this method
    // to clear the channel. Even if this delete never runs subsequent container
     // changes on the node should trigger updates.
-    go s.DeletePodStatus(pod.UID)
+    go m.DeletePodStatus(pod.UID)
     return fmt.Errorf("error updating status for pod %q: %v", kubeletUtil.FormatPodName(pod), err)
 }
 
diff --git a/pkg/kubelet/status_manager_test.go b/pkg/kubelet/status/manager_test.go
similarity index 92%
rename from pkg/kubelet/status_manager_test.go
rename to pkg/kubelet/status/manager_test.go
index e04f24b80f0..2a8f7e5a8cd 100644
--- a/pkg/kubelet/status_manager_test.go
+++ b/pkg/kubelet/status/manager_test.go
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package kubelet
+package status
 
 import (
     "fmt"
@@ -37,8 +37,8 @@ var testPod *api.Pod = &api.Pod{
     },
 }
 
-func newTestStatusManager() *statusManager {
-    return newStatusManager(&testclient.Fake{})
+func newTestManager() *manager {
+    return NewManager(&testclient.Fake{}).(*manager)
 }
 
 func generateRandomMessage() string {
@@ -66,7 +66,7 @@ func verifyActions(t *testing.T, kubeClient client.Interface, expectedActions []
     }
 }
 
-func verifyUpdates(t *testing.T, manager *statusManager, expectedUpdates int) {
+func verifyUpdates(t *testing.T, manager *manager, expectedUpdates int) {
     // Consume all updates in the channel.
     numUpdates := 0
     for {
@@ -89,7 +89,7 @@ func verifyUpdates(t *testing.T, manager *statusManager, expectedUpdates int) {
 }
 
 func TestNewStatus(t *testing.T) {
-    syncer := newTestStatusManager()
+    syncer := newTestManager()
     syncer.SetPodStatus(testPod, getRandomPodStatus())
     verifyUpdates(t, syncer, 1)
 
@@ -100,7 +100,7 @@ func TestNewStatus(t *testing.T) {
 }
 
 func TestNewStatusPreservesPodStartTime(t *testing.T) {
-    syncer := newTestStatusManager()
+    syncer := newTestManager()
     pod := &api.Pod{
         ObjectMeta: api.ObjectMeta{
             UID: "12345678",
@@ -121,14 +121,14 @@ func TestNewStatusPreservesPodStartTime(t *testing.T) {
 }
 
 func TestChangedStatus(t *testing.T) {
-    syncer := newTestStatusManager()
+    syncer := newTestManager()
     syncer.SetPodStatus(testPod, getRandomPodStatus())
     syncer.SetPodStatus(testPod, getRandomPodStatus())
     verifyUpdates(t, syncer, 2)
 }
 
 func TestChangedStatusKeepsStartTime(t *testing.T) {
-    syncer := newTestStatusManager()
+    syncer := newTestManager()
     now := util.Now()
     firstStatus := getRandomPodStatus()
     firstStatus.StartTime = &now
@@ -145,7 +145,7 @@ func TestChangedStatusKeepsStartTime(t *testing.T) {
 }
 
 func TestUnchangedStatus(t *testing.T) {
-    syncer := newTestStatusManager()
+    syncer := newTestManager()
     podStatus := getRandomPodStatus()
     syncer.SetPodStatus(testPod, podStatus)
     syncer.SetPodStatus(testPod, podStatus)
@@ -153,7 +153,7 @@ func TestUnchangedStatus(t *testing.T) {
 }
 
 func TestSyncBatchIgnoresNotFound(t *testing.T) {
-    syncer := newTestStatusManager()
+    syncer := newTestManager()
     syncer.SetPodStatus(testPod, getRandomPodStatus())
     err := syncer.syncBatch()
     if err != nil {
@@ -165,7 +165,7 @@ func TestSyncBatchIgnoresNotFound(t *testing.T) {
 }
 
 func TestSyncBatch(t *testing.T) {
-    syncer := newTestStatusManager()
+    syncer := newTestManager()
     syncer.kubeClient = testclient.NewSimpleFake(testPod)
     syncer.SetPodStatus(testPod, getRandomPodStatus())
     err := syncer.syncBatch()
@@ -180,7 +180,7 @@ func TestSyncBatch(t *testing.T) {
 }
 
 func TestSyncBatchChecksMismatchedUID(t *testing.T) {
-    syncer := newTestStatusManager()
+    syncer := newTestManager()
     testPod.UID = "first"
     differentPod := *testPod
     differentPod.UID = "second"
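
A minimal usage sketch, separate from the diff excerpt above, showing how a caller could wire up the new status.Manager interface. It uses only the constructor and methods introduced in pkg/kubelet/status/manager.go; the package name, helper function name, and the client import path are illustrative assumptions, not part of the change.

```go
package example // hypothetical package, for illustration only

import (
	"k8s.io/kubernetes/pkg/api"
	client "k8s.io/kubernetes/pkg/client/unversioned" // assumed import path for client.Interface
	"k8s.io/kubernetes/pkg/kubelet/status"
)

// startPodStatusSync is a hypothetical helper demonstrating the call pattern
// exposed by the status.Manager interface introduced in this diff.
func startPodStatusSync(kubeClient client.Interface, pod *api.Pod) status.Manager {
	// NewManager returns the exported status.Manager interface rather than the
	// unexported *manager, so callers can substitute a fake in tests.
	m := status.NewManager(kubeClient)

	// Start launches the apiserver sync loop; per the diff, it returns
	// immediately (without syncing) when kubeClient is nil.
	m.Start()

	// Cache the pod's current status and trigger an async update.
	m.SetPodStatus(pod, pod.Status)

	// Read back the cached copy; ok reports whether the UID was a cache hit.
	if cached, ok := m.GetPodStatus(pod.UID); ok {
		_ = cached // e.g. compare against a freshly generated status
	}
	return m
}
```

Because the kubelet field is now typed as the interface (statusManager status.Manager), tests outside the status package can supply any implementation of these six methods instead of the concrete manager.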