From 85de930a66feb85cd627d5808b2affcbac52f64a Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Tue, 19 Jul 2016 11:46:11 +0200 Subject: [PATCH] ReplicaSet controller can set/remove ControllerRef --- .../app/controllermanager.go | 2 +- pkg/controller/replicaset/replica_set.go | 202 ++++++++---- pkg/controller/replicaset/replica_set_test.go | 295 +++++++++++++++--- .../replication/replication_controller.go | 8 +- 4 files changed, 395 insertions(+), 112 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 95653e48455..4ba76402c76 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -366,7 +366,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig if containsResource(resources, "replicasets") { glog.Infof("Starting ReplicaSet controller") - go replicaset.NewReplicaSetController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replicaset-controller")), ResyncPeriod(s), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS)). + go replicaset.NewReplicaSetController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replicaset-controller")), ResyncPeriod(s), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector). Run(int(s.ConcurrentRSSyncs), wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index b879cca648b..89c50d8c2d1 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -27,8 +27,10 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/extensions" + "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" @@ -38,6 +40,7 @@ import ( "k8s.io/kubernetes/pkg/controller/framework/informers" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" + utilerrors "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/metrics" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" @@ -63,6 +66,10 @@ const ( statusUpdateRetries = 1 ) +func getRSKind() unversioned.GroupVersionKind { + return v1beta1.SchemeGroupVersion.WithKind("ReplicaSet") +} + // ReplicaSetController is responsible for synchronizing ReplicaSet objects stored // in the system with actual running pods. type ReplicaSetController struct { @@ -101,21 +108,25 @@ type ReplicaSetController struct { // Controllers that need to be synced queue *workqueue.Type + + // garbageCollectorEnabled denotes if the garbage collector is enabled. RC + // manager behaves differently if GC is enabled. + garbageCollectorEnabled bool } // NewReplicaSetController creates a new ReplicaSetController. 
-func NewReplicaSetController(podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicaSetController { +func NewReplicaSetController(podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicaSetController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")}) return newReplicaSetController( eventBroadcaster.NewRecorder(api.EventSource{Component: "replicaset-controller"}), - podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize) + podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled) } // newReplicaSetController configures a replica set controller with the specified event recorder -func newReplicaSetController(eventRecorder record.EventRecorder, podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicaSetController { +func newReplicaSetController(eventRecorder record.EventRecorder, podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicaSetController { if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil { metrics.RegisterMetricAndTrackRateLimiterUsage("replicaset_controller", kubeClient.Core().GetRESTClient().GetRateLimiter()) } @@ -129,6 +140,7 @@ func newReplicaSetController(eventRecorder record.EventRecorder, podInformer fra burstReplicas: burstReplicas, expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()), queue: workqueue.New(), + garbageCollectorEnabled: garbageCollectorEnabled, } rsc.rsStore.Store, rsc.rsController = framework.NewInformer( @@ -144,43 +156,8 @@ func newReplicaSetController(eventRecorder record.EventRecorder, podInformer fra // TODO: Can we have much longer period here? FullControllerResyncPeriod, framework.ResourceEventHandlerFuncs{ - AddFunc: rsc.enqueueReplicaSet, - UpdateFunc: func(old, cur interface{}) { - oldRS := old.(*extensions.ReplicaSet) - curRS := cur.(*extensions.ReplicaSet) - - // We should invalidate the whole lookup cache if a RS's selector has been updated. - // - // Imagine that you have two RSs: - // * old RS1 - // * new RS2 - // You also have a pod that is attached to RS2 (because it doesn't match RS1 selector). - // Now imagine that you are changing RS1 selector so that it is now matching that pod, - // in such case we must invalidate the whole cache so that pod could be adopted by RS1 - // - // This makes the lookup cache less helpful, but selector update does not happen often, - // so it's not a big problem - if !reflect.DeepEqual(oldRS.Spec.Selector, curRS.Spec.Selector) { - rsc.lookupCache.InvalidateAll() - } - - // You might imagine that we only really need to enqueue the - // replica set when Spec changes, but it is safer to sync any - // time this function is triggered. 
That way a full informer - // resync can requeue any replica set that don't yet have pods - // but whose last attempts at creating a pod have failed (since - // we don't block on creation of pods) instead of those - // replica sets stalling indefinitely. Enqueueing every time - // does result in some spurious syncs (like when Status.Replica - // is updated and the watch notification from it retriggers - // this function), but in general extra resyncs shouldn't be - // that bad as ReplicaSets that haven't met expectations yet won't - // sync, and all the listing is done using local stores. - if oldRS.Status.Replicas != curRS.Status.Replicas { - glog.V(4).Infof("Observed updated replica count for ReplicaSet: %v, %d->%d", curRS.Name, oldRS.Status.Replicas, curRS.Status.Replicas) - } - rsc.enqueueReplicaSet(cur) - }, + AddFunc: rsc.enqueueReplicaSet, + UpdateFunc: rsc.updateRS, // This will enter the sync loop and no-op, because the replica set has been deleted from the store. // Note that deleting a replica set immediately after scaling it to 0 will not work. The recommended // way of achieving this is by performing a `stop` operation on the replica set. @@ -208,7 +185,8 @@ func newReplicaSetController(eventRecorder record.EventRecorder, podInformer fra // NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer. func NewReplicaSetControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicaSetController { podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod()) - rsc := NewReplicaSetController(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize) + garbageCollectorEnabled := false + rsc := NewReplicaSetController(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled) rsc.internalPodInformer = podInformer return rsc } @@ -239,13 +217,14 @@ func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) { // getPodReplicaSet returns the replica set managing the given pod. // TODO: Surface that we are ignoring multiple replica sets for a single pod. +// TODO: use ownerReference.Controller to determine if the rs controls the pod. func (rsc *ReplicaSetController) getPodReplicaSet(pod *api.Pod) *extensions.ReplicaSet { // look up in the cache, if cached and the cache is valid, just return cached value if obj, cached := rsc.lookupCache.GetMatchingObject(pod); cached { rs, ok := obj.(*extensions.ReplicaSet) if !ok { // This should not happen - glog.Errorf("lookup cache does not retuen a ReplicaSet object") + glog.Errorf("lookup cache does not return a ReplicaSet object") return nil } if cached && rsc.isCacheValid(pod, rs) { @@ -278,6 +257,44 @@ func (rsc *ReplicaSetController) getPodReplicaSet(pod *api.Pod) *extensions.Repl return &rss[0] } +// callback when RS is updated +func (rsc *ReplicaSetController) updateRS(old, cur interface{}) { + oldRS := old.(*extensions.ReplicaSet) + curRS := cur.(*extensions.ReplicaSet) + + // We should invalidate the whole lookup cache if a RS's selector has been updated. + // + // Imagine that you have two RSs: + // * old RS1 + // * new RS2 + // You also have a pod that is attached to RS2 (because it doesn't match RS1 selector). 
+ // Now imagine that you are changing RS1 selector so that it is now matching that pod, + // in such case we must invalidate the whole cache so that pod could be adopted by RS1 + // + // This makes the lookup cache less helpful, but selector update does not happen often, + // so it's not a big problem + if !reflect.DeepEqual(oldRS.Spec.Selector, curRS.Spec.Selector) { + rsc.lookupCache.InvalidateAll() + } + + // You might imagine that we only really need to enqueue the + // replica set when Spec changes, but it is safer to sync any + // time this function is triggered. That way a full informer + // resync can requeue any replica set that don't yet have pods + // but whose last attempts at creating a pod have failed (since + // we don't block on creation of pods) instead of those + // replica sets stalling indefinitely. Enqueueing every time + // does result in some spurious syncs (like when Status.Replica + // is updated and the watch notification from it retriggers + // this function), but in general extra resyncs shouldn't be + // that bad as ReplicaSets that haven't met expectations yet won't + // sync, and all the listing is done using local stores. + if oldRS.Status.Replicas != curRS.Status.Replicas { + glog.V(4).Infof("Observed updated replica count for ReplicaSet: %v, %d->%d", curRS.Name, oldRS.Status.Replicas, curRS.Status.Replicas) + } + rsc.enqueueReplicaSet(cur) +} + // isCacheValid check if the cache is valid func (rsc *ReplicaSetController) isCacheValid(pod *api.Pod, cachedRS *extensions.ReplicaSet) bool { _, exists, err := rsc.rsStore.Get(cachedRS) @@ -357,9 +374,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) { return } - if rs := rsc.getPodReplicaSet(curPod); rs != nil { - rsc.enqueueReplicaSet(rs) - } + // Enqueue the oldRC before the curRC to give curRC a chance to adopt the oldPod. if labelChanged { // If the old and new ReplicaSet are the same, the first one that syncs // will set expectations preventing any damage from the second. @@ -367,6 +382,10 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) { rsc.enqueueReplicaSet(oldRS) } } + + if curRS := rsc.getPodReplicaSet(curPod); curRS != nil { + rsc.enqueueReplicaSet(curRS) + } } // When a pod is deleted, enqueue the replica set that manages the pod and update its expectations. @@ -456,13 +475,28 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext // into a performance bottleneck. We should generate a UID for the pod // beforehand and store it via ExpectCreations. 
rsc.expectations.ExpectCreations(rsKey, diff) - wait := sync.WaitGroup{} - wait.Add(diff) + var wg sync.WaitGroup + wg.Add(diff) glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", rs.Namespace, rs.Name, rs.Spec.Replicas, diff) for i := 0; i < diff; i++ { go func() { - defer wait.Done() - if err := rsc.podControl.CreatePods(rs.Namespace, &rs.Spec.Template, rs); err != nil { + defer wg.Done() + var err error + + if rsc.garbageCollectorEnabled { + var trueVar = true + controllerRef := &api.OwnerReference{ + APIVersion: getRSKind().GroupVersion().String(), + Kind: getRSKind().Kind, + Name: rs.Name, + UID: rs.UID, + Controller: &trueVar, + } + err = rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef) + } else { + err = rsc.podControl.CreatePods(rs.Namespace, &rs.Spec.Template, rs) + } + if err != nil { // Decrement the expected number of creates because the informer won't observe this pod glog.V(2).Infof("Failed creation, decrementing expectations for replica set %q/%q", rs.Namespace, rs.Name) rsc.expectations.CreationObserved(rsKey) @@ -470,7 +504,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext } }() } - wait.Wait() + wg.Wait() } else if diff > 0 { if diff > rsc.burstReplicas { diff = rsc.burstReplicas @@ -494,11 +528,11 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext deletedPodKeys = append(deletedPodKeys, controller.PodKey(filteredPods[i])) } rsc.expectations.ExpectDeletions(rsKey, deletedPodKeys) - wait := sync.WaitGroup{} - wait.Add(diff) + var wg sync.WaitGroup + wg.Add(diff) for i := 0; i < diff; i++ { go func(ix int) { - defer wait.Done() + defer wg.Done() if err := rsc.podControl.DeletePod(rs.Namespace, filteredPods[ix].Name, rs); err != nil { // Decrement the expected number of deletes because the informer won't observe this deletion podKey := controller.PodKey(filteredPods[ix]) @@ -508,7 +542,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext } }(i) } - wait.Wait() + wg.Wait() } } @@ -557,16 +591,60 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error { glog.Errorf("Error converting pod selector to selector: %v", err) return err } - podList, err := rsc.podStore.Pods(rs.Namespace).List(selector) - if err != nil { - glog.Errorf("Error getting pods for ReplicaSet %q: %v", key, err) - rsc.queue.Add(key) - return err + + // TODO: Do the List and Filter in a single pass, or use an index. + var filteredPods []*api.Pod + if rsc.garbageCollectorEnabled { + // list all pods to include the pods that don't match the rs`s selector + // anymore but has the stale controller ref. + podList, err := rsc.podStore.Pods(rs.Namespace).List(labels.Everything()) + if err != nil { + glog.Errorf("Error getting pods for rs %q: %v", key, err) + rsc.queue.Add(key) + return err + } + cm := controller.NewPodControllerRefManager(rsc.podControl, rs.ObjectMeta, selector, getRSKind()) + matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(podList.Items) + for _, pod := range matchesNeedsController { + err := cm.AdoptPod(pod) + // continue to next pod if adoption fails. + if err != nil { + // If the pod no longer exists, don't even log the error. 
+ if !errors.IsNotFound(err) { + utilruntime.HandleError(err) + } + } else { + matchesAndControlled = append(matchesAndControlled, pod) + } + } + filteredPods = matchesAndControlled + // remove the controllerRef for the pods that no longer have matching labels + var errlist []error + for _, pod := range controlledDoesNotMatch { + err := cm.ReleasePod(pod) + if err != nil { + errlist = append(errlist, err) + } + } + if len(errlist) != 0 { + aggregate := utilerrors.NewAggregate(errlist) + // push the RS into work queue again. We need to try to free the + // pods again otherwise they will stuck with the stale + // controllerRef. + rsc.queue.Add(key) + return aggregate + } + } else { + podList, err := rsc.podStore.Pods(rs.Namespace).List(selector) + if err != nil { + glog.Errorf("Error getting pods for rs %q: %v", key, err) + rsc.queue.Add(key) + return err + } + filteredPods = controller.FilterActivePods(podList.Items) } - // TODO: Do this in a single pass, or use an index. - filteredPods := controller.FilterActivePods(podList.Items) - if rsNeedsSync { + if rsNeedsSync && rs.DeletionTimestamp == nil { rsc.manageReplicas(filteredPods, &rs) } diff --git a/pkg/controller/replicaset/replica_set_test.go b/pkg/controller/replicaset/replica_set_test.go index be1bdb1f635..1858b4aabf9 100644 --- a/pkg/controller/replicaset/replica_set_test.go +++ b/pkg/controller/replicaset/replica_set_test.go @@ -96,34 +96,46 @@ func newReplicaSet(replicas int, selectorMap map[string]string) *extensions.Repl return rs } +// create a pod with the given phase for the given rs (same selectors and namespace) +func newPod(name string, rs *extensions.ReplicaSet, status api.PodPhase) *api.Pod { + return &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: name, + Namespace: rs.Namespace, + Labels: rs.Spec.Selector.MatchLabels, + }, + Status: api.PodStatus{Phase: status}, + } +} + // create count pods with the given phase for the given ReplicaSet (same selectors and namespace), and add them to the store. func newPodList(store cache.Store, count int, status api.PodPhase, labelMap map[string]string, rs *extensions.ReplicaSet, name string) *api.PodList { pods := []api.Pod{} + var trueVar = true + controllerReference := api.OwnerReference{UID: rs.UID, APIVersion: "v1beta1", Kind: "ReplicaSet", Name: rs.Name, Controller: &trueVar} for i := 0; i < count; i++ { - newPod := api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: fmt.Sprintf("%s%d", name, i), - Labels: labelMap, - Namespace: rs.Namespace, - }, - Status: api.PodStatus{Phase: status}, - } + pod := newPod(fmt.Sprintf("%s%d", name, i), rs, status) + pod.ObjectMeta.Labels = labelMap + pod.OwnerReferences = []api.OwnerReference{controllerReference} if store != nil { - store.Add(&newPod) + store.Add(pod) } - pods = append(pods, newPod) + pods = append(pods, *pod) } return &api.PodList{ Items: pods, } } -func validateSyncReplicaSet(t *testing.T, fakePodControl *controller.FakePodControl, expectedCreates, expectedDeletes int) { - if len(fakePodControl.Templates) != expectedCreates { - t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", expectedCreates, len(fakePodControl.Templates)) +func validateSyncReplicaSet(t *testing.T, fakePodControl *controller.FakePodControl, expectedCreates, expectedDeletes, expectedPatches int) { + if e, a := expectedCreates, len(fakePodControl.Templates); e != a { + t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", e, a) } - if len(fakePodControl.DeletePodName) != expectedDeletes { - t.Errorf("Unexpected number of deletes. 
Expected %d, saw %d\n", expectedDeletes, len(fakePodControl.DeletePodName)) + if e, a := expectedDeletes, len(fakePodControl.DeletePodName); e != a { + t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", e, a) + } + if e, a := expectedPatches, len(fakePodControl.Patches); e != a { + t.Errorf("Unexpected number of patches. Expected %d, saw %d\n", e, a) } } @@ -150,7 +162,7 @@ func TestSyncReplicaSetDoesNothing(t *testing.T) { manager.podControl = &fakePodControl manager.syncReplicaSet(getKey(rsSpec, t)) - validateSyncReplicaSet(t, &fakePodControl, 0, 0) + validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0) } func TestSyncReplicaSetDeletes(t *testing.T) { @@ -167,7 +179,7 @@ func TestSyncReplicaSetDeletes(t *testing.T) { newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rsSpec, "pod") manager.syncReplicaSet(getKey(rsSpec, t)) - validateSyncReplicaSet(t, &fakePodControl, 0, 1) + validateSyncReplicaSet(t, &fakePodControl, 0, 1, 0) } func TestDeleteFinalStateUnknown(t *testing.T) { @@ -217,7 +229,7 @@ func TestSyncReplicaSetCreates(t *testing.T) { fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.syncReplicaSet(getKey(rs, t)) - validateSyncReplicaSet(t, &fakePodControl, 2, 0) + validateSyncReplicaSet(t, &fakePodControl, 2, 0, 0) } func TestStatusUpdatesWithoutReplicasChange(t *testing.T) { @@ -244,7 +256,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) { manager.podControl = &fakePodControl manager.syncReplicaSet(getKey(rs, t)) - validateSyncReplicaSet(t, &fakePodControl, 0, 0) + validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0) if fakeHandler.RequestReceived != nil { t.Errorf("Unexpected update when pods and ReplicaSets are in a steady state") } @@ -304,7 +316,7 @@ func TestControllerUpdateReplicas(t *testing.T) { decRc := runtime.EncodeOrDie(testapi.Extensions.Codec(), rs) fakeHandler.ValidateRequest(t, testapi.Extensions.ResourcePath(replicaSetResourceName(), rs.Namespace, rs.Name)+"/status", "PUT", &decRc) - validateSyncReplicaSet(t, &fakePodControl, 1, 0) + validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0) } func TestSyncReplicaSetDormancy(t *testing.T) { @@ -330,13 +342,13 @@ func TestSyncReplicaSetDormancy(t *testing.T) { // Creates a replica and sets expectations rsSpec.Status.Replicas = 1 manager.syncReplicaSet(getKey(rsSpec, t)) - validateSyncReplicaSet(t, &fakePodControl, 1, 0) + validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0) // Expectations prevents replicas but not an update on status rsSpec.Status.Replicas = 0 fakePodControl.Clear() manager.syncReplicaSet(getKey(rsSpec, t)) - validateSyncReplicaSet(t, &fakePodControl, 0, 0) + validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0) // Get the key for the controller rsKey, err := controller.KeyFunc(rsSpec) @@ -352,13 +364,13 @@ func TestSyncReplicaSetDormancy(t *testing.T) { fakePodControl.Err = fmt.Errorf("Fake Error") manager.syncReplicaSet(getKey(rsSpec, t)) - validateSyncReplicaSet(t, &fakePodControl, 1, 0) + validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0) // This replica should not need a Lowering of expectations, since the previous create failed fakePodControl.Clear() fakePodControl.Err = nil manager.syncReplicaSet(getKey(rsSpec, t)) - validateSyncReplicaSet(t, &fakePodControl, 1, 0) + validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0) // 1 PUT for the ReplicaSet status during dormancy window. // Note that the pod creates go through pod control so they're not recorded. 
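When the garbage collector is enabled, the creates counted above go through CreatePodsWithControllerRef, so every pod the ReplicaSet controller creates carries an ownerReference marking the ReplicaSet as its managing controller, while the new expectedPatches argument to validateSyncReplicaSet counts the adopt/release patches recorded by the same fake pod control. Below is a minimal standalone sketch of how such a controllerRef is assembled, using simplified types rather than the real api.OwnerReference, and assuming the extensions/v1beta1 group-version that getRSKind() resolves to:

// Illustrative sketch only: a trimmed-down OwnerReference and the fields the
// RS controller fills in when creating pods with GC enabled. Not the actual
// controller code path.
package main

import "fmt"

type OwnerReference struct {
    APIVersion string
    Kind       string
    Name       string
    UID        string
    Controller *bool
}

// newControllerRef mirrors the fields set in manageReplicas: group/version and
// kind come from getRSKind(), name and UID from the owning ReplicaSet, and
// Controller is an explicit true.
func newControllerRef(rsName, rsUID string) OwnerReference {
    isController := true
    return OwnerReference{
        APIVersion: "extensions/v1beta1", // getRSKind().GroupVersion().String()
        Kind:       "ReplicaSet",         // getRSKind().Kind
        Name:       rsName,
        UID:        rsUID,
        Controller: &isController,
    }
}

func main() {
    ref := newControllerRef("frontend", "0dd4-example-uid")
    fmt.Printf("%s/%s %q uid=%s controller=%v\n",
        ref.APIVersion, ref.Kind, ref.Name, ref.UID, *ref.Controller)
}

Controller is a *bool rather than a plain bool so that an owner reference with no opinion can be told apart from an explicit non-controller owner; that is why the tests declare a trueVar variable and take its address.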
@@ -716,7 +728,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) expectedPods = int32(burstReplicas) } // This validates the ReplicaSet manager sync actually created pods - validateSyncReplicaSet(t, &fakePodControl, int(expectedPods), 0) + validateSyncReplicaSet(t, &fakePodControl, int(expectedPods), 0, 0) // This simulates the watch events for all but 1 of the expected pods. // None of these should wake the controller because it has expectations==BurstReplicas. @@ -727,7 +739,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) podExp, exists, err := manager.expectations.GetExpectations(rsKey) if !exists || err != nil { - t.Fatalf("Did not find expectations for rc.") + t.Fatalf("Did not find expectations for rs.") } if add, _ := podExp.GetExpectations(); add != 1 { t.Fatalf("Expectations are wrong %v", podExp) @@ -737,7 +749,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) if expectedPods > int32(burstReplicas) { expectedPods = int32(burstReplicas) } - validateSyncReplicaSet(t, &fakePodControl, 0, int(expectedPods)) + validateSyncReplicaSet(t, &fakePodControl, 0, int(expectedPods), 0) // To accurately simulate a watch we must delete the exact pods // the rs is waiting for. @@ -772,7 +784,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) // Check that the ReplicaSet didn't take any action for all the above pods fakePodControl.Clear() manager.syncReplicaSet(getKey(rsSpec, t)) - validateSyncReplicaSet(t, &fakePodControl, 0, 0) + validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0) // Create/Delete the last pod // The last add pod will decrease the expectation of the ReplicaSet to 0, @@ -851,7 +863,7 @@ func TestRSSyncExpectations(t *testing.T) { }, }) manager.syncReplicaSet(getKey(rsSpec, t)) - validateSyncReplicaSet(t, &fakePodControl, 0, 0) + validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0) } func TestDeleteControllerAndExpectations(t *testing.T) { @@ -867,7 +879,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) { // This should set expectations for the ReplicaSet manager.syncReplicaSet(getKey(rs, t)) - validateSyncReplicaSet(t, &fakePodControl, 1, 0) + validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0) fakePodControl.Clear() // Get the ReplicaSet key @@ -893,7 +905,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) { podExp.Add(-1, 0) manager.podStore.Indexer.Replace(make([]interface{}, 0), "0") manager.syncReplicaSet(getKey(rs, t)) - validateSyncReplicaSet(t, &fakePodControl, 0, 0) + validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0) } func TestRSManagerNotReady(t *testing.T) { @@ -911,7 +923,7 @@ func TestRSManagerNotReady(t *testing.T) { rsKey := getKey(rsSpec, t) manager.syncReplicaSet(rsKey) - validateSyncReplicaSet(t, &fakePodControl, 0, 0) + validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0) queueRS, _ := manager.queue.Get() if queueRS != rsKey { t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS) @@ -919,7 +931,7 @@ func TestRSManagerNotReady(t *testing.T) { manager.podStoreSynced = alwaysReady manager.syncReplicaSet(rsKey) - validateSyncReplicaSet(t, &fakePodControl, 1, 0) + validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0) } // shuffle returns a new shuffled list of container controllers. @@ -984,9 +996,9 @@ func TestDeletionTimestamp(t *testing.T) { // A pod added with a deletion timestamp should decrement deletions, not creations. 
manager.addPod(&pod) - queueRC, _ := manager.queue.Get() - if queueRC != rsKey { - t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRC) + queueRS, _ := manager.queue.Get() + if queueRS != rsKey { + t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS) } manager.queue.Done(rsKey) @@ -1001,9 +1013,9 @@ func TestDeletionTimestamp(t *testing.T) { manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(&pod)}) manager.updatePod(&oldPod, &pod) - queueRC, _ = manager.queue.Get() - if queueRC != rsKey { - t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRC) + queueRS, _ = manager.queue.Get() + if queueRS != rsKey { + t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS) } manager.queue.Done(rsKey) @@ -1041,9 +1053,9 @@ func TestDeletionTimestamp(t *testing.T) { // Deleting the second pod should clear expectations. manager.deletePod(secondPod) - queueRC, _ = manager.queue.Get() - if queueRC != rsKey { - t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRC) + queueRS, _ = manager.queue.Get() + if queueRS != rsKey { + t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS) } manager.queue.Done(rsKey) @@ -1052,3 +1064,202 @@ func TestDeletionTimestamp(t *testing.T) { t.Fatalf("Wrong expectations %+v", podExp) } } + +// setupManagerWithGCEnabled creates a RS manager with a fakePodControl +// and with garbageCollectorEnabled set to true +func setupManagerWithGCEnabled() (manager *ReplicaSetController, fakePodControl *controller.FakePodControl) { + c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) + fakePodControl = &controller.FakePodControl{} + manager = NewReplicaSetControllerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager.garbageCollectorEnabled = true + manager.podStoreSynced = alwaysReady + manager.podControl = fakePodControl + return manager, fakePodControl +} + +func TestDoNotPatchPodWithOtherControlRef(t *testing.T) { + manager, fakePodControl := setupManagerWithGCEnabled() + labelMap := map[string]string{"foo": "bar"} + rs := newReplicaSet(2, labelMap) + manager.rsStore.Store.Add(rs) + var trueVar = true + otherControllerReference := api.OwnerReference{UID: util.NewUUID(), APIVersion: "v1beta1", Kind: "ReplicaSet", Name: "AnotherRS", Controller: &trueVar} + // add to podStore a matching Pod controlled by another controller. Expect no patch. + pod := newPod("pod", rs, api.PodRunning) + pod.OwnerReferences = []api.OwnerReference{otherControllerReference} + manager.podStore.Indexer.Add(pod) + err := manager.syncReplicaSet(getKey(rs, t)) + if err != nil { + t.Fatal(err) + } + // because the matching pod already has a controller, so 2 pods should be created. + validateSyncReplicaSet(t, fakePodControl, 2, 0, 0) +} + +func TestPatchPodWithOtherOwnerRef(t *testing.T) { + manager, fakePodControl := setupManagerWithGCEnabled() + labelMap := map[string]string{"foo": "bar"} + rs := newReplicaSet(2, labelMap) + manager.rsStore.Store.Add(rs) + // add to podStore one more matching pod that doesn't have a controller + // ref, but has an owner ref pointing to other object. Expect a patch to + // take control of it. 
+ unrelatedOwnerReference := api.OwnerReference{UID: util.NewUUID(), APIVersion: "batch/v1", Kind: "Job", Name: "Job"} + pod := newPod("pod", rs, api.PodRunning) + pod.OwnerReferences = []api.OwnerReference{unrelatedOwnerReference} + manager.podStore.Indexer.Add(pod) + + err := manager.syncReplicaSet(getKey(rs, t)) + if err != nil { + t.Fatal(err) + } + // 1 patch to take control of pod, and 1 create of new pod. + validateSyncReplicaSet(t, fakePodControl, 1, 0, 1) +} + +func TestPatchPodWithCorrectOwnerRef(t *testing.T) { + manager, fakePodControl := setupManagerWithGCEnabled() + labelMap := map[string]string{"foo": "bar"} + rs := newReplicaSet(2, labelMap) + manager.rsStore.Store.Add(rs) + // add to podStore a matching pod that has an ownerRef pointing to the rs, + // but ownerRef.Controller is false. Expect a patch to take control it. + rsOwnerReference := api.OwnerReference{UID: rs.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: rs.Name} + pod := newPod("pod", rs, api.PodRunning) + pod.OwnerReferences = []api.OwnerReference{rsOwnerReference} + manager.podStore.Indexer.Add(pod) + + err := manager.syncReplicaSet(getKey(rs, t)) + if err != nil { + t.Fatal(err) + } + // 1 patch to take control of pod, and 1 create of new pod. + validateSyncReplicaSet(t, fakePodControl, 1, 0, 1) +} + +func TestPatchPodFails(t *testing.T) { + manager, fakePodControl := setupManagerWithGCEnabled() + labelMap := map[string]string{"foo": "bar"} + rs := newReplicaSet(2, labelMap) + manager.rsStore.Store.Add(rs) + // add to podStore two matching pods. Expect two patches to take control + // them. + manager.podStore.Indexer.Add(newPod("pod1", rs, api.PodRunning)) + manager.podStore.Indexer.Add(newPod("pod2", rs, api.PodRunning)) + // let both patches fail. The rs controller will assume it fails to take + // control of the pods and create new ones. + fakePodControl.Err = fmt.Errorf("Fake Error") + err := manager.syncReplicaSet(getKey(rs, t)) + if err != nil { + t.Fatal(err) + } + // 2 patches to take control of pod1 and pod2 (both fail), 2 creates. + validateSyncReplicaSet(t, fakePodControl, 2, 0, 2) +} + +func TestPatchExtraPodsThenDelete(t *testing.T) { + manager, fakePodControl := setupManagerWithGCEnabled() + labelMap := map[string]string{"foo": "bar"} + rs := newReplicaSet(2, labelMap) + manager.rsStore.Store.Add(rs) + // add to podStore three matching pods. Expect three patches to take control + // them, and later delete one of them. + manager.podStore.Indexer.Add(newPod("pod1", rs, api.PodRunning)) + manager.podStore.Indexer.Add(newPod("pod2", rs, api.PodRunning)) + manager.podStore.Indexer.Add(newPod("pod3", rs, api.PodRunning)) + err := manager.syncReplicaSet(getKey(rs, t)) + if err != nil { + t.Fatal(err) + } + // 3 patches to take control of the pods, and 1 deletion because there is an extra pod. + validateSyncReplicaSet(t, fakePodControl, 0, 1, 3) +} + +func TestUpdateLabelsRemoveControllerRef(t *testing.T) { + manager, fakePodControl := setupManagerWithGCEnabled() + labelMap := map[string]string{"foo": "bar"} + rs := newReplicaSet(2, labelMap) + manager.rsStore.Store.Add(rs) + // put one pod in the podStore + pod := newPod("pod", rs, api.PodRunning) + var trueVar = true + rsOwnerReference := api.OwnerReference{UID: rs.UID, APIVersion: "v1beta1", Kind: "ReplicaSet", Name: rs.Name, Controller: &trueVar} + pod.OwnerReferences = []api.OwnerReference{rsOwnerReference} + updatedPod := *pod + // reset the labels + updatedPod.Labels = make(map[string]string) + // add the updatedPod to the store. 
This is consistent with the behavior of + // the Informer: Informer updates the store before call the handler + // (updatePod() in this case). + manager.podStore.Indexer.Add(&updatedPod) + // send a update of the same pod with modified labels + manager.updatePod(pod, &updatedPod) + // verifies that rs is added to the queue + rsKey := getKey(rs, t) + queueRS, _ := manager.queue.Get() + if queueRS != rsKey { + t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS) + } + manager.queue.Done(queueRS) + err := manager.syncReplicaSet(rsKey) + if err != nil { + t.Fatal(err) + } + // expect 1 patch to be sent to remove the controllerRef for the pod. + // expect 2 creates because the rs.Spec.Replicas=2 and there exists no + // matching pod. + validateSyncReplicaSet(t, fakePodControl, 2, 0, 1) + fakePodControl.Clear() +} + +func TestUpdateSelectorControllerRef(t *testing.T) { + manager, fakePodControl := setupManagerWithGCEnabled() + labelMap := map[string]string{"foo": "bar"} + rs := newReplicaSet(2, labelMap) + // put 2 pods in the podStore + newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rs, "pod") + // update the RS so that its selector no longer matches the pods + updatedRS := *rs + updatedRS.Spec.Selector.MatchLabels = map[string]string{"foo": "baz"} + // put the updatedRS into the store. This is consistent with the behavior of + // the Informer: Informer updates the store before call the handler + // (updateRS() in this case). + manager.rsStore.Store.Add(&updatedRS) + manager.updateRS(rs, &updatedRS) + // verifies that the rs is added to the queue + rsKey := getKey(rs, t) + queueRS, _ := manager.queue.Get() + if queueRS != rsKey { + t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS) + } + manager.queue.Done(queueRS) + err := manager.syncReplicaSet(rsKey) + if err != nil { + t.Fatal(err) + } + // expect 2 patches to be sent to remove the controllerRef for the pods. + // expect 2 creates because the rc.Spec.Replicas=2 and there exists no + // matching pod. + validateSyncReplicaSet(t, fakePodControl, 2, 0, 2) + fakePodControl.Clear() +} + +// RS controller shouldn't adopt or create more pods if the rc is about to be +// deleted. +func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) { + manager, fakePodControl := setupManagerWithGCEnabled() + labelMap := map[string]string{"foo": "bar"} + rs := newReplicaSet(2, labelMap) + now := unversioned.Now() + rs.DeletionTimestamp = &now + manager.rsStore.Store.Add(rs) + pod1 := newPod("pod1", rs, api.PodRunning) + manager.podStore.Indexer.Add(pod1) + + // no patch, no create + err := manager.syncReplicaSet(getKey(rs, t)) + if err != nil { + t.Fatal(err) + } + validateSyncReplicaSet(t, fakePodControl, 0, 0, 0) +} diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index ba5f5b8af5f..bedb2295d50 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -615,12 +615,6 @@ func (rm *ReplicationManager) syncReplicationController(key string) error { trace.Step("ReplicationController restored") rcNeedsSync := rm.expectations.SatisfiedExpectations(rcKey) trace.Step("Expectations restored") - if err != nil { - glog.Errorf("Error getting pods for rc %q: %v", key, err) - rm.queue.Add(key) - return err - } - trace.Step("Pods listed") // TODO: Do the List and Filter in a single pass, or use an index. 
var filteredPods []*api.Pod @@ -653,7 +647,7 @@ func (rm *ReplicationManager) syncReplicationController(key string) error { for _, pod := range controlledDoesNotMatch { err := cm.ReleasePod(pod) if err != nil { - errlist = append(errlist, cm.ReleasePod(pod)) + errlist = append(errlist, err) } } if len(errlist) != 0 {
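Both the ReplicaSet and ReplicationController sync loops now follow the same adopt/release flow: list every pod in the namespace, then split them into pods that match the selector and are already controlled (matchesAndControlled), matching pods that still lack a controller (matchesNeedsController, adopted via AdoptPod), and controlled pods whose labels no longer match (controlledDoesNotMatch, released via ReleasePod, with failures aggregated and the key requeued). The following is a self-contained sketch of that three-way split, using simplified pod and ReplicaSet stand-ins rather than the real controller.PodControllerRefManager.Classify:

// Illustrative sketch of the classification the GC-enabled sync path relies on.
// Types are simplified stand-ins; Controller is flattened to a plain bool here
// for brevity.
package main

import "fmt"

type ownerRef struct {
    UID        string
    Controller bool
}

type pod struct {
    Name   string
    Labels map[string]string
    Owners []ownerRef
}

type replicaSet struct {
    UID      string
    Selector map[string]string
}

// matches reports whether every selector key/value is present in the labels.
func matches(selector, labels map[string]string) bool {
    for k, v := range selector {
        if labels[k] != v {
            return false
        }
    }
    return true
}

// controlledBy reports whether the pod's controller owner (if any) is this RS.
func controlledBy(p pod, rsUID string) bool {
    for _, ref := range p.Owners {
        if ref.Controller {
            return ref.UID == rsUID
        }
    }
    return false
}

// hasController reports whether any owner is marked as the controller.
func hasController(p pod) bool {
    for _, ref := range p.Owners {
        if ref.Controller {
            return true
        }
    }
    return false
}

// classify splits pods into the three buckets the sync loop acts on:
// already controlled by this RS, adoption candidates, and pods to release.
func classify(rs replicaSet, pods []pod) (controlled, adopt, release []pod) {
    for _, p := range pods {
        switch {
        case matches(rs.Selector, p.Labels) && controlledBy(p, rs.UID):
            controlled = append(controlled, p)
        case matches(rs.Selector, p.Labels) && !hasController(p):
            adopt = append(adopt, p)
        case !matches(rs.Selector, p.Labels) && controlledBy(p, rs.UID):
            release = append(release, p)
        }
    }
    return
}

func main() {
    rs := replicaSet{UID: "rs-1", Selector: map[string]string{"foo": "bar"}}
    pods := []pod{
        {Name: "owned", Labels: map[string]string{"foo": "bar"}, Owners: []ownerRef{{UID: "rs-1", Controller: true}}},
        {Name: "orphan", Labels: map[string]string{"foo": "bar"}},
        {Name: "stale", Labels: map[string]string{"foo": "baz"}, Owners: []ownerRef{{UID: "rs-1", Controller: true}}},
    }
    controlled, adopt, release := classify(rs, pods)
    fmt.Println(len(controlled), len(adopt), len(release)) // 1 1 1
}

Matching pods that are already controlled by a different owner fall into none of the three buckets, which is the behavior TestDoNotPatchPodWithOtherControlRef exercises: the controller leaves such pods alone and creates replacements instead.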