diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go
index ecb1e9a34df..b813ec7f867 100644
--- a/pkg/controller/replication/replication_controller.go
+++ b/pkg/controller/replication/replication_controller.go
@@ -112,7 +112,7 @@ type ReplicationManager struct {
 	lookupCache *controller.MatchingCache

 	// Controllers that need to be synced
-	queue *workqueue.Type
+	queue workqueue.RateLimitingInterface

 	// garbageCollectorEnabled denotes if the garbage collector is enabled. RC
 	// manager behaves differently if GC is enabled.
@@ -143,7 +143,7 @@ func newReplicationManager(eventRecorder record.EventRecorder, podInformer frame
 		},
 		burstReplicas: burstReplicas,
 		expectations:  controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
-		queue:         workqueue.New(),
+		queue:         workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
 		garbageCollectorEnabled: garbageCollectorEnabled,
 	}

@@ -464,10 +464,15 @@ func (rm *ReplicationManager) worker() {
 			return true
 		}
 		defer rm.queue.Done(key)
+
 		err := rm.syncHandler(key.(string))
-		if err != nil {
-			glog.Errorf("Error syncing replication controller: %v", err)
+		if err == nil {
+			rm.queue.Forget(key)
+			return false
 		}
+
+		rm.queue.AddRateLimited(key)
+		utilruntime.HandleError(err)
 		return false
 	}
 	for {
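The hunk above is the heart of the change: a failed sync no longer just logs, it goes back onto a rate-limited queue, and a successful sync calls `Forget` to reset that key's backoff. Below is a minimal, self-contained sketch of the loop, assuming the `workqueue` package vendored in this tree (`k8s.io/kubernetes/pkg/util/workqueue`); `runWorker` and `syncFn` are illustrative stand-ins for `worker()` and `rm.syncHandler`, not part of this PR:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/util/workqueue"
)

// runWorker drains keys from a rate-limiting queue, mirroring the worker()
// loop in the diff: Done always, Forget on success, AddRateLimited on error.
func runWorker(queue workqueue.RateLimitingInterface, syncFn func(key string) error) {
	for {
		key, quit := queue.Get()
		if quit {
			return
		}
		err := syncFn(key.(string))
		queue.Done(key) // mark this key as no longer being processed

		if err == nil {
			queue.Forget(key) // reset the per-key backoff
			continue
		}
		// Requeue after a per-key exponential delay instead of immediately,
		// so a persistently failing controller cannot hotloop the worker.
		queue.AddRateLimited(key)
	}
}

func main() {
	q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	q.Add("default/my-rc")
	go func() {
		time.Sleep(100 * time.Millisecond) // let the single key drain
		q.ShutDown()
	}()
	runWorker(q, func(key string) error {
		fmt.Println("synced", key)
		return nil
	})
}
```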
@@ -480,13 +485,16 @@ func (rm *ReplicationManager) worker() {

 // manageReplicas checks and updates replicas for the given replication controller.
 // Does NOT modify <filteredPods>.
-func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.ReplicationController) {
+func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.ReplicationController) error {
 	diff := len(filteredPods) - int(rc.Spec.Replicas)
 	rcKey, err := controller.KeyFunc(rc)
 	if err != nil {
-		glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err)
-		return
+		return err
 	}
+	if diff == 0 {
+		return nil
+	}
+
 	if diff < 0 {
 		diff *= -1
 		if diff > rm.burstReplicas {
@@ -497,6 +505,7 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.Re
 		// UID, which would require locking *across* the create, which will turn
 		// into a performance bottleneck. We should generate a UID for the pod
 		// beforehand and store it via ExpectCreations.
+		errCh := make(chan error, diff)
 		rm.expectations.ExpectCreations(rcKey, diff)
 		var wg sync.WaitGroup
 		wg.Add(diff)
@@ -522,55 +531,79 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.Re
 					// Decrement the expected number of creates because the informer won't observe this pod
 					glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name)
 					rm.expectations.CreationObserved(rcKey)
-					rm.enqueueController(rc)
+					errCh <- err
 					utilruntime.HandleError(err)
 				}
 			}()
 		}
 		wg.Wait()
-	} else if diff > 0 {
-		if diff > rm.burstReplicas {
-			diff = rm.burstReplicas
+
+		select {
+		case err := <-errCh:
+			// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
+			if err != nil {
+				return err
+			}
+		default:
 		}
-		glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rc.Namespace, rc.Name, rc.Spec.Replicas, diff)
-		// No need to sort pods if we are about to delete all of them
-		if rc.Spec.Replicas != 0 {
-			// Sort the pods in the order such that not-ready < ready, unscheduled
-			// < scheduled, and pending < running. This ensures that we delete pods
-			// in the earlier stages whenever possible.
-			sort.Sort(controller.ActivePods(filteredPods))
-		}
-		// Snapshot the UIDs (ns/name) of the pods we're expecting to see
-		// deleted, so we know to record their expectations exactly once either
-		// when we see it as an update of the deletion timestamp, or as a delete.
-		// Note that if the labels on a pod/rc change in a way that the pod gets
-		// orphaned, the rs will only wake up after the expectations have
-		// expired even if other pods are deleted.
-		deletedPodKeys := []string{}
-		for i := 0; i < diff; i++ {
-			deletedPodKeys = append(deletedPodKeys, controller.PodKey(filteredPods[i]))
-		}
-		// We use pod namespace/name as a UID to wait for deletions, so if the
-		// labels on a pod/rc change in a way that the pod gets orphaned, the
-		// rc will only wake up after the expectation has expired.
-		rm.expectations.ExpectDeletions(rcKey, deletedPodKeys)
-		var wg sync.WaitGroup
-		wg.Add(diff)
-		for i := 0; i < diff; i++ {
-			go func(ix int) {
-				defer wg.Done()
-				if err := rm.podControl.DeletePod(rc.Namespace, filteredPods[ix].Name, rc); err != nil {
-					// Decrement the expected number of deletes because the informer won't observe this deletion
-					podKey := controller.PodKey(filteredPods[ix])
-					glog.V(2).Infof("Failed to delete %v due to %v, decrementing expectations for controller %q/%q", podKey, err, rc.Namespace, rc.Name)
-					rm.expectations.DeletionObserved(rcKey, podKey)
-					rm.enqueueController(rc)
-					utilruntime.HandleError(err)
-				}
-			}(i)
-		}
-		wg.Wait()
+
+		return nil
 	}
+
+	if diff > rm.burstReplicas {
+		diff = rm.burstReplicas
+	}
+	glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rc.Namespace, rc.Name, rc.Spec.Replicas, diff)
+	// No need to sort pods if we are about to delete all of them
+	if rc.Spec.Replicas != 0 {
+		// Sort the pods in the order such that not-ready < ready, unscheduled
+		// < scheduled, and pending < running. This ensures that we delete pods
+		// in the earlier stages whenever possible.
+		sort.Sort(controller.ActivePods(filteredPods))
+	}
+	// Snapshot the UIDs (ns/name) of the pods we're expecting to see
+	// deleted, so we know to record their expectations exactly once either
+	// when we see it as an update of the deletion timestamp, or as a delete.
+	// Note that if the labels on a pod/rc change in a way that the pod gets
+	// orphaned, the rs will only wake up after the expectations have
+	// expired even if other pods are deleted.
+	deletedPodKeys := []string{}
+	for i := 0; i < diff; i++ {
+		deletedPodKeys = append(deletedPodKeys, controller.PodKey(filteredPods[i]))
+	}
+	// We use pod namespace/name as a UID to wait for deletions, so if the
+	// labels on a pod/rc change in a way that the pod gets orphaned, the
+	// rc will only wake up after the expectation has expired.
+	errCh := make(chan error, diff)
+	rm.expectations.ExpectDeletions(rcKey, deletedPodKeys)
+	var wg sync.WaitGroup
+	wg.Add(diff)
+	for i := 0; i < diff; i++ {
+		go func(ix int) {
+			defer wg.Done()
+			if err := rm.podControl.DeletePod(rc.Namespace, filteredPods[ix].Name, rc); err != nil {
+				// Decrement the expected number of deletes because the informer won't observe this deletion
+				podKey := controller.PodKey(filteredPods[ix])
+				glog.V(2).Infof("Failed to delete %v due to %v, decrementing expectations for controller %q/%q", podKey, err, rc.Namespace, rc.Name)
+				rm.expectations.DeletionObserved(rcKey, podKey)
+				errCh <- err
+				utilruntime.HandleError(err)
+			}
+		}(i)
+	}
+	wg.Wait()
+
+	select {
+	case err := <-errCh:
+		// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
+		if err != nil {
+			return err
+		}
+	default:
+	}
+
+	return nil
 }

 // syncReplicationController will sync the rc with the given key if it has had its expectations fulfilled, meaning
@@ -600,8 +633,6 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
 		return nil
 	}
 	if err != nil {
-		glog.Infof("Unable to retrieve rc %v from store: %v", key, err)
-		rm.queue.Add(key)
 		return err
 	}
 	rc := *obj.(*api.ReplicationController)
@@ -672,8 +703,9 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
 		filteredPods = controller.FilterActivePods(pods)
 	}

+	var manageReplicasErr error
 	if rcNeedsSync && rc.DeletionTimestamp == nil {
-		rm.manageReplicas(filteredPods, &rc)
+		manageReplicasErr = rm.manageReplicas(filteredPods, &rc)
 	}
 	trace.Step("manageReplicas done")

@@ -692,10 +724,9 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {

 	// Always updates status as pods come up or die.
 	if err := updateReplicaCount(rm.kubeClient.Core().ReplicationControllers(rc.Namespace), rc, len(filteredPods), fullyLabeledReplicasCount); err != nil {
-		// Multiple things could lead to this update failing. Requeuing the controller ensures
-		// we retry with some fairness.
-		glog.V(2).Infof("Failed to update replica count for controller %v/%v; requeuing; error: %v", rc.Namespace, rc.Name, err)
-		rm.enqueueController(&rc)
+		// Multiple things could lead to this update failing. Returning an error causes a requeue without forcing a hotloop
+		return err
 	}
-	return nil
+
+	return manageReplicasErr
 }
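Both branches of `manageReplicas` now use the same aggregation idiom: fan out `diff` goroutines, give each a guaranteed non-blocking slot on a buffered error channel, and after `wg.Wait()` pull at most one error off with a non-blocking `select`. A self-contained sketch of the idiom (the `fanOut` and `doWork` names are illustrative, not from this PR):

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut runs n operations concurrently and returns one recorded error, if
// any. The channel buffer equals n, so a send can never block even if every
// operation fails and nothing reads the channel until the end.
func fanOut(n int, doWork func(i int) error) error {
	errCh := make(chan error, n)
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func(ix int) {
			defer wg.Done()
			if err := doWork(ix); err != nil {
				errCh <- err
			}
		}(i)
	}
	wg.Wait()

	// Non-blocking receive: nil when no operation failed. The errors are
	// likely all alike, so reporting a single one is enough for a requeue.
	select {
	case err := <-errCh:
		return err
	default:
	}
	return nil
}

func main() {
	err := fanOut(3, func(i int) error {
		if i == 1 {
			return fmt.Errorf("create %d failed", i)
		}
		return nil
	})
	fmt.Println("error:", err) // error: create 1 failed
}
```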
diff --git a/pkg/controller/replication/replication_controller_test.go b/pkg/controller/replication/replication_controller_test.go
index 3824a2370b1..03e866b3868 100644
--- a/pkg/controller/replication/replication_controller_test.go
+++ b/pkg/controller/replication/replication_controller_test.go
@@ -32,6 +32,7 @@ import (
 	"k8s.io/kubernetes/pkg/client/cache"
 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
+	fakeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
 	"k8s.io/kubernetes/pkg/client/restclient"
 	"k8s.io/kubernetes/pkg/client/testing/core"
 	"k8s.io/kubernetes/pkg/controller"
@@ -606,23 +607,11 @@ func TestControllerUpdateRequeue(t *testing.T) {
 	fakePodControl := controller.FakePodControl{}
 	manager.podControl = &fakePodControl

-	manager.syncReplicationController(getKey(rc, t))
-
-	ch := make(chan interface{})
-	go func() {
-		item, _ := manager.queue.Get()
-		ch <- item
-	}()
-	select {
-	case key := <-ch:
-		expectedKey := getKey(rc, t)
-		if key != expectedKey {
-			t.Errorf("Expected requeue of controller with key %s got %s", expectedKey, key)
-		}
-	case <-time.After(wait.ForeverTestTimeout):
-		manager.queue.ShutDown()
-		t.Errorf("Expected to find an rc in the queue, found none.")
+	// an error from the sync function will be requeued, check to make sure we returned an error
+	if err := manager.syncReplicationController(getKey(rc, t)); err == nil {
+		t.Errorf("missing error for requeue")
 	}
+
 	// 1 Update and 1 GET, both of which fail
 	fakeHandler.ValidateRequestCount(t, 2)
 }
@@ -1136,8 +1125,8 @@ func BenchmarkGetPodControllerSingleNS(b *testing.B) {
 }

 // setupManagerWithGCEnabled creates a RC manager with a fakePodControl and with garbageCollectorEnabled set to true
-func setupManagerWithGCEnabled() (manager *ReplicationManager, fakePodControl *controller.FakePodControl) {
-	c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
+func setupManagerWithGCEnabled(objs ...runtime.Object) (manager *ReplicationManager, fakePodControl *controller.FakePodControl) {
+	c := fakeclientset.NewSimpleClientset(objs...)
 	fakePodControl = &controller.FakePodControl{}
 	manager = NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
 	manager.garbageCollectorEnabled = true
@@ -1165,8 +1154,8 @@ func TestDoNotPatchPodWithOtherControlRef(t *testing.T) {
 }

 func TestPatchPodWithOtherOwnerRef(t *testing.T) {
-	manager, fakePodControl := setupManagerWithGCEnabled()
 	rc := newReplicationController(2)
+	manager, fakePodControl := setupManagerWithGCEnabled(rc)
 	manager.rcStore.Indexer.Add(rc)
 	// add to podStore one more matching pod that doesn't have a controller
 	// ref, but has an owner ref pointing to another object. Expect a patch to
@@ -1185,8 +1174,8 @@ func TestPatchPodWithOtherOwnerRef(t *testing.T) {
 }

 func TestPatchPodWithCorrectOwnerRef(t *testing.T) {
-	manager, fakePodControl := setupManagerWithGCEnabled()
 	rc := newReplicationController(2)
+	manager, fakePodControl := setupManagerWithGCEnabled(rc)
 	manager.rcStore.Indexer.Add(rc)
 	// add to podStore a matching pod that has an ownerRef pointing to the rc,
 	// but ownerRef.Controller is false. Expect a patch to take control of it.
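`setupManagerWithGCEnabled` switches from a real client pointed at an empty host to `fakeclientset.NewSimpleClientset`, and callers now pass the RC in so it is pre-loaded into the fake's object tracker; without that, the status update at the end of `syncReplicationController` would fail and every sync in these tests would return an error. A hypothetical test sketch of the seeding behavior (not part of this PR), assuming the internalclientset fake vendored in this tree:

```go
// TestFakeClientSeeding is illustrative only: objects handed to
// NewSimpleClientset are readable (and updatable) without any server.
func TestFakeClientSeeding(t *testing.T) {
	rc := newReplicationController(2)
	c := fakeclientset.NewSimpleClientset(rc)
	got, err := c.Core().ReplicationControllers(rc.Namespace).Get(rc.Name)
	if err != nil {
		t.Fatalf("expected the seeded rc, got: %v", err)
	}
	if got.Name != rc.Name {
		t.Errorf("got rc %q, want %q", got.Name, rc.Name)
	}
}
```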
@@ -1204,8 +1193,8 @@ func TestPatchPodWithCorrectOwnerRef(t *testing.T) {
 }

 func TestPatchPodFails(t *testing.T) {
-	manager, fakePodControl := setupManagerWithGCEnabled()
 	rc := newReplicationController(2)
+	manager, fakePodControl := setupManagerWithGCEnabled(rc)
 	manager.rcStore.Indexer.Add(rc)
 	// add to podStore two matching pods. Expect two patches to take control of
 	// them.
@@ -1215,16 +1204,16 @@
 	// control of the pods and create new ones.
 	fakePodControl.Err = fmt.Errorf("Fake Error")
 	err := manager.syncReplicationController(getKey(rc, t))
-	if err != nil {
-		t.Fatal(err)
+	if err == nil || err.Error() != "Fake Error" {
+		t.Fatalf("expected Fake Error, got %v", err)
 	}
 	// 2 patches to take control of pod1 and pod2 (both fail), 2 creates.
 	validateSyncReplication(t, fakePodControl, 2, 0, 2)
 }

 func TestPatchExtraPodsThenDelete(t *testing.T) {
-	manager, fakePodControl := setupManagerWithGCEnabled()
 	rc := newReplicationController(2)
+	manager, fakePodControl := setupManagerWithGCEnabled(rc)
 	manager.rcStore.Indexer.Add(rc)
 	// add to podStore three matching pods. Expect three patches to take control of
 	// them, and later delete one of them.
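The retry spacing the new queue relies on comes from `DefaultControllerRateLimiter`: the per-item delay grows with every `AddRateLimited`/`When` for the same key and resets on `Forget`, which is why the tests above can simply assert that a failing sync returns an error and trust the queue to pace the retries. A small probe, assuming the vendored workqueue package (the exact delays are properties of its default exponential backoff, overlaid with an overall token bucket, so treat the printed values as indicative):

```go
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/util/workqueue"
)

func main() {
	limiter := workqueue.DefaultControllerRateLimiter()

	// Each failure of the same key waits longer than the last.
	for i := 0; i < 4; i++ {
		fmt.Println(limiter.When("default/my-rc"))
	}
	fmt.Println("requeues:", limiter.NumRequeues("default/my-rc"))

	// A successful sync calls Forget, dropping the key back to the base delay.
	limiter.Forget("default/my-rc")
	fmt.Println(limiter.When("default/my-rc"))
}
```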