diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 63b973bf68d..971e63a819e 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -364,7 +364,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig if containsResource(resources, "replicasets") { glog.Infof("Starting ReplicaSet controller") - go replicaset.NewReplicaSetController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replicaset-controller")), ResyncPeriod(s), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS)). + go replicaset.NewReplicaSetController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replicaset-controller")), ResyncPeriod(s), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector). Run(int(s.ConcurrentRSSyncs), wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index b879cca648b..89c50d8c2d1 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -27,8 +27,10 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/extensions" + "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" @@ -38,6 +40,7 @@ import ( "k8s.io/kubernetes/pkg/controller/framework/informers" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" + utilerrors "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/metrics" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" @@ -63,6 +66,10 @@ const ( statusUpdateRetries = 1 ) +func getRSKind() unversioned.GroupVersionKind { + return v1beta1.SchemeGroupVersion.WithKind("ReplicaSet") +} + // ReplicaSetController is responsible for synchronizing ReplicaSet objects stored // in the system with actual running pods. type ReplicaSetController struct { @@ -101,21 +108,25 @@ type ReplicaSetController struct { // Controllers that need to be synced queue *workqueue.Type + + // garbageCollectorEnabled denotes if the garbage collector is enabled. RC + // manager behaves differently if GC is enabled. + garbageCollectorEnabled bool } // NewReplicaSetController creates a new ReplicaSetController. -func NewReplicaSetController(podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicaSetController { +func NewReplicaSetController(podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicaSetController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")}) return newReplicaSetController( eventBroadcaster.NewRecorder(api.EventSource{Component: "replicaset-controller"}), - podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize) + podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled) } // newReplicaSetController configures a replica set controller with the specified event recorder -func newReplicaSetController(eventRecorder record.EventRecorder, podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicaSetController { +func newReplicaSetController(eventRecorder record.EventRecorder, podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicaSetController { if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil { metrics.RegisterMetricAndTrackRateLimiterUsage("replicaset_controller", kubeClient.Core().GetRESTClient().GetRateLimiter()) } @@ -129,6 +140,7 @@ func newReplicaSetController(eventRecorder record.EventRecorder, podInformer fra burstReplicas: burstReplicas, expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()), queue: workqueue.New(), + garbageCollectorEnabled: garbageCollectorEnabled, } rsc.rsStore.Store, rsc.rsController = framework.NewInformer( @@ -144,43 +156,8 @@ func newReplicaSetController(eventRecorder record.EventRecorder, podInformer fra // TODO: Can we have much longer period here? FullControllerResyncPeriod, framework.ResourceEventHandlerFuncs{ - AddFunc: rsc.enqueueReplicaSet, - UpdateFunc: func(old, cur interface{}) { - oldRS := old.(*extensions.ReplicaSet) - curRS := cur.(*extensions.ReplicaSet) - - // We should invalidate the whole lookup cache if a RS's selector has been updated. - // - // Imagine that you have two RSs: - // * old RS1 - // * new RS2 - // You also have a pod that is attached to RS2 (because it doesn't match RS1 selector). - // Now imagine that you are changing RS1 selector so that it is now matching that pod, - // in such case we must invalidate the whole cache so that pod could be adopted by RS1 - // - // This makes the lookup cache less helpful, but selector update does not happen often, - // so it's not a big problem - if !reflect.DeepEqual(oldRS.Spec.Selector, curRS.Spec.Selector) { - rsc.lookupCache.InvalidateAll() - } - - // You might imagine that we only really need to enqueue the - // replica set when Spec changes, but it is safer to sync any - // time this function is triggered. That way a full informer - // resync can requeue any replica set that don't yet have pods - // but whose last attempts at creating a pod have failed (since - // we don't block on creation of pods) instead of those - // replica sets stalling indefinitely. Enqueueing every time - // does result in some spurious syncs (like when Status.Replica - // is updated and the watch notification from it retriggers - // this function), but in general extra resyncs shouldn't be - // that bad as ReplicaSets that haven't met expectations yet won't - // sync, and all the listing is done using local stores. - if oldRS.Status.Replicas != curRS.Status.Replicas { - glog.V(4).Infof("Observed updated replica count for ReplicaSet: %v, %d->%d", curRS.Name, oldRS.Status.Replicas, curRS.Status.Replicas) - } - rsc.enqueueReplicaSet(cur) - }, + AddFunc: rsc.enqueueReplicaSet, + UpdateFunc: rsc.updateRS, // This will enter the sync loop and no-op, because the replica set has been deleted from the store. // Note that deleting a replica set immediately after scaling it to 0 will not work. The recommended // way of achieving this is by performing a `stop` operation on the replica set. @@ -208,7 +185,8 @@ func newReplicaSetController(eventRecorder record.EventRecorder, podInformer fra // NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer. func NewReplicaSetControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicaSetController { podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod()) - rsc := NewReplicaSetController(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize) + garbageCollectorEnabled := false + rsc := NewReplicaSetController(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled) rsc.internalPodInformer = podInformer return rsc } @@ -239,13 +217,14 @@ func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) { // getPodReplicaSet returns the replica set managing the given pod. // TODO: Surface that we are ignoring multiple replica sets for a single pod. +// TODO: use ownerReference.Controller to determine if the rs controls the pod. func (rsc *ReplicaSetController) getPodReplicaSet(pod *api.Pod) *extensions.ReplicaSet { // look up in the cache, if cached and the cache is valid, just return cached value if obj, cached := rsc.lookupCache.GetMatchingObject(pod); cached { rs, ok := obj.(*extensions.ReplicaSet) if !ok { // This should not happen - glog.Errorf("lookup cache does not retuen a ReplicaSet object") + glog.Errorf("lookup cache does not return a ReplicaSet object") return nil } if cached && rsc.isCacheValid(pod, rs) { @@ -278,6 +257,44 @@ func (rsc *ReplicaSetController) getPodReplicaSet(pod *api.Pod) *extensions.Repl return &rss[0] } +// callback when RS is updated +func (rsc *ReplicaSetController) updateRS(old, cur interface{}) { + oldRS := old.(*extensions.ReplicaSet) + curRS := cur.(*extensions.ReplicaSet) + + // We should invalidate the whole lookup cache if a RS's selector has been updated. + // + // Imagine that you have two RSs: + // * old RS1 + // * new RS2 + // You also have a pod that is attached to RS2 (because it doesn't match RS1 selector). + // Now imagine that you are changing RS1 selector so that it is now matching that pod, + // in such case we must invalidate the whole cache so that pod could be adopted by RS1 + // + // This makes the lookup cache less helpful, but selector update does not happen often, + // so it's not a big problem + if !reflect.DeepEqual(oldRS.Spec.Selector, curRS.Spec.Selector) { + rsc.lookupCache.InvalidateAll() + } + + // You might imagine that we only really need to enqueue the + // replica set when Spec changes, but it is safer to sync any + // time this function is triggered. That way a full informer + // resync can requeue any replica set that don't yet have pods + // but whose last attempts at creating a pod have failed (since + // we don't block on creation of pods) instead of those + // replica sets stalling indefinitely. Enqueueing every time + // does result in some spurious syncs (like when Status.Replica + // is updated and the watch notification from it retriggers + // this function), but in general extra resyncs shouldn't be + // that bad as ReplicaSets that haven't met expectations yet won't + // sync, and all the listing is done using local stores. + if oldRS.Status.Replicas != curRS.Status.Replicas { + glog.V(4).Infof("Observed updated replica count for ReplicaSet: %v, %d->%d", curRS.Name, oldRS.Status.Replicas, curRS.Status.Replicas) + } + rsc.enqueueReplicaSet(cur) +} + // isCacheValid check if the cache is valid func (rsc *ReplicaSetController) isCacheValid(pod *api.Pod, cachedRS *extensions.ReplicaSet) bool { _, exists, err := rsc.rsStore.Get(cachedRS) @@ -357,9 +374,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) { return } - if rs := rsc.getPodReplicaSet(curPod); rs != nil { - rsc.enqueueReplicaSet(rs) - } + // Enqueue the oldRC before the curRC to give curRC a chance to adopt the oldPod. if labelChanged { // If the old and new ReplicaSet are the same, the first one that syncs // will set expectations preventing any damage from the second. @@ -367,6 +382,10 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) { rsc.enqueueReplicaSet(oldRS) } } + + if curRS := rsc.getPodReplicaSet(curPod); curRS != nil { + rsc.enqueueReplicaSet(curRS) + } } // When a pod is deleted, enqueue the replica set that manages the pod and update its expectations. @@ -456,13 +475,28 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext // into a performance bottleneck. We should generate a UID for the pod // beforehand and store it via ExpectCreations. rsc.expectations.ExpectCreations(rsKey, diff) - wait := sync.WaitGroup{} - wait.Add(diff) + var wg sync.WaitGroup + wg.Add(diff) glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", rs.Namespace, rs.Name, rs.Spec.Replicas, diff) for i := 0; i < diff; i++ { go func() { - defer wait.Done() - if err := rsc.podControl.CreatePods(rs.Namespace, &rs.Spec.Template, rs); err != nil { + defer wg.Done() + var err error + + if rsc.garbageCollectorEnabled { + var trueVar = true + controllerRef := &api.OwnerReference{ + APIVersion: getRSKind().GroupVersion().String(), + Kind: getRSKind().Kind, + Name: rs.Name, + UID: rs.UID, + Controller: &trueVar, + } + err = rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef) + } else { + err = rsc.podControl.CreatePods(rs.Namespace, &rs.Spec.Template, rs) + } + if err != nil { // Decrement the expected number of creates because the informer won't observe this pod glog.V(2).Infof("Failed creation, decrementing expectations for replica set %q/%q", rs.Namespace, rs.Name) rsc.expectations.CreationObserved(rsKey) @@ -470,7 +504,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext } }() } - wait.Wait() + wg.Wait() } else if diff > 0 { if diff > rsc.burstReplicas { diff = rsc.burstReplicas @@ -494,11 +528,11 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext deletedPodKeys = append(deletedPodKeys, controller.PodKey(filteredPods[i])) } rsc.expectations.ExpectDeletions(rsKey, deletedPodKeys) - wait := sync.WaitGroup{} - wait.Add(diff) + var wg sync.WaitGroup + wg.Add(diff) for i := 0; i < diff; i++ { go func(ix int) { - defer wait.Done() + defer wg.Done() if err := rsc.podControl.DeletePod(rs.Namespace, filteredPods[ix].Name, rs); err != nil { // Decrement the expected number of deletes because the informer won't observe this deletion podKey := controller.PodKey(filteredPods[ix]) @@ -508,7 +542,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext } }(i) } - wait.Wait() + wg.Wait() } } @@ -557,16 +591,60 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error { glog.Errorf("Error converting pod selector to selector: %v", err) return err } - podList, err := rsc.podStore.Pods(rs.Namespace).List(selector) - if err != nil { - glog.Errorf("Error getting pods for ReplicaSet %q: %v", key, err) - rsc.queue.Add(key) - return err + + // TODO: Do the List and Filter in a single pass, or use an index. + var filteredPods []*api.Pod + if rsc.garbageCollectorEnabled { + // list all pods to include the pods that don't match the rs`s selector + // anymore but has the stale controller ref. + podList, err := rsc.podStore.Pods(rs.Namespace).List(labels.Everything()) + if err != nil { + glog.Errorf("Error getting pods for rs %q: %v", key, err) + rsc.queue.Add(key) + return err + } + cm := controller.NewPodControllerRefManager(rsc.podControl, rs.ObjectMeta, selector, getRSKind()) + matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(podList.Items) + for _, pod := range matchesNeedsController { + err := cm.AdoptPod(pod) + // continue to next pod if adoption fails. + if err != nil { + // If the pod no longer exists, don't even log the error. + if !errors.IsNotFound(err) { + utilruntime.HandleError(err) + } + } else { + matchesAndControlled = append(matchesAndControlled, pod) + } + } + filteredPods = matchesAndControlled + // remove the controllerRef for the pods that no longer have matching labels + var errlist []error + for _, pod := range controlledDoesNotMatch { + err := cm.ReleasePod(pod) + if err != nil { + errlist = append(errlist, err) + } + } + if len(errlist) != 0 { + aggregate := utilerrors.NewAggregate(errlist) + // push the RS into work queue again. We need to try to free the + // pods again otherwise they will stuck with the stale + // controllerRef. + rsc.queue.Add(key) + return aggregate + } + } else { + podList, err := rsc.podStore.Pods(rs.Namespace).List(selector) + if err != nil { + glog.Errorf("Error getting pods for rs %q: %v", key, err) + rsc.queue.Add(key) + return err + } + filteredPods = controller.FilterActivePods(podList.Items) } - // TODO: Do this in a single pass, or use an index. - filteredPods := controller.FilterActivePods(podList.Items) - if rsNeedsSync { + if rsNeedsSync && rs.DeletionTimestamp == nil { rsc.manageReplicas(filteredPods, &rs) } diff --git a/pkg/controller/replicaset/replica_set_test.go b/pkg/controller/replicaset/replica_set_test.go index be1bdb1f635..1858b4aabf9 100644 --- a/pkg/controller/replicaset/replica_set_test.go +++ b/pkg/controller/replicaset/replica_set_test.go @@ -96,34 +96,46 @@ func newReplicaSet(replicas int, selectorMap map[string]string) *extensions.Repl return rs } +// create a pod with the given phase for the given rs (same selectors and namespace) +func newPod(name string, rs *extensions.ReplicaSet, status api.PodPhase) *api.Pod { + return &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: name, + Namespace: rs.Namespace, + Labels: rs.Spec.Selector.MatchLabels, + }, + Status: api.PodStatus{Phase: status}, + } +} + // create count pods with the given phase for the given ReplicaSet (same selectors and namespace), and add them to the store. func newPodList(store cache.Store, count int, status api.PodPhase, labelMap map[string]string, rs *extensions.ReplicaSet, name string) *api.PodList { pods := []api.Pod{} + var trueVar = true + controllerReference := api.OwnerReference{UID: rs.UID, APIVersion: "v1beta1", Kind: "ReplicaSet", Name: rs.Name, Controller: &trueVar} for i := 0; i < count; i++ { - newPod := api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: fmt.Sprintf("%s%d", name, i), - Labels: labelMap, - Namespace: rs.Namespace, - }, - Status: api.PodStatus{Phase: status}, - } + pod := newPod(fmt.Sprintf("%s%d", name, i), rs, status) + pod.ObjectMeta.Labels = labelMap + pod.OwnerReferences = []api.OwnerReference{controllerReference} if store != nil { - store.Add(&newPod) + store.Add(pod) } - pods = append(pods, newPod) + pods = append(pods, *pod) } return &api.PodList{ Items: pods, } } -func validateSyncReplicaSet(t *testing.T, fakePodControl *controller.FakePodControl, expectedCreates, expectedDeletes int) { - if len(fakePodControl.Templates) != expectedCreates { - t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", expectedCreates, len(fakePodControl.Templates)) +func validateSyncReplicaSet(t *testing.T, fakePodControl *controller.FakePodControl, expectedCreates, expectedDeletes, expectedPatches int) { + if e, a := expectedCreates, len(fakePodControl.Templates); e != a { + t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", e, a) } - if len(fakePodControl.DeletePodName) != expectedDeletes { - t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", expectedDeletes, len(fakePodControl.DeletePodName)) + if e, a := expectedDeletes, len(fakePodControl.DeletePodName); e != a { + t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", e, a) + } + if e, a := expectedPatches, len(fakePodControl.Patches); e != a { + t.Errorf("Unexpected number of patches. Expected %d, saw %d\n", e, a) } } @@ -150,7 +162,7 @@ func TestSyncReplicaSetDoesNothing(t *testing.T) { manager.podControl = &fakePodControl manager.syncReplicaSet(getKey(rsSpec, t)) - validateSyncReplicaSet(t, &fakePodControl, 0, 0) + validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0) } func TestSyncReplicaSetDeletes(t *testing.T) { @@ -167,7 +179,7 @@ func TestSyncReplicaSetDeletes(t *testing.T) { newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rsSpec, "pod") manager.syncReplicaSet(getKey(rsSpec, t)) - validateSyncReplicaSet(t, &fakePodControl, 0, 1) + validateSyncReplicaSet(t, &fakePodControl, 0, 1, 0) } func TestDeleteFinalStateUnknown(t *testing.T) { @@ -217,7 +229,7 @@ func TestSyncReplicaSetCreates(t *testing.T) { fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.syncReplicaSet(getKey(rs, t)) - validateSyncReplicaSet(t, &fakePodControl, 2, 0) + validateSyncReplicaSet(t, &fakePodControl, 2, 0, 0) } func TestStatusUpdatesWithoutReplicasChange(t *testing.T) { @@ -244,7 +256,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) { manager.podControl = &fakePodControl manager.syncReplicaSet(getKey(rs, t)) - validateSyncReplicaSet(t, &fakePodControl, 0, 0) + validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0) if fakeHandler.RequestReceived != nil { t.Errorf("Unexpected update when pods and ReplicaSets are in a steady state") } @@ -304,7 +316,7 @@ func TestControllerUpdateReplicas(t *testing.T) { decRc := runtime.EncodeOrDie(testapi.Extensions.Codec(), rs) fakeHandler.ValidateRequest(t, testapi.Extensions.ResourcePath(replicaSetResourceName(), rs.Namespace, rs.Name)+"/status", "PUT", &decRc) - validateSyncReplicaSet(t, &fakePodControl, 1, 0) + validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0) } func TestSyncReplicaSetDormancy(t *testing.T) { @@ -330,13 +342,13 @@ func TestSyncReplicaSetDormancy(t *testing.T) { // Creates a replica and sets expectations rsSpec.Status.Replicas = 1 manager.syncReplicaSet(getKey(rsSpec, t)) - validateSyncReplicaSet(t, &fakePodControl, 1, 0) + validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0) // Expectations prevents replicas but not an update on status rsSpec.Status.Replicas = 0 fakePodControl.Clear() manager.syncReplicaSet(getKey(rsSpec, t)) - validateSyncReplicaSet(t, &fakePodControl, 0, 0) + validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0) // Get the key for the controller rsKey, err := controller.KeyFunc(rsSpec) @@ -352,13 +364,13 @@ func TestSyncReplicaSetDormancy(t *testing.T) { fakePodControl.Err = fmt.Errorf("Fake Error") manager.syncReplicaSet(getKey(rsSpec, t)) - validateSyncReplicaSet(t, &fakePodControl, 1, 0) + validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0) // This replica should not need a Lowering of expectations, since the previous create failed fakePodControl.Clear() fakePodControl.Err = nil manager.syncReplicaSet(getKey(rsSpec, t)) - validateSyncReplicaSet(t, &fakePodControl, 1, 0) + validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0) // 1 PUT for the ReplicaSet status during dormancy window. // Note that the pod creates go through pod control so they're not recorded. @@ -716,7 +728,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) expectedPods = int32(burstReplicas) } // This validates the ReplicaSet manager sync actually created pods - validateSyncReplicaSet(t, &fakePodControl, int(expectedPods), 0) + validateSyncReplicaSet(t, &fakePodControl, int(expectedPods), 0, 0) // This simulates the watch events for all but 1 of the expected pods. // None of these should wake the controller because it has expectations==BurstReplicas. @@ -727,7 +739,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) podExp, exists, err := manager.expectations.GetExpectations(rsKey) if !exists || err != nil { - t.Fatalf("Did not find expectations for rc.") + t.Fatalf("Did not find expectations for rs.") } if add, _ := podExp.GetExpectations(); add != 1 { t.Fatalf("Expectations are wrong %v", podExp) @@ -737,7 +749,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) if expectedPods > int32(burstReplicas) { expectedPods = int32(burstReplicas) } - validateSyncReplicaSet(t, &fakePodControl, 0, int(expectedPods)) + validateSyncReplicaSet(t, &fakePodControl, 0, int(expectedPods), 0) // To accurately simulate a watch we must delete the exact pods // the rs is waiting for. @@ -772,7 +784,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) // Check that the ReplicaSet didn't take any action for all the above pods fakePodControl.Clear() manager.syncReplicaSet(getKey(rsSpec, t)) - validateSyncReplicaSet(t, &fakePodControl, 0, 0) + validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0) // Create/Delete the last pod // The last add pod will decrease the expectation of the ReplicaSet to 0, @@ -851,7 +863,7 @@ func TestRSSyncExpectations(t *testing.T) { }, }) manager.syncReplicaSet(getKey(rsSpec, t)) - validateSyncReplicaSet(t, &fakePodControl, 0, 0) + validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0) } func TestDeleteControllerAndExpectations(t *testing.T) { @@ -867,7 +879,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) { // This should set expectations for the ReplicaSet manager.syncReplicaSet(getKey(rs, t)) - validateSyncReplicaSet(t, &fakePodControl, 1, 0) + validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0) fakePodControl.Clear() // Get the ReplicaSet key @@ -893,7 +905,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) { podExp.Add(-1, 0) manager.podStore.Indexer.Replace(make([]interface{}, 0), "0") manager.syncReplicaSet(getKey(rs, t)) - validateSyncReplicaSet(t, &fakePodControl, 0, 0) + validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0) } func TestRSManagerNotReady(t *testing.T) { @@ -911,7 +923,7 @@ func TestRSManagerNotReady(t *testing.T) { rsKey := getKey(rsSpec, t) manager.syncReplicaSet(rsKey) - validateSyncReplicaSet(t, &fakePodControl, 0, 0) + validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0) queueRS, _ := manager.queue.Get() if queueRS != rsKey { t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS) @@ -919,7 +931,7 @@ func TestRSManagerNotReady(t *testing.T) { manager.podStoreSynced = alwaysReady manager.syncReplicaSet(rsKey) - validateSyncReplicaSet(t, &fakePodControl, 1, 0) + validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0) } // shuffle returns a new shuffled list of container controllers. @@ -984,9 +996,9 @@ func TestDeletionTimestamp(t *testing.T) { // A pod added with a deletion timestamp should decrement deletions, not creations. manager.addPod(&pod) - queueRC, _ := manager.queue.Get() - if queueRC != rsKey { - t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRC) + queueRS, _ := manager.queue.Get() + if queueRS != rsKey { + t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS) } manager.queue.Done(rsKey) @@ -1001,9 +1013,9 @@ func TestDeletionTimestamp(t *testing.T) { manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(&pod)}) manager.updatePod(&oldPod, &pod) - queueRC, _ = manager.queue.Get() - if queueRC != rsKey { - t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRC) + queueRS, _ = manager.queue.Get() + if queueRS != rsKey { + t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS) } manager.queue.Done(rsKey) @@ -1041,9 +1053,9 @@ func TestDeletionTimestamp(t *testing.T) { // Deleting the second pod should clear expectations. manager.deletePod(secondPod) - queueRC, _ = manager.queue.Get() - if queueRC != rsKey { - t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRC) + queueRS, _ = manager.queue.Get() + if queueRS != rsKey { + t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS) } manager.queue.Done(rsKey) @@ -1052,3 +1064,202 @@ func TestDeletionTimestamp(t *testing.T) { t.Fatalf("Wrong expectations %+v", podExp) } } + +// setupManagerWithGCEnabled creates a RS manager with a fakePodControl +// and with garbageCollectorEnabled set to true +func setupManagerWithGCEnabled() (manager *ReplicaSetController, fakePodControl *controller.FakePodControl) { + c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) + fakePodControl = &controller.FakePodControl{} + manager = NewReplicaSetControllerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager.garbageCollectorEnabled = true + manager.podStoreSynced = alwaysReady + manager.podControl = fakePodControl + return manager, fakePodControl +} + +func TestDoNotPatchPodWithOtherControlRef(t *testing.T) { + manager, fakePodControl := setupManagerWithGCEnabled() + labelMap := map[string]string{"foo": "bar"} + rs := newReplicaSet(2, labelMap) + manager.rsStore.Store.Add(rs) + var trueVar = true + otherControllerReference := api.OwnerReference{UID: util.NewUUID(), APIVersion: "v1beta1", Kind: "ReplicaSet", Name: "AnotherRS", Controller: &trueVar} + // add to podStore a matching Pod controlled by another controller. Expect no patch. + pod := newPod("pod", rs, api.PodRunning) + pod.OwnerReferences = []api.OwnerReference{otherControllerReference} + manager.podStore.Indexer.Add(pod) + err := manager.syncReplicaSet(getKey(rs, t)) + if err != nil { + t.Fatal(err) + } + // because the matching pod already has a controller, so 2 pods should be created. + validateSyncReplicaSet(t, fakePodControl, 2, 0, 0) +} + +func TestPatchPodWithOtherOwnerRef(t *testing.T) { + manager, fakePodControl := setupManagerWithGCEnabled() + labelMap := map[string]string{"foo": "bar"} + rs := newReplicaSet(2, labelMap) + manager.rsStore.Store.Add(rs) + // add to podStore one more matching pod that doesn't have a controller + // ref, but has an owner ref pointing to other object. Expect a patch to + // take control of it. + unrelatedOwnerReference := api.OwnerReference{UID: util.NewUUID(), APIVersion: "batch/v1", Kind: "Job", Name: "Job"} + pod := newPod("pod", rs, api.PodRunning) + pod.OwnerReferences = []api.OwnerReference{unrelatedOwnerReference} + manager.podStore.Indexer.Add(pod) + + err := manager.syncReplicaSet(getKey(rs, t)) + if err != nil { + t.Fatal(err) + } + // 1 patch to take control of pod, and 1 create of new pod. + validateSyncReplicaSet(t, fakePodControl, 1, 0, 1) +} + +func TestPatchPodWithCorrectOwnerRef(t *testing.T) { + manager, fakePodControl := setupManagerWithGCEnabled() + labelMap := map[string]string{"foo": "bar"} + rs := newReplicaSet(2, labelMap) + manager.rsStore.Store.Add(rs) + // add to podStore a matching pod that has an ownerRef pointing to the rs, + // but ownerRef.Controller is false. Expect a patch to take control it. + rsOwnerReference := api.OwnerReference{UID: rs.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: rs.Name} + pod := newPod("pod", rs, api.PodRunning) + pod.OwnerReferences = []api.OwnerReference{rsOwnerReference} + manager.podStore.Indexer.Add(pod) + + err := manager.syncReplicaSet(getKey(rs, t)) + if err != nil { + t.Fatal(err) + } + // 1 patch to take control of pod, and 1 create of new pod. + validateSyncReplicaSet(t, fakePodControl, 1, 0, 1) +} + +func TestPatchPodFails(t *testing.T) { + manager, fakePodControl := setupManagerWithGCEnabled() + labelMap := map[string]string{"foo": "bar"} + rs := newReplicaSet(2, labelMap) + manager.rsStore.Store.Add(rs) + // add to podStore two matching pods. Expect two patches to take control + // them. + manager.podStore.Indexer.Add(newPod("pod1", rs, api.PodRunning)) + manager.podStore.Indexer.Add(newPod("pod2", rs, api.PodRunning)) + // let both patches fail. The rs controller will assume it fails to take + // control of the pods and create new ones. + fakePodControl.Err = fmt.Errorf("Fake Error") + err := manager.syncReplicaSet(getKey(rs, t)) + if err != nil { + t.Fatal(err) + } + // 2 patches to take control of pod1 and pod2 (both fail), 2 creates. + validateSyncReplicaSet(t, fakePodControl, 2, 0, 2) +} + +func TestPatchExtraPodsThenDelete(t *testing.T) { + manager, fakePodControl := setupManagerWithGCEnabled() + labelMap := map[string]string{"foo": "bar"} + rs := newReplicaSet(2, labelMap) + manager.rsStore.Store.Add(rs) + // add to podStore three matching pods. Expect three patches to take control + // them, and later delete one of them. + manager.podStore.Indexer.Add(newPod("pod1", rs, api.PodRunning)) + manager.podStore.Indexer.Add(newPod("pod2", rs, api.PodRunning)) + manager.podStore.Indexer.Add(newPod("pod3", rs, api.PodRunning)) + err := manager.syncReplicaSet(getKey(rs, t)) + if err != nil { + t.Fatal(err) + } + // 3 patches to take control of the pods, and 1 deletion because there is an extra pod. + validateSyncReplicaSet(t, fakePodControl, 0, 1, 3) +} + +func TestUpdateLabelsRemoveControllerRef(t *testing.T) { + manager, fakePodControl := setupManagerWithGCEnabled() + labelMap := map[string]string{"foo": "bar"} + rs := newReplicaSet(2, labelMap) + manager.rsStore.Store.Add(rs) + // put one pod in the podStore + pod := newPod("pod", rs, api.PodRunning) + var trueVar = true + rsOwnerReference := api.OwnerReference{UID: rs.UID, APIVersion: "v1beta1", Kind: "ReplicaSet", Name: rs.Name, Controller: &trueVar} + pod.OwnerReferences = []api.OwnerReference{rsOwnerReference} + updatedPod := *pod + // reset the labels + updatedPod.Labels = make(map[string]string) + // add the updatedPod to the store. This is consistent with the behavior of + // the Informer: Informer updates the store before call the handler + // (updatePod() in this case). + manager.podStore.Indexer.Add(&updatedPod) + // send a update of the same pod with modified labels + manager.updatePod(pod, &updatedPod) + // verifies that rs is added to the queue + rsKey := getKey(rs, t) + queueRS, _ := manager.queue.Get() + if queueRS != rsKey { + t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS) + } + manager.queue.Done(queueRS) + err := manager.syncReplicaSet(rsKey) + if err != nil { + t.Fatal(err) + } + // expect 1 patch to be sent to remove the controllerRef for the pod. + // expect 2 creates because the rs.Spec.Replicas=2 and there exists no + // matching pod. + validateSyncReplicaSet(t, fakePodControl, 2, 0, 1) + fakePodControl.Clear() +} + +func TestUpdateSelectorControllerRef(t *testing.T) { + manager, fakePodControl := setupManagerWithGCEnabled() + labelMap := map[string]string{"foo": "bar"} + rs := newReplicaSet(2, labelMap) + // put 2 pods in the podStore + newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rs, "pod") + // update the RS so that its selector no longer matches the pods + updatedRS := *rs + updatedRS.Spec.Selector.MatchLabels = map[string]string{"foo": "baz"} + // put the updatedRS into the store. This is consistent with the behavior of + // the Informer: Informer updates the store before call the handler + // (updateRS() in this case). + manager.rsStore.Store.Add(&updatedRS) + manager.updateRS(rs, &updatedRS) + // verifies that the rs is added to the queue + rsKey := getKey(rs, t) + queueRS, _ := manager.queue.Get() + if queueRS != rsKey { + t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS) + } + manager.queue.Done(queueRS) + err := manager.syncReplicaSet(rsKey) + if err != nil { + t.Fatal(err) + } + // expect 2 patches to be sent to remove the controllerRef for the pods. + // expect 2 creates because the rc.Spec.Replicas=2 and there exists no + // matching pod. + validateSyncReplicaSet(t, fakePodControl, 2, 0, 2) + fakePodControl.Clear() +} + +// RS controller shouldn't adopt or create more pods if the rc is about to be +// deleted. +func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) { + manager, fakePodControl := setupManagerWithGCEnabled() + labelMap := map[string]string{"foo": "bar"} + rs := newReplicaSet(2, labelMap) + now := unversioned.Now() + rs.DeletionTimestamp = &now + manager.rsStore.Store.Add(rs) + pod1 := newPod("pod1", rs, api.PodRunning) + manager.podStore.Indexer.Add(pod1) + + // no patch, no create + err := manager.syncReplicaSet(getKey(rs, t)) + if err != nil { + t.Fatal(err) + } + validateSyncReplicaSet(t, fakePodControl, 0, 0, 0) +} diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index ba5f5b8af5f..bedb2295d50 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -615,12 +615,6 @@ func (rm *ReplicationManager) syncReplicationController(key string) error { trace.Step("ReplicationController restored") rcNeedsSync := rm.expectations.SatisfiedExpectations(rcKey) trace.Step("Expectations restored") - if err != nil { - glog.Errorf("Error getting pods for rc %q: %v", key, err) - rm.queue.Add(key) - return err - } - trace.Step("Pods listed") // TODO: Do the List and Filter in a single pass, or use an index. var filteredPods []*api.Pod @@ -653,7 +647,7 @@ func (rm *ReplicationManager) syncReplicationController(key string) error { for _, pod := range controlledDoesNotMatch { err := cm.ReleasePod(pod) if err != nil { - errlist = append(errlist, cm.ReleasePod(pod)) + errlist = append(errlist, err) } } if len(errlist) != 0 { diff --git a/test/integration/replicaset/replicaset_test.go b/test/integration/replicaset/replicaset_test.go new file mode 100644 index 00000000000..733d56d7df0 --- /dev/null +++ b/test/integration/replicaset/replicaset_test.go @@ -0,0 +1,464 @@ +// +build integration,!no-etcd + +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package replicaset + +import ( + "fmt" + "net/http/httptest" + "reflect" + "testing" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" + internalclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_3" + "k8s.io/kubernetes/pkg/client/restclient" + controllerframwork "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/controller/framework/informers" + "k8s.io/kubernetes/pkg/controller/replicaset" + "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/test/integration/framework" +) + +func testLabels() map[string]string { + return map[string]string{"name": "test"} +} + +func newRS(name, namespace string, replicas int) *v1beta1.ReplicaSet { + replicasCopy := int32(replicas) + return &v1beta1.ReplicaSet{ + TypeMeta: unversioned.TypeMeta{ + Kind: "ReplicaSet", + APIVersion: "extensions/v1beta1", + }, + ObjectMeta: v1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + Spec: v1beta1.ReplicaSetSpec{ + Selector: &v1beta1.LabelSelector{ + MatchLabels: testLabels(), + }, + Replicas: &replicasCopy, + Template: v1.PodTemplateSpec{ + ObjectMeta: v1.ObjectMeta{ + Labels: testLabels(), + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "fake-name", + Image: "fakeimage", + }, + }, + }, + }, + }, + } +} + +func newMatchingPod(podName, namespace string) *v1.Pod { + return &v1.Pod{ + TypeMeta: unversioned.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: v1.ObjectMeta{ + Name: podName, + Namespace: namespace, + Labels: testLabels(), + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "fake-name", + Image: "fakeimage", + }, + }, + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + }, + } +} + +// verifyRemainingObjects verifies if the number of the remaining replica +// sets and pods are rsNum and podNum. It returns error if the +// communication with the API server fails. +func verifyRemainingObjects(t *testing.T, clientSet clientset.Interface, namespace string, rsNum, podNum int) (bool, error) { + rsClient := clientSet.Extensions().ReplicaSets(namespace) + podClient := clientSet.Core().Pods(namespace) + pods, err := podClient.List(api.ListOptions{}) + if err != nil { + return false, fmt.Errorf("Failed to list pods: %v", err) + } + var ret = true + if len(pods.Items) != podNum { + ret = false + t.Logf("expect %d pods, got %d pods", podNum, len(pods.Items)) + } + rss, err := rsClient.List(api.ListOptions{}) + if err != nil { + return false, fmt.Errorf("Failed to list replica sets: %v", err) + } + if len(rss.Items) != rsNum { + ret = false + t.Logf("expect %d RSs, got %d RSs", rsNum, len(rss.Items)) + } + return ret, nil +} + +func rmSetup(t *testing.T, enableGarbageCollector bool) (*httptest.Server, *replicaset.ReplicaSetController, controllerframwork.SharedIndexInformer, clientset.Interface) { + masterConfig := framework.NewIntegrationTestMasterConfig() + masterConfig.EnableCoreControllers = false + _, s := framework.RunAMaster(masterConfig) + + config := restclient.Config{Host: s.URL} + clientSet, err := clientset.NewForConfig(&config) + if err != nil { + t.Fatalf("Error in create clientset: %v", err) + } + resyncPeriod := 12 * time.Hour + resyncPeriodFunc := func() time.Duration { + return resyncPeriod + } + podInformer := informers.CreateSharedPodIndexInformer(internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pod-informer")), resyncPeriod) + rm := replicaset.NewReplicaSetController( + podInformer, + internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "replicaset-controller")), + resyncPeriodFunc, + replicaset.BurstReplicas, + 4096, + enableGarbageCollector, + ) + + if err != nil { + t.Fatalf("Failed to create replicaset controller") + } + return s, rm, podInformer, clientSet +} + +func TestAdoption(t *testing.T) { + var trueVar = true + testCases := []struct { + name string + existingOwnerReferences func(rs *v1beta1.ReplicaSet) []v1.OwnerReference + expectedOwnerReferences func(rs *v1beta1.ReplicaSet) []v1.OwnerReference + }{ + { + "pod refers rs as an owner, not a controller", + func(rs *v1beta1.ReplicaSet) []v1.OwnerReference { + return []v1.OwnerReference{{UID: rs.UID, Name: rs.Name, APIVersion: "extensions/v1beta1", Kind: "ReplicaSet"}} + }, + func(rs *v1beta1.ReplicaSet) []v1.OwnerReference { + return []v1.OwnerReference{{UID: rs.UID, Name: rs.Name, APIVersion: "extensions/v1beta1", Kind: "ReplicaSet", Controller: &trueVar}} + }, + }, + { + "pod doesn't have owner references", + func(rs *v1beta1.ReplicaSet) []v1.OwnerReference { + return []v1.OwnerReference{} + }, + func(rs *v1beta1.ReplicaSet) []v1.OwnerReference { + return []v1.OwnerReference{{UID: rs.UID, Name: rs.Name, APIVersion: "extensions/v1beta1", Kind: "ReplicaSet", Controller: &trueVar}} + }, + }, + { + "pod refers rs as a controller", + func(rs *v1beta1.ReplicaSet) []v1.OwnerReference { + return []v1.OwnerReference{{UID: rs.UID, Name: rs.Name, APIVersion: "extensions/v1beta1", Kind: "ReplicaSet", Controller: &trueVar}} + }, + func(rs *v1beta1.ReplicaSet) []v1.OwnerReference { + return []v1.OwnerReference{{UID: rs.UID, Name: rs.Name, APIVersion: "extensions/v1beta1", Kind: "ReplicaSet", Controller: &trueVar}} + }, + }, + { + "pod refers other rs as the controller, refers the rs as an owner", + func(rs *v1beta1.ReplicaSet) []v1.OwnerReference { + return []v1.OwnerReference{ + {UID: "1", Name: "anotherRS", APIVersion: "extensions/v1beta1", Kind: "ReplicaSet", Controller: &trueVar}, + {UID: rs.UID, Name: rs.Name, APIVersion: "extensions/v1beta1", Kind: "ReplicaSet"}, + } + }, + func(rs *v1beta1.ReplicaSet) []v1.OwnerReference { + return []v1.OwnerReference{ + {UID: "1", Name: "anotherRS", APIVersion: "extensions/v1beta1", Kind: "ReplicaSet", Controller: &trueVar}, + {UID: rs.UID, Name: rs.Name, APIVersion: "extensions/v1beta1", Kind: "ReplicaSet"}, + } + }, + }, + } + for i, tc := range testCases { + s, rm, podInformer, clientSet := rmSetup(t, true) + ns := framework.CreateTestingNamespace(fmt.Sprintf("rs-adoption-%d", i), s, t) + defer framework.DeleteTestingNamespace(ns, s, t) + + rsClient := clientSet.Extensions().ReplicaSets(ns.Name) + podClient := clientSet.Core().Pods(ns.Name) + const rsName = "rs" + rs, err := rsClient.Create(newRS(rsName, ns.Name, 1)) + if err != nil { + t.Fatalf("Failed to create replica set: %v", err) + } + podName := fmt.Sprintf("pod%d", i) + pod := newMatchingPod(podName, ns.Name) + pod.OwnerReferences = tc.existingOwnerReferences(rs) + _, err = podClient.Create(pod) + if err != nil { + t.Fatalf("Failed to create Pod: %v", err) + } + + stopCh := make(chan struct{}) + go podInformer.Run(stopCh) + // wait for the podInformer to observe the pod, otherwise the rs controller + // will try to create a new pod rather than adopting the existing one. + if err := wait.Poll(10*time.Second, 60*time.Second, func() (bool, error) { + objects := podInformer.GetIndexer().List() + for _, object := range objects { + pod, ok := object.(*api.Pod) + if !ok { + t.Fatal("expect object to be a pod") + } + if pod.Name == podName { + return true, nil + } + } + return false, nil + }); err != nil { + t.Fatal(err) + } + go rm.Run(5, stopCh) + if err := wait.Poll(10*time.Second, 60*time.Second, func() (bool, error) { + updatedPod, err := podClient.Get(pod.Name) + if err != nil { + return false, err + } + if e, a := tc.expectedOwnerReferences(rs), updatedPod.OwnerReferences; reflect.DeepEqual(e, a) { + return true, nil + } else { + t.Logf("ownerReferences don't match, expect %v, got %v", e, a) + return false, nil + } + }); err != nil { + t.Fatal(err) + } + close(stopCh) + } +} + +func createRSsPods(t *testing.T, clientSet clientset.Interface, rss []*v1beta1.ReplicaSet, pods []*v1.Pod, ns string) { + rsClient := clientSet.Extensions().ReplicaSets(ns) + podClient := clientSet.Core().Pods(ns) + for _, rs := range rss { + if _, err := rsClient.Create(rs); err != nil { + t.Fatalf("Failed to create replica set %s: %v", rs.Name, err) + } + } + for _, pod := range pods { + if _, err := podClient.Create(pod); err != nil { + t.Fatalf("Failed to create pod %s: %v", pod.Name, err) + } + } +} + +func waitRSStable(t *testing.T, clientSet clientset.Interface, rs *v1beta1.ReplicaSet, ns string) { + rsClient := clientSet.Extensions().ReplicaSets(ns) + if err := wait.Poll(10*time.Second, 60*time.Second, func() (bool, error) { + updatedRS, err := rsClient.Get(rs.Name) + if err != nil { + return false, err + } + if updatedRS.Status.Replicas != *rs.Spec.Replicas { + return false, nil + } else { + return true, nil + } + }); err != nil { + t.Fatal(err) + } +} + +func TestUpdateSelectorToAdopt(t *testing.T) { + // We have pod1, pod2 and rs. rs.spec.replicas=1. At first rs.Selector + // matches pod1 only; change the selector to match pod2 as well. Verify + // there is only one pod left. + s, rm, podInformer, clientSet := rmSetup(t, true) + ns := framework.CreateTestingNamespace("rs-update-selector-to-adopt", s, t) + defer framework.DeleteTestingNamespace(ns, s, t) + rs := newRS("rs", ns.Name, 1) + // let rs's selector only match pod1 + rs.Spec.Selector.MatchLabels["uniqueKey"] = "1" + rs.Spec.Template.Labels["uniqueKey"] = "1" + pod1 := newMatchingPod("pod1", ns.Name) + pod1.Labels["uniqueKey"] = "1" + pod2 := newMatchingPod("pod2", ns.Name) + pod2.Labels["uniqueKey"] = "2" + createRSsPods(t, clientSet, []*v1beta1.ReplicaSet{rs}, []*v1.Pod{pod1, pod2}, ns.Name) + + stopCh := make(chan struct{}) + go podInformer.Run(stopCh) + go rm.Run(5, stopCh) + waitRSStable(t, clientSet, rs, ns.Name) + + // change the rs's selector to match both pods + patch := `{"spec":{"selector":{"matchLabels": {"uniqueKey":null}}}}` + rsClient := clientSet.Extensions().ReplicaSets(ns.Name) + rs, err := rsClient.Patch(rs.Name, api.StrategicMergePatchType, []byte(patch)) + if err != nil { + t.Fatalf("Failed to patch replica set: %v", err) + } + t.Logf("patched rs = %#v", rs) + // wait for the rs select both pods and delete one of them + if err := wait.Poll(10*time.Second, 60*time.Second, func() (bool, error) { + return verifyRemainingObjects(t, clientSet, ns.Name, 1, 1) + }); err != nil { + t.Fatal(err) + } + close(stopCh) +} + +func TestUpdateSelectorToRemoveControllerRef(t *testing.T) { + // We have pod1, pod2 and rs. rs.spec.replicas=2. At first rs.Selector + // matches pod1 and pod2; change the selector to match only pod1. Verify + // that rs creates one more pod, so there are 3 pods. Also verify that + // pod2's controllerRef is cleared. + s, rm, podInformer, clientSet := rmSetup(t, true) + ns := framework.CreateTestingNamespace("rs-update-selector-to-remove-controllerref", s, t) + defer framework.DeleteTestingNamespace(ns, s, t) + rs := newRS("rs", ns.Name, 2) + pod1 := newMatchingPod("pod1", ns.Name) + pod1.Labels["uniqueKey"] = "1" + pod2 := newMatchingPod("pod2", ns.Name) + pod2.Labels["uniqueKey"] = "2" + createRSsPods(t, clientSet, []*v1beta1.ReplicaSet{rs}, []*v1.Pod{pod1, pod2}, ns.Name) + + stopCh := make(chan struct{}) + go podInformer.Run(stopCh) + go rm.Run(5, stopCh) + waitRSStable(t, clientSet, rs, ns.Name) + + // change the rs's selector to match both pods + patch := `{"spec":{"selector":{"matchLabels": {"uniqueKey":"1"}},"template":{"metadata":{"labels":{"uniqueKey":"1"}}}}}` + rsClient := clientSet.Extensions().ReplicaSets(ns.Name) + rs, err := rsClient.Patch(rs.Name, api.StrategicMergePatchType, []byte(patch)) + if err != nil { + t.Fatalf("Failed to patch replica set: %v", err) + } + t.Logf("patched rs = %#v", rs) + // wait for the rs to create one more pod + if err := wait.Poll(10*time.Second, 60*time.Second, func() (bool, error) { + return verifyRemainingObjects(t, clientSet, ns.Name, 1, 3) + }); err != nil { + t.Fatal(err) + } + podClient := clientSet.Core().Pods(ns.Name) + pod2, err = podClient.Get(pod2.Name) + if err != nil { + t.Fatalf("Failed to get pod2: %v", err) + } + if len(pod2.OwnerReferences) != 0 { + t.Fatalf("ownerReferences of pod2 is not cleared, got %#v", pod2.OwnerReferences) + } + close(stopCh) +} + +func TestUpdateLabelToRemoveControllerRef(t *testing.T) { + // We have pod1, pod2 and rs. rs.spec.replicas=2. At first rs.Selector + // matches pod1 and pod2; change pod2's lables to non-matching. Verify + // that rs creates one more pod, so there are 3 pods. Also verify that + // pod2's controllerRef is cleared. + s, rm, podInformer, clientSet := rmSetup(t, true) + ns := framework.CreateTestingNamespace("rs-update-label-to-remove-controllerref", s, t) + defer framework.DeleteTestingNamespace(ns, s, t) + rs := newRS("rs", ns.Name, 2) + pod1 := newMatchingPod("pod1", ns.Name) + pod2 := newMatchingPod("pod2", ns.Name) + createRSsPods(t, clientSet, []*v1beta1.ReplicaSet{rs}, []*v1.Pod{pod1, pod2}, ns.Name) + + stopCh := make(chan struct{}) + go podInformer.Run(stopCh) + go rm.Run(5, stopCh) + waitRSStable(t, clientSet, rs, ns.Name) + + // change the rs's selector to match both pods + patch := `{"metadata":{"labels":{"name":null}}}` + podClient := clientSet.Core().Pods(ns.Name) + pod2, err := podClient.Patch(pod2.Name, api.StrategicMergePatchType, []byte(patch)) + if err != nil { + t.Fatalf("Failed to patch pod2: %v", err) + } + t.Logf("patched pod2 = %#v", pod2) + // wait for the rs to create one more pod + if err := wait.Poll(10*time.Second, 60*time.Second, func() (bool, error) { + return verifyRemainingObjects(t, clientSet, ns.Name, 1, 3) + }); err != nil { + t.Fatal(err) + } + pod2, err = podClient.Get(pod2.Name) + if err != nil { + t.Fatalf("Failed to get pod2: %v", err) + } + if len(pod2.OwnerReferences) != 0 { + t.Fatalf("ownerReferences of pod2 is not cleared, got %#v", pod2.OwnerReferences) + } + close(stopCh) +} + +func TestUpdateLabelToBeAdopted(t *testing.T) { + // We have pod1, pod2 and rs. rs.spec.replicas=1. At first rs.Selector + // matches pod1 only; change pod2's lables to be matching. Verify the RS + // controller adopts pod2 and delete one of them, so there is only 1 pod + // left. + s, rm, podInformer, clientSet := rmSetup(t, true) + ns := framework.CreateTestingNamespace("rs-update-label-to-be-adopted", s, t) + defer framework.DeleteTestingNamespace(ns, s, t) + rs := newRS("rs", ns.Name, 1) + // let rs's selector only matches pod1 + rs.Spec.Selector.MatchLabels["uniqueKey"] = "1" + rs.Spec.Template.Labels["uniqueKey"] = "1" + pod1 := newMatchingPod("pod1", ns.Name) + pod1.Labels["uniqueKey"] = "1" + pod2 := newMatchingPod("pod2", ns.Name) + pod2.Labels["uniqueKey"] = "2" + createRSsPods(t, clientSet, []*v1beta1.ReplicaSet{rs}, []*v1.Pod{pod1, pod2}, ns.Name) + + stopCh := make(chan struct{}) + go podInformer.Run(stopCh) + go rm.Run(5, stopCh) + waitRSStable(t, clientSet, rs, ns.Name) + + // change the rs's selector to match both pods + patch := `{"metadata":{"labels":{"uniqueKey":"1"}}}` + podClient := clientSet.Core().Pods(ns.Name) + pod2, err := podClient.Patch(pod2.Name, api.StrategicMergePatchType, []byte(patch)) + if err != nil { + t.Fatalf("Failed to patch pod2: %v", err) + } + t.Logf("patched pod2 = %#v", pod2) + // wait for the rs to select both pods and delete one of them + if err := wait.Poll(10*time.Second, 60*time.Second, func() (bool, error) { + return verifyRemainingObjects(t, clientSet, ns.Name, 1, 1) + }); err != nil { + t.Fatal(err) + } + close(stopCh) +} diff --git a/test/integration/replicationcontroller_test.go b/test/integration/replicationcontroller/replicationcontroller_test.go similarity index 99% rename from test/integration/replicationcontroller_test.go rename to test/integration/replicationcontroller/replicationcontroller_test.go index 45e8265665f..58a875e86e4 100644 --- a/test/integration/replicationcontroller_test.go +++ b/test/integration/replicationcontroller/replicationcontroller_test.go @@ -16,7 +16,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package integration +package replicationcontroller import ( "fmt"