diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go index ae71b367d92..ef153d88731 100644 --- a/pkg/controller/replication_controller.go +++ b/pkg/controller/replication_controller.go @@ -67,6 +67,10 @@ const ( // Realistic value of the burstReplica field for the replication manager based off // performance requirements for kubernetes 1.0. BurstReplicas = 500 + + // We must avoid counting pods until the pod store has synced. If it hasn't synced, to + // avoid a hot loop, we'll wait this long between checks. + PodStoreSyncedPollPeriod = 100 * time.Millisecond ) // ReplicationManager is responsible for synchronizing ReplicationController objects stored @@ -80,6 +84,11 @@ type ReplicationManager struct { burstReplicas int // To allow injection of syncReplicationController for testing. syncHandler func(rcKey string) error + + // podStoreSynced returns true if the pod store has been synced at least once. + // Added as a member to the struct to allow injection for testing. + podStoreSynced func() bool + // A TTLCache of pod creates/deletes each rc expects to see expectations RCExpectationsManager // A store of controllers, populated by the rcController @@ -167,6 +176,7 @@ func NewReplicationManager(kubeClient client.Interface, burstReplicas int) *Repl ) rm.syncHandler = rm.syncReplicationController + rm.podStoreSynced = rm.podController.HasSynced return rm } @@ -366,6 +376,13 @@ func (rm *ReplicationManager) syncReplicationController(key string) error { return err } controller := *obj.(*api.ReplicationController) + if !rm.podStoreSynced() { + // Sleep so we give the pod reflector goroutine a chance to run. + time.Sleep(PodStoreSyncedPollPeriod) + glog.Infof("Waiting for pods controller to sync, requeuing rc %v", controller.Name) + rm.enqueueController(&controller) + return nil + } // Check the expectations of the rc before counting active pods, otherwise a new pod can sneak in // and update the expectations after we've retrieved active pods from the store. If a new pod enters diff --git a/pkg/controller/replication_controller_test.go b/pkg/controller/replication_controller_test.go index 88ec1c953b7..a374e1c80d9 100644 --- a/pkg/controller/replication_controller_test.go +++ b/pkg/controller/replication_controller_test.go @@ -54,6 +54,8 @@ type FakePodControl struct { // the watch test will take upto 1/2 a second before timing out. const controllerTimeout = 500 * time.Millisecond +var alwaysReady = func() bool { return true } + func init() { api.ForTesting_ReferencesAllowBlankSelfLinks = true } @@ -223,6 +225,7 @@ func TestSyncReplicationControllerDoesNothing(t *testing.T) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()}) fakePodControl := FakePodControl{} manager := NewReplicationManager(client, BurstReplicas) + manager.podStoreSynced = alwaysReady // 2 running pods, a controller with 2 replicas, sync is a no-op controllerSpec := newReplicationController(2) @@ -238,6 +241,7 @@ func TestSyncReplicationControllerDeletes(t *testing.T) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()}) fakePodControl := FakePodControl{} manager := NewReplicationManager(client, BurstReplicas) + manager.podStoreSynced = alwaysReady manager.podControl = &fakePodControl // 2 running pods and a controller with 1 replica, one pod delete expected @@ -253,6 +257,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()}) fakePodControl := FakePodControl{} manager := NewReplicationManager(client, BurstReplicas) + manager.podStoreSynced = alwaysReady manager.podControl = &fakePodControl received := make(chan string) @@ -284,6 +289,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) { func TestSyncReplicationControllerCreates(t *testing.T) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()}) manager := NewReplicationManager(client, BurstReplicas) + manager.podStoreSynced = alwaysReady // A controller with 2 replicas and no pods in the store, 2 creates expected controller := newReplicationController(2) @@ -349,6 +355,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) { defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) manager := NewReplicationManager(client, BurstReplicas) + manager.podStoreSynced = alwaysReady // Steady state for the replication controller, no Status.Replicas updates expected activePods := 5 @@ -390,6 +397,7 @@ func TestControllerUpdateReplicas(t *testing.T) { client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) manager := NewReplicationManager(client, BurstReplicas) + manager.podStoreSynced = alwaysReady // Insufficient number of pods in the system, and Status.Replicas is wrong; // Status.Replica should update to match number of pods in system, 1 new pod should be created. @@ -579,6 +587,7 @@ func TestSyncReplicationControllerDormancy(t *testing.T) { fakePodControl := FakePodControl{} manager := NewReplicationManager(client, BurstReplicas) + manager.podStoreSynced = alwaysReady manager.podControl = &fakePodControl controllerSpec := newReplicationController(2) @@ -618,6 +627,7 @@ func TestSyncReplicationControllerDormancy(t *testing.T) { func TestPodControllerLookup(t *testing.T) { manager := NewReplicationManager(client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()}), BurstReplicas) + manager.podStoreSynced = alwaysReady testCases := []struct { inRCs []*api.ReplicationController pod *api.Pod @@ -684,6 +694,7 @@ func TestWatchControllers(t *testing.T) { fakeWatch := watch.NewFake() client := &testclient.Fake{Watch: fakeWatch} manager := NewReplicationManager(client, BurstReplicas) + manager.podStoreSynced = alwaysReady var testControllerSpec api.ReplicationController received := make(chan string) @@ -725,6 +736,7 @@ func TestWatchPods(t *testing.T) { fakeWatch := watch.NewFake() client := &testclient.Fake{Watch: fakeWatch} manager := NewReplicationManager(client, BurstReplicas) + manager.podStoreSynced = alwaysReady // Put one rc and one pod into the controller's stores testControllerSpec := newReplicationController(1) @@ -768,6 +780,7 @@ func TestUpdatePods(t *testing.T) { fakeWatch := watch.NewFake() client := &testclient.Fake{Watch: fakeWatch} manager := NewReplicationManager(client, BurstReplicas) + manager.podStoreSynced = alwaysReady received := make(chan string) @@ -826,6 +839,7 @@ func TestControllerUpdateRequeue(t *testing.T) { client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) manager := NewReplicationManager(client, BurstReplicas) + manager.podStoreSynced = alwaysReady rc := newReplicationController(1) manager.controllerStore.Store.Add(rc) @@ -902,6 +916,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()}) fakePodControl := FakePodControl{} manager := NewReplicationManager(client, burstReplicas) + manager.podStoreSynced = alwaysReady manager.podControl = &fakePodControl controllerSpec := newReplicationController(numReplicas) @@ -1016,6 +1031,7 @@ func TestRCSyncExpectations(t *testing.T) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()}) fakePodControl := FakePodControl{} manager := NewReplicationManager(client, 2) + manager.podStoreSynced = alwaysReady manager.podControl = &fakePodControl controllerSpec := newReplicationController(2) @@ -1039,6 +1055,7 @@ func TestRCSyncExpectations(t *testing.T) { func TestDeleteControllerAndExpectations(t *testing.T) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()}) manager := NewReplicationManager(client, 10) + manager.podStoreSynced = alwaysReady rc := newReplicationController(1) manager.controllerStore.Store.Add(rc) @@ -1070,3 +1087,29 @@ func TestDeleteControllerAndExpectations(t *testing.T) { manager.syncReplicationController(getKey(rc, t)) validateSyncReplication(t, &fakePodControl, 0, 0) } + +func TestRCManagerNotReady(t *testing.T) { + client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()}) + fakePodControl := FakePodControl{} + manager := NewReplicationManager(client, 2) + manager.podControl = &fakePodControl + manager.podStoreSynced = func() bool { return false } + + // Simulates the rc reflector running before the pod reflector. We don't + // want to end up creating replicas in this case until the pod reflector + // has synced, so the rc manager should just requeue the rc. + controllerSpec := newReplicationController(1) + manager.controllerStore.Store.Add(controllerSpec) + + rcKey := getKey(controllerSpec, t) + manager.syncReplicationController(rcKey) + validateSyncReplication(t, &fakePodControl, 0, 0) + queueRC, _ := manager.queue.Get() + if queueRC != rcKey { + t.Fatalf("Expected to find key %v in queue, found %v", rcKey, queueRC) + } + + manager.podStoreSynced = alwaysReady + manager.syncReplicationController(rcKey) + validateSyncReplication(t, &fakePodControl, 1, 0) +}