Avoid syncing rcs till the pods controller has synced.

This commit is contained in:
Prashanth Balasubramanian 2015-06-19 13:35:19 -07:00
parent 29ffee51a8
commit 35950c04ff
2 changed files with 60 additions and 0 deletions

View File

@ -67,6 +67,10 @@ const (
// Realistic value of the burstReplica field for the replication manager based off
// performance requirements for kubernetes 1.0.
BurstReplicas = 500
// We must avoid counting pods until the pod store has synced. If it hasn't synced, to
// avoid a hot loop, we'll wait this long between checks.
PodStoreSyncedPollPeriod = 100 * time.Millisecond
)
// ReplicationManager is responsible for synchronizing ReplicationController objects stored
@ -80,6 +84,11 @@ type ReplicationManager struct {
burstReplicas int
// To allow injection of syncReplicationController for testing.
syncHandler func(rcKey string) error
// podStoreSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podStoreSynced func() bool
// A TTLCache of pod creates/deletes each rc expects to see
expectations RCExpectationsManager
// A store of controllers, populated by the rcController
@ -167,6 +176,7 @@ func NewReplicationManager(kubeClient client.Interface, burstReplicas int) *Repl
)
rm.syncHandler = rm.syncReplicationController
rm.podStoreSynced = rm.podController.HasSynced
return rm
}
@ -366,6 +376,13 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
return err
}
controller := *obj.(*api.ReplicationController)
if !rm.podStoreSynced() {
// Sleep so we give the pod reflector goroutine a chance to run.
time.Sleep(PodStoreSyncedPollPeriod)
glog.Infof("Waiting for pods controller to sync, requeuing rc %v", controller.Name)
rm.enqueueController(&controller)
return nil
}
// Check the expectations of the rc before counting active pods, otherwise a new pod can sneak in
// and update the expectations after we've retrieved active pods from the store. If a new pod enters

View File

@ -54,6 +54,8 @@ type FakePodControl struct {
// the watch test will take upto 1/2 a second before timing out.
const controllerTimeout = 500 * time.Millisecond
var alwaysReady = func() bool { return true }
func init() {
api.ForTesting_ReferencesAllowBlankSelfLinks = true
}
@ -223,6 +225,7 @@ func TestSyncReplicationControllerDoesNothing(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()})
fakePodControl := FakePodControl{}
manager := NewReplicationManager(client, BurstReplicas)
manager.podStoreSynced = alwaysReady
// 2 running pods, a controller with 2 replicas, sync is a no-op
controllerSpec := newReplicationController(2)
@ -238,6 +241,7 @@ func TestSyncReplicationControllerDeletes(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()})
fakePodControl := FakePodControl{}
manager := NewReplicationManager(client, BurstReplicas)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
// 2 running pods and a controller with 1 replica, one pod delete expected
@ -253,6 +257,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()})
fakePodControl := FakePodControl{}
manager := NewReplicationManager(client, BurstReplicas)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
received := make(chan string)
@ -284,6 +289,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
func TestSyncReplicationControllerCreates(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()})
manager := NewReplicationManager(client, BurstReplicas)
manager.podStoreSynced = alwaysReady
// A controller with 2 replicas and no pods in the store, 2 creates expected
controller := newReplicationController(2)
@ -349,6 +355,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
manager := NewReplicationManager(client, BurstReplicas)
manager.podStoreSynced = alwaysReady
// Steady state for the replication controller, no Status.Replicas updates expected
activePods := 5
@ -390,6 +397,7 @@ func TestControllerUpdateReplicas(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
manager := NewReplicationManager(client, BurstReplicas)
manager.podStoreSynced = alwaysReady
// Insufficient number of pods in the system, and Status.Replicas is wrong;
// Status.Replica should update to match number of pods in system, 1 new pod should be created.
@ -579,6 +587,7 @@ func TestSyncReplicationControllerDormancy(t *testing.T) {
fakePodControl := FakePodControl{}
manager := NewReplicationManager(client, BurstReplicas)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
controllerSpec := newReplicationController(2)
@ -618,6 +627,7 @@ func TestSyncReplicationControllerDormancy(t *testing.T) {
func TestPodControllerLookup(t *testing.T) {
manager := NewReplicationManager(client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()}), BurstReplicas)
manager.podStoreSynced = alwaysReady
testCases := []struct {
inRCs []*api.ReplicationController
pod *api.Pod
@ -684,6 +694,7 @@ func TestWatchControllers(t *testing.T) {
fakeWatch := watch.NewFake()
client := &testclient.Fake{Watch: fakeWatch}
manager := NewReplicationManager(client, BurstReplicas)
manager.podStoreSynced = alwaysReady
var testControllerSpec api.ReplicationController
received := make(chan string)
@ -725,6 +736,7 @@ func TestWatchPods(t *testing.T) {
fakeWatch := watch.NewFake()
client := &testclient.Fake{Watch: fakeWatch}
manager := NewReplicationManager(client, BurstReplicas)
manager.podStoreSynced = alwaysReady
// Put one rc and one pod into the controller's stores
testControllerSpec := newReplicationController(1)
@ -768,6 +780,7 @@ func TestUpdatePods(t *testing.T) {
fakeWatch := watch.NewFake()
client := &testclient.Fake{Watch: fakeWatch}
manager := NewReplicationManager(client, BurstReplicas)
manager.podStoreSynced = alwaysReady
received := make(chan string)
@ -826,6 +839,7 @@ func TestControllerUpdateRequeue(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
manager := NewReplicationManager(client, BurstReplicas)
manager.podStoreSynced = alwaysReady
rc := newReplicationController(1)
manager.controllerStore.Store.Add(rc)
@ -902,6 +916,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()})
fakePodControl := FakePodControl{}
manager := NewReplicationManager(client, burstReplicas)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
controllerSpec := newReplicationController(numReplicas)
@ -1016,6 +1031,7 @@ func TestRCSyncExpectations(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()})
fakePodControl := FakePodControl{}
manager := NewReplicationManager(client, 2)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
controllerSpec := newReplicationController(2)
@ -1039,6 +1055,7 @@ func TestRCSyncExpectations(t *testing.T) {
func TestDeleteControllerAndExpectations(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()})
manager := NewReplicationManager(client, 10)
manager.podStoreSynced = alwaysReady
rc := newReplicationController(1)
manager.controllerStore.Store.Add(rc)
@ -1070,3 +1087,29 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
manager.syncReplicationController(getKey(rc, t))
validateSyncReplication(t, &fakePodControl, 0, 0)
}
func TestRCManagerNotReady(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()})
fakePodControl := FakePodControl{}
manager := NewReplicationManager(client, 2)
manager.podControl = &fakePodControl
manager.podStoreSynced = func() bool { return false }
// Simulates the rc reflector running before the pod reflector. We don't
// want to end up creating replicas in this case until the pod reflector
// has synced, so the rc manager should just requeue the rc.
controllerSpec := newReplicationController(1)
manager.controllerStore.Store.Add(controllerSpec)
rcKey := getKey(controllerSpec, t)
manager.syncReplicationController(rcKey)
validateSyncReplication(t, &fakePodControl, 0, 0)
queueRC, _ := manager.queue.Get()
if queueRC != rcKey {
t.Fatalf("Expected to find key %v in queue, found %v", rcKey, queueRC)
}
manager.podStoreSynced = alwaysReady
manager.syncReplicationController(rcKey)
validateSyncReplication(t, &fakePodControl, 1, 0)
}