Reduce testing time of status_manager_test.

Adds tests for syncBatch().
This commit is contained in:
Victor Marmol 2015-03-25 09:32:16 -07:00
parent 5eb373692b
commit 65070d6d81
2 changed files with 60 additions and 33 deletions

View File

@ -17,9 +17,9 @@ limitations under the License.
package kubelet package kubelet
import ( import (
"fmt"
"reflect" "reflect"
"sync" "sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
@ -52,8 +52,13 @@ func newStatusManager(kubeClient client.Interface) *statusManager {
} }
func (s *statusManager) Start() { func (s *statusManager) Start() {
// We can run SyncBatch() often because it will block until we have some updates to send. // syncBatch blocks when no updates are available, we can run it in a tight loop.
go util.Forever(s.SyncBatch, 0) go util.Forever(func() {
err := s.syncBatch()
if err != nil {
glog.Warningf("Failed to update pod status: %v", err)
}
}, 0)
} }
func (s *statusManager) GetPodStatus(podFullName string) (api.PodStatus, bool) { func (s *statusManager) GetPodStatus(podFullName string) (api.PodStatus, bool) {
@ -94,28 +99,23 @@ func (s *statusManager) RemoveOrphanedStatuses(podFullNames map[string]bool) {
} }
} }
// SyncBatch syncs pods statuses with the apiserver. It will loop until channel // syncBatch syncs pods statuses with the apiserver.
// s.podStatusChannel is empty for at least 1s. func (s *statusManager) syncBatch() error {
func (s *statusManager) SyncBatch() { syncRequest := <-s.podStatusChannel
for { pod := syncRequest.pod
select { podFullName := kubecontainer.GetPodFullName(pod)
case syncRequest := <-s.podStatusChannel: status := syncRequest.status
pod := syncRequest.pod
podFullName := kubecontainer.GetPodFullName(pod) _, err := s.kubeClient.Pods(pod.Namespace).UpdateStatus(pod.Name, &status)
status := syncRequest.status if err != nil {
glog.V(3).Infof("Syncing status for %s", podFullName) // We failed to update status. In order to make sure we retry next time
_, err := s.kubeClient.Pods(pod.Namespace).UpdateStatus(pod.Name, &status) // we delete cached value. This may result in an additional update, but
if err != nil { // this is ok.
// We failed to update status. In order to make sure we retry next time s.DeletePodStatus(podFullName)
// we delete cached value. This may result in an additional update, but return fmt.Errorf("error updating status for pod %q: %v", pod.Name, err)
// this is ok. } else {
s.DeletePodStatus(podFullName) glog.V(3).Infof("Status for pod %q updated successfully", pod.Name)
glog.Warningf("Error updating status for pod %q: %v", podFullName, err)
} else {
glog.V(3).Infof("Status for pod %q updated successfully", podFullName)
}
case <-time.After(1 * time.Second):
return
}
} }
return nil
} }

View File

@ -60,28 +60,55 @@ func verifyActions(t *testing.T, kubeClient client.Interface, expectedActions []
} }
} }
// verifyUpdates drains every pending status update from the manager's
// podStatusChannel without blocking, then fails the test if the number of
// updates consumed differs from expectedUpdates.
func verifyUpdates(t *testing.T, manager *statusManager, expectedUpdates int) {
	numUpdates := 0
	// Non-blocking drain: keep receiving until the channel is empty.
drain:
	for {
		select {
		case <-manager.podStatusChannel:
			numUpdates++
		default:
			break drain
		}
	}
	if numUpdates != expectedUpdates {
		// Use %d for both counts; the original %s on an int printed
		// "%!s(int=N)" instead of the expected value.
		t.Errorf("unexpected number of updates %d, expected %d", numUpdates, expectedUpdates)
	}
}
func TestNewStatus(t *testing.T) { func TestNewStatus(t *testing.T) {
syncer := newTestStatusManager() syncer := newTestStatusManager()
syncer.SetPodStatus(testPod, getRandomPodStatus()) syncer.SetPodStatus(testPod, getRandomPodStatus())
syncer.SyncBatch() verifyUpdates(t, syncer, 1)
verifyActions(t, syncer.kubeClient, []string{"update-status-pod"})
} }
func TestChangedStatus(t *testing.T) { func TestChangedStatus(t *testing.T) {
syncer := newTestStatusManager() syncer := newTestStatusManager()
syncer.SetPodStatus(testPod, getRandomPodStatus()) syncer.SetPodStatus(testPod, getRandomPodStatus())
syncer.SyncBatch()
syncer.SetPodStatus(testPod, getRandomPodStatus()) syncer.SetPodStatus(testPod, getRandomPodStatus())
syncer.SyncBatch() verifyUpdates(t, syncer, 2)
verifyActions(t, syncer.kubeClient, []string{"update-status-pod", "update-status-pod"})
} }
func TestUnchangedStatus(t *testing.T) { func TestUnchangedStatus(t *testing.T) {
syncer := newTestStatusManager() syncer := newTestStatusManager()
podStatus := getRandomPodStatus() podStatus := getRandomPodStatus()
syncer.SetPodStatus(testPod, podStatus) syncer.SetPodStatus(testPod, podStatus)
syncer.SyncBatch()
syncer.SetPodStatus(testPod, podStatus) syncer.SetPodStatus(testPod, podStatus)
syncer.SyncBatch() verifyUpdates(t, syncer, 1)
}
func TestSyncBatch(t *testing.T) {
syncer := newTestStatusManager()
syncer.SetPodStatus(testPod, getRandomPodStatus())
err := syncer.syncBatch()
if err != nil {
t.Errorf("unexpected syncing error: %v", err)
}
verifyActions(t, syncer.kubeClient, []string{"update-status-pod"}) verifyActions(t, syncer.kubeClient, []string{"update-status-pod"})
} }