diff --git a/pkg/kubelet/status_manager.go b/pkg/kubelet/status_manager.go index f3d16106a8a..3065a79ec14 100644 --- a/pkg/kubelet/status_manager.go +++ b/pkg/kubelet/status_manager.go @@ -17,9 +17,9 @@ limitations under the License. package kubelet import ( + "fmt" "reflect" "sync" - "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" @@ -52,8 +52,13 @@ func newStatusManager(kubeClient client.Interface) *statusManager { } func (s *statusManager) Start() { - // We can run SyncBatch() often because it will block until we have some updates to send. - go util.Forever(s.SyncBatch, 0) + // syncBatch blocks when no updates are available, we can run it in a tight loop. + go util.Forever(func() { + err := s.syncBatch() + if err != nil { + glog.Warningf("Failed to updated pod status: %v", err) + } + }, 0) } func (s *statusManager) GetPodStatus(podFullName string) (api.PodStatus, bool) { @@ -94,28 +99,23 @@ func (s *statusManager) RemoveOrphanedStatuses(podFullNames map[string]bool) { } } -// SyncBatch syncs pods statuses with the apiserver. It will loop until channel -// s.podStatusChannel is empty for at least 1s. -func (s *statusManager) SyncBatch() { - for { - select { - case syncRequest := <-s.podStatusChannel: - pod := syncRequest.pod - podFullName := kubecontainer.GetPodFullName(pod) - status := syncRequest.status - glog.V(3).Infof("Syncing status for %s", podFullName) - _, err := s.kubeClient.Pods(pod.Namespace).UpdateStatus(pod.Name, &status) - if err != nil { - // We failed to update status. In order to make sure we retry next time - // we delete cached value. This may result in an additional update, but - // this is ok. - s.DeletePodStatus(podFullName) - glog.Warningf("Error updating status for pod %q: %v", podFullName, err) - } else { - glog.V(3).Infof("Status for pod %q updated successfully", podFullName) - } - case <-time.After(1 * time.Second): - return - } +// syncBatch syncs pods statuses with the apiserver. +func (s *statusManager) syncBatch() error { + syncRequest := <-s.podStatusChannel + pod := syncRequest.pod + podFullName := kubecontainer.GetPodFullName(pod) + status := syncRequest.status + + _, err := s.kubeClient.Pods(pod.Namespace).UpdateStatus(pod.Name, &status) + if err != nil { + // We failed to update status. In order to make sure we retry next time + // we delete cached value. This may result in an additional update, but + // this is ok. + s.DeletePodStatus(podFullName) + return fmt.Errorf("error updating status for pod %q: %v", pod.Name, err) + } else { + glog.V(3).Infof("Status for pod %q updated successfully", pod.Name) } + + return nil } diff --git a/pkg/kubelet/status_manager_test.go b/pkg/kubelet/status_manager_test.go index 32977e663db..ced98525bb4 100644 --- a/pkg/kubelet/status_manager_test.go +++ b/pkg/kubelet/status_manager_test.go @@ -60,28 +60,55 @@ func verifyActions(t *testing.T, kubeClient client.Interface, expectedActions [] } } +func verifyUpdates(t *testing.T, manager *statusManager, expectedUpdates int) { + // Consume all updates in the channel. + numUpdates := 0 + for { + hasUpdate := true + select { + case <-manager.podStatusChannel: + numUpdates++ + default: + hasUpdate = false + } + + if !hasUpdate { + break + } + } + + if numUpdates != expectedUpdates { + t.Errorf("unexpected number of updates %d, expected %s", numUpdates, expectedUpdates) + } +} + func TestNewStatus(t *testing.T) { syncer := newTestStatusManager() syncer.SetPodStatus(testPod, getRandomPodStatus()) - syncer.SyncBatch() - verifyActions(t, syncer.kubeClient, []string{"update-status-pod"}) + verifyUpdates(t, syncer, 1) } func TestChangedStatus(t *testing.T) { syncer := newTestStatusManager() syncer.SetPodStatus(testPod, getRandomPodStatus()) - syncer.SyncBatch() syncer.SetPodStatus(testPod, getRandomPodStatus()) - syncer.SyncBatch() - verifyActions(t, syncer.kubeClient, []string{"update-status-pod", "update-status-pod"}) + verifyUpdates(t, syncer, 2) } func TestUnchangedStatus(t *testing.T) { syncer := newTestStatusManager() podStatus := getRandomPodStatus() syncer.SetPodStatus(testPod, podStatus) - syncer.SyncBatch() syncer.SetPodStatus(testPod, podStatus) - syncer.SyncBatch() + verifyUpdates(t, syncer, 1) +} + +func TestSyncBatch(t *testing.T) { + syncer := newTestStatusManager() + syncer.SetPodStatus(testPod, getRandomPodStatus()) + err := syncer.syncBatch() + if err != nil { + t.Errorf("unexpected syncing error: %v", err) + } verifyActions(t, syncer.kubeClient, []string{"update-status-pod"}) }