diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index d7b8f802838..665f08578ba 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -554,9 +554,6 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error { cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, controllerKind) filteredPods, err = cm.ClaimPods(pods) if err != nil { - // Something went wrong with adoption or release. - // Requeue and try again so we don't leave orphans sitting around. - rsc.queue.Add(key) return err } diff --git a/pkg/controller/replicaset/replica_set_test.go b/pkg/controller/replicaset/replica_set_test.go index 80e3e672288..ae79874dd97 100644 --- a/pkg/controller/replicaset/replica_set_test.go +++ b/pkg/controller/replicaset/replica_set_test.go @@ -189,6 +189,24 @@ func newPodList(store cache.Store, count int, status v1.PodPhase, labelMap map[s } } +// processSync initiates a sync via processNextWorkItem() to test behavior that +// depends on both functions (such as re-queueing on sync error). +func processSync(rsc *ReplicaSetController, key string) error { + // Save old syncHandler and replace with one that captures the error. + oldSyncHandler := rsc.syncHandler + defer func() { + rsc.syncHandler = oldSyncHandler + }() + var syncErr error + rsc.syncHandler = func(key string) error { + syncErr = oldSyncHandler(key) + return syncErr + } + rsc.queue.Add(key) + rsc.processNextWorkItem() + return syncErr +} + func validateSyncReplicaSet(t *testing.T, fakePodControl *controller.FakePodControl, expectedCreates, expectedDeletes, expectedPatches int) { if e, a := expectedCreates, len(fakePodControl.Templates); e != a { t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", e, a) @@ -1303,7 +1321,7 @@ func TestPatchPodFails(t *testing.T) { // control of the pods and requeue to try again. fakePodControl.Err = fmt.Errorf("Fake Error") rsKey := getKey(rs, t) - err := manager.syncReplicaSet(rsKey) + err := processSync(manager, rsKey) if err == nil || !strings.Contains(err.Error(), "Fake Error") { t.Errorf("expected Fake Error, got %+v", err) } diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index 5c23a0b0974..df34de1c63f 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -386,29 +386,27 @@ func (rm *ReplicationManager) enqueueControllerAfter(obj interface{}, after time // worker runs a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the syncHandler is never invoked concurrently with the same key. func (rm *ReplicationManager) worker() { - workFunc := func() bool { - key, quit := rm.queue.Get() - if quit { - return true - } - defer rm.queue.Done(key) + for rm.processNextWorkItem() { + } + glog.Infof("replication controller worker shutting down") +} - err := rm.syncHandler(key.(string)) - if err == nil { - rm.queue.Forget(key) - return false - } - - rm.queue.AddRateLimited(key) - utilruntime.HandleError(err) +func (rm *ReplicationManager) processNextWorkItem() bool { + key, quit := rm.queue.Get() + if quit { return false } - for { - if quit := workFunc(); quit { - glog.Infof("replication controller worker shutting down") - return - } + defer rm.queue.Done(key) + + err := rm.syncHandler(key.(string)) + if err == nil { + rm.queue.Forget(key) + return true } + + rm.queue.AddRateLimited(key) + utilruntime.HandleError(err) + return true } // manageReplicas checks and updates replicas for the given replication controller. @@ -569,16 +567,11 @@ func (rm *ReplicationManager) syncReplicationController(key string) error { // anymore but has the stale controller ref. pods, err := rm.podLister.Pods(rc.Namespace).List(labels.Everything()) if err != nil { - utilruntime.HandleError(fmt.Errorf("Error getting pods for rc %q: %v", key, err)) - rm.queue.Add(key) return err } cm := controller.NewPodControllerRefManager(rm.podControl, rc, labels.Set(rc.Spec.Selector).AsSelectorPreValidated(), controllerKind) filteredPods, err = cm.ClaimPods(pods) if err != nil { - // Something went wrong with adoption or release. - // Requeue and try again so we don't leave orphans sitting around. - rm.queue.Add(key) return err } diff --git a/pkg/controller/replication/replication_controller_test.go b/pkg/controller/replication/replication_controller_test.go index dda9435f9db..23ce15afa90 100644 --- a/pkg/controller/replication/replication_controller_test.go +++ b/pkg/controller/replication/replication_controller_test.go @@ -146,6 +146,24 @@ func newPodList(store cache.Store, count int, status v1.PodPhase, rc *v1.Replica } } +// processSync initiates a sync via processNextWorkItem() to test behavior that +// depends on both functions (such as re-queueing on sync error). +func processSync(rm *ReplicationManager, key string) error { + // Save old syncHandler and replace with one that captures the error. + oldSyncHandler := rm.syncHandler + defer func() { + rm.syncHandler = oldSyncHandler + }() + var syncErr error + rm.syncHandler = func(key string) error { + syncErr = oldSyncHandler(key) + return syncErr + } + rm.queue.Add(key) + rm.processNextWorkItem() + return syncErr +} + func validateSyncReplication(t *testing.T, fakePodControl *controller.FakePodControl, expectedCreates, expectedDeletes, expectedPatches int) { if e, a := expectedCreates, len(fakePodControl.Templates); e != a { t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", e, a) @@ -1280,7 +1298,7 @@ func TestPatchPodFails(t *testing.T) { // control of the pods and requeue to try again. fakePodControl.Err = fmt.Errorf("Fake Error") rcKey := getKey(rc, t) - err := manager.syncReplicationController(rcKey) + err := processSync(manager, rcKey) if err == nil || !strings.Contains(err.Error(), "Fake Error") { t.Fatalf("expected Fake Error, got %v", err) }