From 119ff39f247f9447918c5d56548739f66c2db984 Mon Sep 17 00:00:00 2001 From: "Madhusudan.C.S" Date: Sun, 17 Jan 2016 14:26:25 -0800 Subject: [PATCH] Implement replica set controller. --- pkg/controller/replicaset/doc.go | 19 + pkg/controller/replicaset/replica_set.go | 463 +++++++++ pkg/controller/replicaset/replica_set_test.go | 908 ++++++++++++++++++ .../replicaset/replica_set_utils.go | 72 ++ .../replication/replication_controller.go | 2 + .../replication_controller_test.go | 2 + .../replication_controller_utils.go | 2 + 7 files changed, 1468 insertions(+) create mode 100644 pkg/controller/replicaset/doc.go create mode 100644 pkg/controller/replicaset/replica_set.go create mode 100644 pkg/controller/replicaset/replica_set_test.go create mode 100644 pkg/controller/replicaset/replica_set_utils.go diff --git a/pkg/controller/replicaset/doc.go b/pkg/controller/replicaset/doc.go new file mode 100644 index 00000000000..9d42796d990 --- /dev/null +++ b/pkg/controller/replicaset/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package replicaset contains logic for watching and synchronizing +// ReplicaSets. +package replicaset diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go new file mode 100644 index 00000000000..d286d7df384 --- /dev/null +++ b/pkg/controller/replicaset/replica_set.go @@ -0,0 +1,463 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// If you make changes to this file, you should also make the corresponding change in ReplicationController. + +package replicaset + +import ( + "reflect" + "sort" + "sync" + "time" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apis/extensions" + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/client/record" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/workqueue" + "k8s.io/kubernetes/pkg/watch" +) + +const ( + // We'll attempt to recompute the required replicas of all ReplicaSets + // that have fulfilled their expectations at least this often. This recomputation + // happens based on contents in local pod storage. 
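+	// (This is the resync period handed to the ReplicaSet informer constructed
+	// below; the pod informer uses the caller-supplied resyncPeriod instead.)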
+	FullControllerResyncPeriod = 30 * time.Second
+
+	// Realistic value of the burstReplicas field for the replica set controller, based on
+	// performance requirements for Kubernetes 1.0.
+	BurstReplicas = 500
+
+	// We must avoid counting pods until the pod store has synced. If it hasn't synced, to
+	// avoid a hot loop, we'll wait this long between checks.
+	PodStoreSyncedPollPeriod = 100 * time.Millisecond
+
+	// The number of times we retry updating a ReplicaSet's status.
+	statusUpdateRetries = 1
+)
+
+// ReplicaSetController is responsible for synchronizing ReplicaSet objects stored
+// in the system with actual running pods.
+type ReplicaSetController struct {
+	kubeClient client.Interface
+	podControl controller.PodControlInterface
+
+	// A ReplicaSet is temporarily suspended after creating/deleting this many replicas.
+	// It resumes normal action after observing the watch events for them.
+	burstReplicas int
+	// To allow injection of syncReplicaSet for testing.
+	syncHandler func(rsKey string) error
+
+	// A TTLCache of pod creates/deletes each ReplicaSet expects to see.
+	expectations controller.ControllerExpectationsInterface
+
+	// A store of ReplicaSets, populated by the rsController.
+	rsStore cache.StoreToReplicaSetLister
+	// Watches changes to all ReplicaSets.
+	rsController *framework.Controller
+	// A store of pods, populated by the podController.
+	podStore cache.StoreToPodLister
+	// Watches changes to all pods.
+	podController *framework.Controller
+	// podStoreSynced returns true if the pod store has been synced at least once.
+	// Added as a member to the struct to allow injection for testing.
+	podStoreSynced func() bool
+
+	// ReplicaSets that need to be synced.
+	queue *workqueue.Type
+}
+
+// NewReplicaSetController creates a new ReplicaSetController.
+func NewReplicaSetController(kubeClient client.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int) *ReplicaSetController {
+	eventBroadcaster := record.NewBroadcaster()
+	eventBroadcaster.StartLogging(glog.Infof)
+	eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
+
+	rsc := &ReplicaSetController{
+		kubeClient: kubeClient,
+		podControl: controller.RealPodControl{
+			KubeClient: kubeClient,
+			Recorder:   eventBroadcaster.NewRecorder(api.EventSource{Component: "replicaset-controller"}),
+		},
+		burstReplicas: burstReplicas,
+		expectations:  controller.NewControllerExpectations(),
+		queue:         workqueue.New(),
+	}
+
+	rsc.rsStore.Store, rsc.rsController = framework.NewInformer(
+		&cache.ListWatch{
+			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
+				return rsc.kubeClient.Extensions().ReplicaSets(api.NamespaceAll).List(options)
+			},
+			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
+				return rsc.kubeClient.Extensions().ReplicaSets(api.NamespaceAll).Watch(options)
+			},
+		},
+		&extensions.ReplicaSet{},
+		// TODO: Can we have a much longer period here?
+		FullControllerResyncPeriod,
+		framework.ResourceEventHandlerFuncs{
+			AddFunc: rsc.enqueueReplicaSet,
+			UpdateFunc: func(old, cur interface{}) {
+				// You might imagine that we only really need to enqueue the
+				// replica set when Spec changes, but it is safer to sync any
+				// time this function is triggered. That way a full informer
+				// resync can requeue any replica sets that don't yet have pods
+				// but whose last attempts at creating a pod have failed (since
+				// we don't block on creation of pods) instead of those
+				// replica sets stalling indefinitely. Enqueueing every time
+				// does result in some spurious syncs (like when Status.Replicas
+				// is updated and the watch notification from it retriggers
+				// this function), but in general extra resyncs shouldn't be
+				// that bad: ReplicaSets that haven't met expectations yet won't
+				// sync, and all the listing is done using local stores.
+				oldRS := old.(*extensions.ReplicaSet)
+				curRS := cur.(*extensions.ReplicaSet)
+				if oldRS.Status.Replicas != curRS.Status.Replicas {
+					glog.V(4).Infof("Observed updated replica count for ReplicaSet: %v, %d->%d", curRS.Name, oldRS.Status.Replicas, curRS.Status.Replicas)
+				}
+				rsc.enqueueReplicaSet(cur)
+			},
+			// This will enter the sync loop and no-op, because the replica set has been deleted from the store.
+			// Note that deleting a replica set immediately after scaling it to 0 will not work. The recommended
+			// way of achieving this is by performing a `stop` operation on the replica set.
+			DeleteFunc: rsc.enqueueReplicaSet,
+		},
+	)
+
+	rsc.podStore.Store, rsc.podController = framework.NewInformer(
+		&cache.ListWatch{
+			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
+				return rsc.kubeClient.Pods(api.NamespaceAll).List(options)
+			},
+			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
+				return rsc.kubeClient.Pods(api.NamespaceAll).Watch(options)
+			},
+		},
+		&api.Pod{},
+		resyncPeriod(),
+		framework.ResourceEventHandlerFuncs{
+			AddFunc: rsc.addPod,
+			// This invokes the ReplicaSet sync for every pod change, e.g. host assignment. Though this might seem like
+			// overkill, the most frequent pod update is status, and the associated ReplicaSet will only list from
+			// local storage, so it should be OK.
+			UpdateFunc: rsc.updatePod,
+			DeleteFunc: rsc.deletePod,
+		},
+	)
+
+	rsc.syncHandler = rsc.syncReplicaSet
+	rsc.podStoreSynced = rsc.podController.HasSynced
+	return rsc
+}
+
+// SetEventRecorder replaces the event recorder used by the ReplicaSetController
+// with the given recorder. Only used for testing.
+func (rsc *ReplicaSetController) SetEventRecorder(recorder record.EventRecorder) {
+	// TODO: Hack. We can't cleanly shut down the event recorder, so benchmarks
+	// need to pass in a fake.
+	rsc.podControl = controller.RealPodControl{KubeClient: rsc.kubeClient, Recorder: recorder}
+}
+
+// Run begins watching and syncing.
+func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
+	defer util.HandleCrash()
+	go rsc.rsController.Run(stopCh)
+	go rsc.podController.Run(stopCh)
+	for i := 0; i < workers; i++ {
+		go util.Until(rsc.worker, time.Second, stopCh)
+	}
+	<-stopCh
+	glog.Infof("Shutting down ReplicaSet Controller")
+	rsc.queue.ShutDown()
+}
+
+// getPodReplicaSet returns the replica set managing the given pod.
+// TODO: Surface that we are ignoring multiple replica sets for a single pod.
+func (rsc *ReplicaSetController) getPodReplicaSet(pod *api.Pod) *extensions.ReplicaSet {
+	rss, err := rsc.rsStore.GetPodReplicaSets(pod)
+	if err != nil {
+		glog.V(4).Infof("No ReplicaSets found for pod %v, ReplicaSet controller will avoid syncing", pod.Name)
+		return nil
+	}
+	// In theory, overlapping ReplicaSets are user error. This sorting will not prevent
+	// oscillation of replicas in all cases, e.g.:
+	// rs1 (older rs): [(k1=v1)], replicas=1; rs2: [(k2=v2)], replicas=2
+	// pod: [(k1:v1), (k2:v2)] will wake both rs1 and rs2, and we will sync rs1.
+	// pod: [(k2:v2)] will wake rs2, which creates a new replica.
+	if len(rss) > 1 {
+		// More than one item in this list indicates user error. If two ReplicaSets
+		// overlap, sort by creation timestamp, subsort by name, then pick
+		// the first.
+		glog.Errorf("user error! more than one ReplicaSet is selecting pods with labels: %+v", pod.Labels)
+		sort.Sort(overlappingReplicaSets(rss))
+	}
+	return &rss[0]
+}
+
+// When a pod is created, enqueue the replica set that manages it and update its expectations.
+func (rsc *ReplicaSetController) addPod(obj interface{}) {
+	pod := obj.(*api.Pod)
+	if pod.DeletionTimestamp != nil {
+		// On a restart of the controller manager, it's possible a new pod shows up in a state that
+		// is already pending deletion. Prevent the pod from being a creation observation.
+		rsc.deletePod(pod)
+		return
+	}
+	if rs := rsc.getPodReplicaSet(pod); rs != nil {
+		rsKey, err := controller.KeyFunc(rs)
+		if err != nil {
+			glog.Errorf("Couldn't get key for ReplicaSet %#v: %v", rs, err)
+			return
+		}
+		rsc.expectations.CreationObserved(rsKey)
+		rsc.enqueueReplicaSet(rs)
+	}
+}
+
+// When a pod is updated, figure out what replica set(s) manage it and wake them
+// up. If the labels of the pod have changed, we need to awaken both the old
+// and new replica sets. old and cur must be *api.Pod types.
+func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
+	if api.Semantic.DeepEqual(old, cur) {
+		// A periodic relist will send update events for all known pods.
+		return
+	}
+	// TODO: Write a unit test for this case
+	curPod := cur.(*api.Pod)
+	if curPod.DeletionTimestamp != nil {
+		// When a pod is deleted gracefully, its deletion timestamp is first modified to reflect a grace period,
+		// and after such time has passed, the kubelet actually deletes it from the store. We receive an update
+		// for modification of the deletion timestamp and expect a ReplicaSet to create more replicas ASAP, not wait
+		// until the kubelet actually deletes the pod. This is different from the Phase of a pod changing, because
+		// a ReplicaSet never initiates a phase change, and so is never asleep waiting for the same.
+		rsc.deletePod(curPod)
+		return
+	}
+	if rs := rsc.getPodReplicaSet(curPod); rs != nil {
+		rsc.enqueueReplicaSet(rs)
+	}
+	oldPod := old.(*api.Pod)
+	// Only need to get the old replica set if the labels changed.
+	if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) {
+		// If the old and new ReplicaSet are the same, the first one that syncs
+		// will set expectations, preventing any damage from the second.
+		if oldRS := rsc.getPodReplicaSet(oldPod); oldRS != nil {
+			rsc.enqueueReplicaSet(oldRS)
+		}
+	}
+}
+
+// When a pod is deleted, enqueue the replica set that manages the pod and update its expectations.
+// obj could be an *api.Pod, or a cache.DeletedFinalStateUnknown marker item.
+func (rsc *ReplicaSetController) deletePod(obj interface{}) {
+	pod, ok := obj.(*api.Pod)
+
+	// When a delete is dropped, the relist will notice a pod in the store not
+	// in the list, leading to the insertion of a tombstone object which contains
+	// the deleted key/value. Note that this value might be stale. If the pod
+	// changed labels, the new ReplicaSet will not be woken up till the periodic resync.
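+	// (A cache.DeletedFinalStateUnknown tombstone carries the last known state of
+	// the deleted object in its Obj field, which is unwrapped below.)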
+	if !ok {
+		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+		if !ok {
+			glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a replica set recreates a replica", obj, controller.ExpectationsTimeout)
+			return
+		}
+		pod, ok = tombstone.Obj.(*api.Pod)
+		if !ok {
+			glog.Errorf("Tombstone contained object that is not a pod %+v, could take up to %v before replica set recreates a replica", obj, controller.ExpectationsTimeout)
+			return
+		}
+	}
+	if rs := rsc.getPodReplicaSet(pod); rs != nil {
+		rsKey, err := controller.KeyFunc(rs)
+		if err != nil {
+			glog.Errorf("Couldn't get key for ReplicaSet %#v: %v", rs, err)
+			return
+		}
+		rsc.expectations.DeletionObserved(rsKey)
+		rsc.enqueueReplicaSet(rs)
+	}
+}
+
+// obj could be an *extensions.ReplicaSet, or a cache.DeletedFinalStateUnknown marker item.
+func (rsc *ReplicaSetController) enqueueReplicaSet(obj interface{}) {
+	key, err := controller.KeyFunc(obj)
+	if err != nil {
+		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
+		return
+	}
+
+	// TODO: Handle overlapping replica sets better. Either disallow them at admission time or
+	// deterministically avoid syncing replica sets that fight over pods. Currently, we only
+	// ensure that the same replica set is synced for a given pod. When we periodically relist
+	// all replica sets there will still be some replica instability. One way to handle this is
+	// by querying the store for all replica sets that this replica set overlaps, as well as all
+	// replica sets that overlap this ReplicaSet, and sorting them.
+	rsc.queue.Add(key)
+}
+
+// worker runs a worker thread that just dequeues items, processes them, and marks them done.
+// It enforces that the syncHandler is never invoked concurrently with the same key.
+func (rsc *ReplicaSetController) worker() {
+	for {
+		key, quit := rsc.queue.Get()
+		// quit must be checked here in the loop body: returning from inside a
+		// closure would not break the loop, and the worker would spin hot on a
+		// shut-down queue.
+		if quit {
+			return
+		}
+		func() {
+			defer rsc.queue.Done(key)
+			if err := rsc.syncHandler(key.(string)); err != nil {
+				glog.Errorf("Error syncing ReplicaSet: %v", err)
+			}
+		}()
+	}
+}
+
+// manageReplicas checks and updates replicas for the given ReplicaSet.
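+// It creates pods when the filtered pod count falls short of rs.Spec.Replicas and
+// deletes the excess otherwise, capping either burst at rsc.burstReplicas per sync.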
+func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *extensions.ReplicaSet) { + diff := len(filteredPods) - rs.Spec.Replicas + rsKey, err := controller.KeyFunc(rs) + if err != nil { + glog.Errorf("Couldn't get key for ReplicaSet %#v: %v", rs, err) + return + } + if diff < 0 { + diff *= -1 + if diff > rsc.burstReplicas { + diff = rsc.burstReplicas + } + rsc.expectations.ExpectCreations(rsKey, diff) + wait := sync.WaitGroup{} + wait.Add(diff) + glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", rs.Namespace, rs.Name, rs.Spec.Replicas, diff) + for i := 0; i < diff; i++ { + go func() { + defer wait.Done() + if err := rsc.podControl.CreatePods(rs.Namespace, rs.Spec.Template, rs); err != nil { + // Decrement the expected number of creates because the informer won't observe this pod + glog.V(2).Infof("Failed creation, decrementing expectations for replica set %q/%q", rs.Namespace, rs.Name) + rsc.expectations.CreationObserved(rsKey) + util.HandleError(err) + } + }() + } + wait.Wait() + } else if diff > 0 { + if diff > rsc.burstReplicas { + diff = rsc.burstReplicas + } + rsc.expectations.ExpectDeletions(rsKey, diff) + glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rs.Namespace, rs.Name, rs.Spec.Replicas, diff) + // No need to sort pods if we are about to delete all of them + if rs.Spec.Replicas != 0 { + // Sort the pods in the order such that not-ready < ready, unscheduled + // < scheduled, and pending < running. This ensures that we delete pods + // in the earlier stages whenever possible. + sort.Sort(controller.ActivePods(filteredPods)) + } + + wait := sync.WaitGroup{} + wait.Add(diff) + for i := 0; i < diff; i++ { + go func(ix int) { + defer wait.Done() + if err := rsc.podControl.DeletePod(rs.Namespace, filteredPods[ix].Name, rs); err != nil { + // Decrement the expected number of deletes because the informer won't observe this deletion + glog.V(2).Infof("Failed deletion, decrementing expectations for replica set %q/%q", rs.Namespace, rs.Name) + rsc.expectations.DeletionObserved(rsKey) + util.HandleError(err) + } + }(i) + } + wait.Wait() + } +} + +// syncReplicaSet will sync the ReplicaSet with the given key if it has had its expectations fulfilled, +// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be +// invoked concurrently with the same key. +func (rsc *ReplicaSetController) syncReplicaSet(key string) error { + startTime := time.Now() + defer func() { + glog.V(4).Infof("Finished syncing replica set %q (%v)", key, time.Now().Sub(startTime)) + }() + + obj, exists, err := rsc.rsStore.Store.GetByKey(key) + if !exists { + glog.Infof("ReplicaSet has been deleted %v", key) + rsc.expectations.DeleteExpectations(key) + return nil + } + if err != nil { + glog.Infof("Unable to retrieve ReplicaSet %v from store: %v", key, err) + rsc.queue.Add(key) + return err + } + rs := *obj.(*extensions.ReplicaSet) + if !rsc.podStoreSynced() { + // Sleep so we give the pod reflector goroutine a chance to run. + time.Sleep(PodStoreSyncedPollPeriod) + glog.Infof("Waiting for pods controller to sync, requeuing ReplicaSet %v", rs.Name) + rsc.enqueueReplicaSet(&rs) + return nil + } + + // Check the expectations of the ReplicaSet before counting active pods, otherwise a new pod can sneak + // in and update the expectations after we've retrieved active pods from the store. If a new pod enters + // the store after we've checked the expectation, the ReplicaSet sync is just deferred till the next + // relist. 
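+	// (Expectations also expire after controller.ExpectationsTimeout, so a lost
+	// watch event cannot leave the ReplicaSet dormant forever.)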
+ rsKey, err := controller.KeyFunc(&rs) + if err != nil { + glog.Errorf("Couldn't get key for ReplicaSet %#v: %v", rs, err) + return err + } + rsNeedsSync := rsc.expectations.SatisfiedExpectations(rsKey) + selector, err := extensions.LabelSelectorAsSelector(rs.Spec.Selector) + if err != nil { + glog.Errorf("Error converting pod selector to selector: %v", err) + return err + } + podList, err := rsc.podStore.Pods(rs.Namespace).List(selector) + if err != nil { + glog.Errorf("Error getting pods for ReplicaSet %q: %v", key, err) + rsc.queue.Add(key) + return err + } + + // TODO: Do this in a single pass, or use an index. + filteredPods := controller.FilterActivePods(podList.Items) + if rsNeedsSync { + rsc.manageReplicas(filteredPods, &rs) + } + + // Always updates status as pods come up or die. + if err := updateReplicaCount(rsc.kubeClient.Extensions().ReplicaSets(rs.Namespace), rs, len(filteredPods)); err != nil { + // Multiple things could lead to this update failing. Requeuing the replica set ensures + // we retry with some fairness. + glog.V(2).Infof("Failed to update replica count for controller %v/%v; requeuing; error: %v", rs.Namespace, rs.Name, err) + rsc.enqueueReplicaSet(&rs) + } + return nil +} diff --git a/pkg/controller/replicaset/replica_set_test.go b/pkg/controller/replicaset/replica_set_test.go new file mode 100644 index 00000000000..739b0985bf6 --- /dev/null +++ b/pkg/controller/replicaset/replica_set_test.go @@ -0,0 +1,908 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// If you make changes to this file, you should also make the corresponding change in ReplicationController. 
+ +package replicaset + +import ( + "fmt" + "math/rand" + "net/http/httptest" + "testing" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/extensions" + "k8s.io/kubernetes/pkg/client/cache" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/client/unversioned/testclient" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/securitycontext" + "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/sets" + utiltesting "k8s.io/kubernetes/pkg/util/testing" + "k8s.io/kubernetes/pkg/watch" +) + +var alwaysReady = func() bool { return true } + +func getKey(rs *extensions.ReplicaSet, t *testing.T) string { + if key, err := controller.KeyFunc(rs); err != nil { + t.Errorf("Unexpected error getting key for ReplicaSet %v: %v", rs.Name, err) + return "" + } else { + return key + } +} + +func newReplicaSet(replicas int, selectorMap map[string]string) *extensions.ReplicaSet { + rs := &extensions.ReplicaSet{ + TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Default.GroupVersion().String()}, + ObjectMeta: api.ObjectMeta{ + UID: util.NewUUID(), + Name: "foobar", + Namespace: api.NamespaceDefault, + ResourceVersion: "18", + }, + Spec: extensions.ReplicaSetSpec{ + Replicas: replicas, + Selector: &extensions.LabelSelector{MatchLabels: selectorMap}, + Template: &api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: map[string]string{ + "name": "foo", + "type": "production", + }, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Image: "foo/bar", + TerminationMessagePath: api.TerminationMessagePathDefault, + ImagePullPolicy: api.PullIfNotPresent, + SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults(), + }, + }, + RestartPolicy: api.RestartPolicyAlways, + DNSPolicy: api.DNSDefault, + NodeSelector: map[string]string{ + "baz": "blah", + }, + }, + }, + }, + } + return rs +} + +// create count pods with the given phase for the given ReplicaSet (same selectors and namespace), and add them to the store. +func newPodList(store cache.Store, count int, status api.PodPhase, labelMap map[string]string, rs *extensions.ReplicaSet) *api.PodList { + pods := []api.Pod{} + for i := 0; i < count; i++ { + newPod := api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: fmt.Sprintf("pod%d", i), + Labels: labelMap, + Namespace: rs.Namespace, + }, + Status: api.PodStatus{Phase: status}, + } + if store != nil { + store.Add(&newPod) + } + pods = append(pods, newPod) + } + return &api.PodList{ + Items: pods, + } +} + +func validateSyncReplicaSet(t *testing.T, fakePodControl *controller.FakePodControl, expectedCreates, expectedDeletes int) { + if len(fakePodControl.Templates) != expectedCreates { + t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", expectedCreates, len(fakePodControl.Templates)) + } + if len(fakePodControl.DeletePodName) != expectedDeletes { + t.Errorf("Unexpected number of deletes. 
Expected %d, saw %d\n", expectedDeletes, len(fakePodControl.DeletePodName)) + } +} + +func replicaSetResourceName() string { + return "replicasets" +} + +type serverResponse struct { + statusCode int + obj interface{} +} + +func TestSyncReplicaSetDoesNothing(t *testing.T) { + client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) + fakePodControl := controller.FakePodControl{} + manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas) + manager.podStoreSynced = alwaysReady + + // 2 running pods, a controller with 2 replicas, sync is a no-op + labelMap := map[string]string{"foo": "bar"} + rsSpec := newReplicaSet(2, labelMap) + manager.rsStore.Store.Add(rsSpec) + newPodList(manager.podStore.Store, 2, api.PodRunning, labelMap, rsSpec) + + manager.podControl = &fakePodControl + manager.syncReplicaSet(getKey(rsSpec, t)) + validateSyncReplicaSet(t, &fakePodControl, 0, 0) +} + +func TestSyncReplicaSetDeletes(t *testing.T) { + client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) + fakePodControl := controller.FakePodControl{} + manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas) + manager.podStoreSynced = alwaysReady + manager.podControl = &fakePodControl + + // 2 running pods and a controller with 1 replica, one pod delete expected + labelMap := map[string]string{"foo": "bar"} + rsSpec := newReplicaSet(1, labelMap) + manager.rsStore.Store.Add(rsSpec) + newPodList(manager.podStore.Store, 2, api.PodRunning, labelMap, rsSpec) + + manager.syncReplicaSet(getKey(rsSpec, t)) + validateSyncReplicaSet(t, &fakePodControl, 0, 1) +} + +func TestDeleteFinalStateUnknown(t *testing.T) { + client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) + fakePodControl := controller.FakePodControl{} + manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas) + manager.podStoreSynced = alwaysReady + manager.podControl = &fakePodControl + + received := make(chan string) + manager.syncHandler = func(key string) error { + received <- key + return nil + } + + // The DeletedFinalStateUnknown object should cause the ReplicaSet manager to insert + // the controller matching the selectors of the deleted pod into the work queue. 
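+	// The stubbed syncHandler above simply reports the key it is handed, which lets
+	// the test assert that the tombstone was resolved to the owning ReplicaSet's key.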
+	labelMap := map[string]string{"foo": "bar"}
+	rsSpec := newReplicaSet(1, labelMap)
+	manager.rsStore.Store.Add(rsSpec)
+	pods := newPodList(nil, 1, api.PodRunning, labelMap, rsSpec)
+	manager.deletePod(cache.DeletedFinalStateUnknown{Key: "foo", Obj: &pods.Items[0]})
+
+	go manager.worker()
+
+	expected := getKey(rsSpec, t)
+	select {
+	case key := <-received:
+		if key != expected {
+			t.Errorf("Unexpected sync of ReplicaSet %v, expected %v", key, expected)
+		}
+	case <-time.After(util.ForeverTestTimeout):
+		t.Errorf("Processing DeleteFinalStateUnknown took longer than expected")
+	}
+}
+
+func TestSyncReplicaSetCreates(t *testing.T) {
+	client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
+	manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas)
+	manager.podStoreSynced = alwaysReady
+
+	// A controller with 2 replicas and no pods in the store, 2 creates expected
+	labelMap := map[string]string{"foo": "bar"}
+	rs := newReplicaSet(2, labelMap)
+	manager.rsStore.Store.Add(rs)
+
+	fakePodControl := controller.FakePodControl{}
+	manager.podControl = &fakePodControl
+	manager.syncReplicaSet(getKey(rs, t))
+	validateSyncReplicaSet(t, &fakePodControl, 2, 0)
+}
+
+func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
+	// Set up a fake server to listen for requests, and run the ReplicaSet controller in steady state
+	fakeHandler := utiltesting.FakeHandler{
+		StatusCode:   200,
+		ResponseBody: "",
+	}
+	testServer := httptest.NewServer(&fakeHandler)
+	defer testServer.Close()
+	client := client.NewOrDie(&client.Config{Host: testServer.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
+	manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas)
+	manager.podStoreSynced = alwaysReady
+
+	// Steady state for the ReplicaSet, no Status.Replicas updates expected
+	activePods := 5
+	labelMap := map[string]string{"foo": "bar"}
+	rs := newReplicaSet(activePods, labelMap)
+	manager.rsStore.Store.Add(rs)
+	rs.Status = extensions.ReplicaSetStatus{Replicas: activePods}
+	newPodList(manager.podStore.Store, activePods, api.PodRunning, labelMap, rs)
+
+	fakePodControl := controller.FakePodControl{}
+	manager.podControl = &fakePodControl
+	manager.syncReplicaSet(getKey(rs, t))
+
+	validateSyncReplicaSet(t, &fakePodControl, 0, 0)
+	if fakeHandler.RequestReceived != nil {
+		t.Errorf("Unexpected update when pods and ReplicaSets are in a steady state")
+	}
+
+	// This response body is just so we don't err out decoding the HTTP response; all
+	// we care about is the request body sent below.
+	response := runtime.EncodeOrDie(testapi.Extensions.Codec(), &extensions.ReplicaSet{})
+	fakeHandler.ResponseBody = response
+
+	rs.Generation = rs.Generation + 1
+	manager.syncReplicaSet(getKey(rs, t))
+
+	rs.Status.ObservedGeneration = rs.Generation
+	updatedRc := runtime.EncodeOrDie(testapi.Extensions.Codec(), rs)
+	fakeHandler.ValidateRequest(t, testapi.Extensions.ResourcePath(replicaSetResourceName(), rs.Namespace, rs.Name)+"/status", "PUT", &updatedRc)
+}
+
+func TestControllerUpdateReplicas(t *testing.T) {
+	// This is a happy server just to record the PUT request we expect for status.Replicas
+	fakeHandler := utiltesting.FakeHandler{
+		StatusCode:   200,
+		ResponseBody: "",
+	}
+	testServer := httptest.NewServer(&fakeHandler)
+	defer testServer.Close()
+
+	client := client.NewOrDie(&client.Config{Host: testServer.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
+	manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas)
+	manager.podStoreSynced = alwaysReady
+
+	// Insufficient number of pods in the system, and Status.Replicas is wrong;
+	// Status.Replicas should be updated to match the number of pods in the system,
+	// and 1 new pod should be created.
+	labelMap := map[string]string{"foo": "bar"}
+	rs := newReplicaSet(5, labelMap)
+	manager.rsStore.Store.Add(rs)
+	rs.Status = extensions.ReplicaSetStatus{Replicas: 2, ObservedGeneration: 0}
+	rs.Generation = 1
+	newPodList(manager.podStore.Store, 4, api.PodRunning, labelMap, rs)
+
+	// This response body is just so we don't err out decoding the HTTP response
+	response := runtime.EncodeOrDie(testapi.Extensions.Codec(), &extensions.ReplicaSet{})
+	fakeHandler.ResponseBody = response
+
+	fakePodControl := controller.FakePodControl{}
+	manager.podControl = &fakePodControl
+
+	manager.syncReplicaSet(getKey(rs, t))
+
+	// 1. Status.Replicas should go up from 2->4 even though we created 5-4=1 pod.
+	// 2. Every update to the status should include the Generation of the spec.
+	rs.Status = extensions.ReplicaSetStatus{Replicas: 4, ObservedGeneration: 1}
+
+	decRc := runtime.EncodeOrDie(testapi.Extensions.Codec(), rs)
+	fakeHandler.ValidateRequest(t, testapi.Extensions.ResourcePath(replicaSetResourceName(), rs.Namespace, rs.Name)+"/status", "PUT", &decRc)
+	validateSyncReplicaSet(t, &fakePodControl, 1, 0)
+}
+
+func TestSyncReplicaSetDormancy(t *testing.T) {
+	// Set up a test server so we can lie about the current state of pods
+	fakeHandler := utiltesting.FakeHandler{
+		StatusCode:   200,
+		ResponseBody: "",
+	}
+	testServer := httptest.NewServer(&fakeHandler)
+	defer testServer.Close()
+	client := client.NewOrDie(&client.Config{Host: testServer.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
+
+	fakePodControl := controller.FakePodControl{}
+	manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas)
+	manager.podStoreSynced = alwaysReady
+	manager.podControl = &fakePodControl
+
+	labelMap := map[string]string{"foo": "bar"}
+	rsSpec := newReplicaSet(2, labelMap)
+	manager.rsStore.Store.Add(rsSpec)
+	newPodList(manager.podStore.Store, 1, api.PodRunning, labelMap, rsSpec)
+
+	// Creates a replica and sets expectations
+	rsSpec.Status.Replicas = 1
+	manager.syncReplicaSet(getKey(rsSpec, t))
+	validateSyncReplicaSet(t, &fakePodControl, 1, 0)
+
+	// Expectations prevent pod creations, but not an update on status
+	rsSpec.Status.Replicas = 0
+	fakePodControl.Clear()
+	manager.syncReplicaSet(getKey(rsSpec, t))
+	validateSyncReplicaSet(t, &fakePodControl, 0, 0)
+
+	// Get the key for the controller
+	rsKey, err := controller.KeyFunc(rsSpec)
+	if err != nil {
+		t.Errorf("Couldn't get key for object %+v: %v", rsSpec, err)
+	}
+
+	// Lowering expectations should lead to a sync that creates a replica; however, the
+	// fakePodControl error will prevent this, leaving expectations at 0, 0.
+	manager.expectations.CreationObserved(rsKey)
+	rsSpec.Status.Replicas = 1
+	fakePodControl.Clear()
+	fakePodControl.Err = fmt.Errorf("Fake Error")
+
+	manager.syncReplicaSet(getKey(rsSpec, t))
+	validateSyncReplicaSet(t, &fakePodControl, 0, 0)
+
+	// This sync should not need to lower expectations, since the previous create failed
+	fakePodControl.Err = nil
+	manager.syncReplicaSet(getKey(rsSpec, t))
+	validateSyncReplicaSet(t, &fakePodControl, 1, 0)
+
+	// 1 PUT for the ReplicaSet status during the dormancy window.
+	// Note that the pod creates go through pod control so they're not recorded.
+ fakeHandler.ValidateRequestCount(t, 1) +} + +func TestPodControllerLookup(t *testing.T) { + manager := NewReplicaSetController(client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}), controller.NoResyncPeriodFunc, BurstReplicas) + manager.podStoreSynced = alwaysReady + testCases := []struct { + inRSs []*extensions.ReplicaSet + pod *api.Pod + outRSName string + }{ + // pods without labels don't match any ReplicaSets + { + inRSs: []*extensions.ReplicaSet{ + {ObjectMeta: api.ObjectMeta{Name: "basic"}}}, + pod: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo1", Namespace: api.NamespaceAll}}, + outRSName: "", + }, + // Matching labels, not namespace + { + inRSs: []*extensions.ReplicaSet{ + { + ObjectMeta: api.ObjectMeta{Name: "foo"}, + Spec: extensions.ReplicaSetSpec{ + Selector: &extensions.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}, + }, + }, + }, + pod: &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "foo2", Namespace: "ns", Labels: map[string]string{"foo": "bar"}}}, + outRSName: "", + }, + // Matching ns and labels returns the key to the ReplicaSet, not the ReplicaSet name + { + inRSs: []*extensions.ReplicaSet{ + { + ObjectMeta: api.ObjectMeta{Name: "bar", Namespace: "ns"}, + Spec: extensions.ReplicaSetSpec{ + Selector: &extensions.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}, + }, + }, + }, + pod: &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "foo3", Namespace: "ns", Labels: map[string]string{"foo": "bar"}}}, + outRSName: "bar", + }, + } + for _, c := range testCases { + for _, r := range c.inRSs { + manager.rsStore.Add(r) + } + if rs := manager.getPodReplicaSet(c.pod); rs != nil { + if c.outRSName != rs.Name { + t.Errorf("Got replica set %+v expected %+v", rs.Name, c.outRSName) + } + } else if c.outRSName != "" { + t.Errorf("Expected a replica set %v pod %v, found none", c.outRSName, c.pod.Name) + } + } +} + +type FakeWatcher struct { + w *watch.FakeWatcher + *testclient.Fake +} + +func TestWatchControllers(t *testing.T) { + fakeWatch := watch.NewFake() + client := &testclient.Fake{} + client.AddWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil)) + manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas) + manager.podStoreSynced = alwaysReady + + var testRSSpec extensions.ReplicaSet + received := make(chan string) + + // The update sent through the fakeWatcher should make its way into the workqueue, + // and eventually into the syncHandler. The handler validates the received controller + // and closes the received channel to indicate that the test can finish. + manager.syncHandler = func(key string) error { + + obj, exists, err := manager.rsStore.Store.GetByKey(key) + if !exists || err != nil { + t.Errorf("Expected to find replica set under key %v", key) + } + rsSpec := *obj.(*extensions.ReplicaSet) + if !api.Semantic.DeepDerivative(rsSpec, testRSSpec) { + t.Errorf("Expected %#v, but got %#v", testRSSpec, rsSpec) + } + close(received) + return nil + } + // Start only the ReplicaSet watcher and the workqueue, send a watch event, + // and make sure it hits the sync method. 
+ stopCh := make(chan struct{}) + defer close(stopCh) + go manager.rsController.Run(stopCh) + go util.Until(manager.worker, 10*time.Millisecond, stopCh) + + testRSSpec.Name = "foo" + fakeWatch.Add(&testRSSpec) + + select { + case <-received: + case <-time.After(util.ForeverTestTimeout): + t.Errorf("Expected 1 call but got 0") + } +} + +func TestWatchPods(t *testing.T) { + fakeWatch := watch.NewFake() + client := &testclient.Fake{} + client.AddWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil)) + manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas) + manager.podStoreSynced = alwaysReady + + // Put one ReplicaSet and one pod into the controller's stores + labelMap := map[string]string{"foo": "bar"} + testRSSpec := newReplicaSet(1, labelMap) + manager.rsStore.Store.Add(testRSSpec) + received := make(chan string) + // The pod update sent through the fakeWatcher should figure out the managing ReplicaSet and + // send it into the syncHandler. + manager.syncHandler = func(key string) error { + + obj, exists, err := manager.rsStore.Store.GetByKey(key) + if !exists || err != nil { + t.Errorf("Expected to find replica set under key %v", key) + } + rsSpec := obj.(*extensions.ReplicaSet) + if !api.Semantic.DeepDerivative(rsSpec, testRSSpec) { + t.Errorf("\nExpected %#v,\nbut got %#v", testRSSpec, rsSpec) + } + close(received) + return nil + } + // Start only the pod watcher and the workqueue, send a watch event, + // and make sure it hits the sync method for the right ReplicaSet. + stopCh := make(chan struct{}) + defer close(stopCh) + go manager.podController.Run(stopCh) + go util.Until(manager.worker, 10*time.Millisecond, stopCh) + + pods := newPodList(nil, 1, api.PodRunning, labelMap, testRSSpec) + testPod := pods.Items[0] + testPod.Status.Phase = api.PodFailed + fakeWatch.Add(&testPod) + + select { + case <-received: + case <-time.After(util.ForeverTestTimeout): + t.Errorf("Expected 1 call but got 0") + } +} + +func TestUpdatePods(t *testing.T) { + manager := NewReplicaSetController(testclient.NewSimpleFake(), controller.NoResyncPeriodFunc, BurstReplicas) + manager.podStoreSynced = alwaysReady + + received := make(chan string) + + manager.syncHandler = func(key string) error { + obj, exists, err := manager.rsStore.Store.GetByKey(key) + if !exists || err != nil { + t.Errorf("Expected to find replica set under key %v", key) + } + received <- obj.(*extensions.ReplicaSet).Name + return nil + } + + stopCh := make(chan struct{}) + defer close(stopCh) + go util.Until(manager.worker, 10*time.Millisecond, stopCh) + + // Put 2 ReplicaSets and one pod into the controller's stores + labelMap1 := map[string]string{"foo": "bar"} + testRSSpec1 := newReplicaSet(1, labelMap1) + manager.rsStore.Store.Add(testRSSpec1) + testRSSpec2 := *testRSSpec1 + labelMap2 := map[string]string{"bar": "foo"} + testRSSpec2.Spec.Selector = &extensions.LabelSelector{MatchLabels: labelMap2} + testRSSpec2.Name = "barfoo" + manager.rsStore.Store.Add(&testRSSpec2) + + // Put one pod in the podStore + pod1 := newPodList(manager.podStore.Store, 1, api.PodRunning, labelMap1, testRSSpec1).Items[0] + pod2 := pod1 + pod2.Labels = labelMap2 + + // Send an update of the same pod with modified labels, and confirm we get a sync request for + // both controllers + manager.updatePod(&pod1, &pod2) + + expected := sets.NewString(testRSSpec1.Name, testRSSpec2.Name) + for _, name := range expected.List() { + t.Logf("Expecting update for %+v", name) + select { + case got := <-received: + if 
!expected.Has(got) { + t.Errorf("Expected keys %#v got %v", expected, got) + } + case <-time.After(util.ForeverTestTimeout): + t.Errorf("Expected update notifications for replica sets within 100ms each") + } + } +} + +func TestControllerUpdateRequeue(t *testing.T) { + // This server should force a requeue of the controller because it fails to update status.Replicas. + fakeHandler := utiltesting.FakeHandler{ + StatusCode: 500, + ResponseBody: "", + } + testServer := httptest.NewServer(&fakeHandler) + defer testServer.Close() + + client := client.NewOrDie(&client.Config{Host: testServer.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) + manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas) + manager.podStoreSynced = alwaysReady + + labelMap := map[string]string{"foo": "bar"} + rs := newReplicaSet(1, labelMap) + manager.rsStore.Store.Add(rs) + rs.Status = extensions.ReplicaSetStatus{Replicas: 2} + newPodList(manager.podStore.Store, 1, api.PodRunning, labelMap, rs) + + fakePodControl := controller.FakePodControl{} + manager.podControl = &fakePodControl + + manager.syncReplicaSet(getKey(rs, t)) + + ch := make(chan interface{}) + go func() { + item, _ := manager.queue.Get() + ch <- item + }() + select { + case key := <-ch: + expectedKey := getKey(rs, t) + if key != expectedKey { + t.Errorf("Expected requeue of replica set with key %s got %s", expectedKey, key) + } + case <-time.After(util.ForeverTestTimeout): + manager.queue.ShutDown() + t.Errorf("Expected to find a ReplicaSet in the queue, found none.") + } + // 1 Update and 1 GET, both of which fail + fakeHandler.ValidateRequestCount(t, 2) +} + +func TestControllerUpdateStatusWithFailure(t *testing.T) { + rs := newReplicaSet(1, map[string]string{"foo": "bar"}) + fakeClient := &testclient.FakeExperimental{Fake: &testclient.Fake{}} + fakeClient.AddReactor("get", "replicasets", func(action testclient.Action) (bool, runtime.Object, error) { return true, rs, nil }) + fakeClient.AddReactor("*", "*", func(action testclient.Action) (bool, runtime.Object, error) { + return true, &extensions.ReplicaSet{}, fmt.Errorf("Fake error") + }) + fakeRSClient := &testclient.FakeReplicaSets{fakeClient, "default"} + numReplicas := 10 + updateReplicaCount(fakeRSClient, *rs, numReplicas) + updates, gets := 0, 0 + for _, a := range fakeClient.Actions() { + if a.GetResource() != "replicasets" { + t.Errorf("Unexpected action %+v", a) + continue + } + + switch action := a.(type) { + case testclient.GetAction: + gets++ + // Make sure the get is for the right ReplicaSet even though the update failed. + if action.GetName() != rs.Name { + t.Errorf("Expected get for ReplicaSet %v, got %+v instead", rs.Name, action.GetName()) + } + case testclient.UpdateAction: + updates++ + // Confirm that the update has the right status.Replicas even though the Get + // returned a ReplicaSet with replicas=1. 
+ if c, ok := action.GetObject().(*extensions.ReplicaSet); !ok { + t.Errorf("Expected a ReplicaSet as the argument to update, got %T", c) + } else if c.Status.Replicas != numReplicas { + t.Errorf("Expected update for ReplicaSet to contain replicas %v, got %v instead", + numReplicas, c.Status.Replicas) + } + default: + t.Errorf("Unexpected action %+v", a) + break + } + } + if gets != 1 || updates != 2 { + t.Errorf("Expected 1 get and 2 updates, got %d gets %d updates", gets, updates) + } +} + +func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) { + client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) + fakePodControl := controller.FakePodControl{} + manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, burstReplicas) + manager.podStoreSynced = alwaysReady + manager.podControl = &fakePodControl + + labelMap := map[string]string{"foo": "bar"} + rsSpec := newReplicaSet(numReplicas, labelMap) + manager.rsStore.Store.Add(rsSpec) + + expectedPods := 0 + pods := newPodList(nil, numReplicas, api.PodPending, labelMap, rsSpec) + + rsKey, err := controller.KeyFunc(rsSpec) + if err != nil { + t.Errorf("Couldn't get key for object %+v: %v", rsSpec, err) + } + + // Size up the controller, then size it down, and confirm the expected create/delete pattern + for _, replicas := range []int{numReplicas, 0} { + + rsSpec.Spec.Replicas = replicas + manager.rsStore.Store.Add(rsSpec) + + for i := 0; i < numReplicas; i += burstReplicas { + manager.syncReplicaSet(getKey(rsSpec, t)) + + // The store accrues active pods. It's also used by the ReplicaSet to determine how many + // replicas to create. + activePods := len(manager.podStore.Store.List()) + if replicas != 0 { + // This is the number of pods currently "in flight". They were created by the + // ReplicaSet controller above, which then puts the ReplicaSet to sleep till + // all of them have been observed. + expectedPods = replicas - activePods + if expectedPods > burstReplicas { + expectedPods = burstReplicas + } + // This validates the ReplicaSet manager sync actually created pods + validateSyncReplicaSet(t, &fakePodControl, expectedPods, 0) + + // This simulates the watch events for all but 1 of the expected pods. + // None of these should wake the controller because it has expectations==BurstReplicas. 
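+				// (Each observed create decrements the outstanding expectation count
+				// by one; the controller only acts again once the count reaches zero.)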
+				for i := 0; i < expectedPods-1; i++ {
+					manager.podStore.Store.Add(&pods.Items[i])
+					manager.addPod(&pods.Items[i])
+				}
+
+				podExp, exists, err := manager.expectations.GetExpectations(rsKey)
+				if !exists || err != nil {
+					t.Fatalf("Did not find expectations for ReplicaSet.")
+				}
+				if add, _ := podExp.GetExpectations(); add != 1 {
+					t.Fatalf("Expectations are wrong %v", podExp)
+				}
+			} else {
+				expectedPods = (replicas - activePods) * -1
+				if expectedPods > burstReplicas {
+					expectedPods = burstReplicas
+				}
+				validateSyncReplicaSet(t, &fakePodControl, 0, expectedPods)
+				for i := 0; i < expectedPods-1; i++ {
+					manager.podStore.Store.Delete(&pods.Items[i])
+					manager.deletePod(&pods.Items[i])
+				}
+				podExp, exists, err := manager.expectations.GetExpectations(rsKey)
+				if !exists || err != nil {
+					t.Fatalf("Did not find expectations for ReplicaSet.")
+				}
+				if _, del := podExp.GetExpectations(); del != 1 {
+					t.Fatalf("Expectations are wrong %v", podExp)
+				}
+			}
+
+			// Check that the ReplicaSet didn't take any action for all the above pods
+			fakePodControl.Clear()
+			manager.syncReplicaSet(getKey(rsSpec, t))
+			validateSyncReplicaSet(t, &fakePodControl, 0, 0)
+
+			// Create/Delete the last pod
+			// The last observed pod add/delete will decrease the expectations of the ReplicaSet to 0,
+			// which will cause it to create/delete the remaining replicas up to burstReplicas.
+			if replicas != 0 {
+				manager.podStore.Store.Add(&pods.Items[expectedPods-1])
+				manager.addPod(&pods.Items[expectedPods-1])
+			} else {
+				manager.podStore.Store.Delete(&pods.Items[expectedPods-1])
+				manager.deletePod(&pods.Items[expectedPods-1])
+			}
+			pods.Items = pods.Items[expectedPods:]
+		}
+
+		// Confirm that we've created the right number of replicas
+		activePods := len(manager.podStore.Store.List())
+		if activePods != rsSpec.Spec.Replicas {
+			t.Fatalf("Unexpected number of active pods, expected %d, got %d", rsSpec.Spec.Replicas, activePods)
+		}
+		// Replenish the pod list, since we cut it down sizing up
+		pods = newPodList(nil, replicas, api.PodRunning, labelMap, rsSpec)
+	}
+}
+
+func TestControllerBurstReplicas(t *testing.T) {
+	doTestControllerBurstReplicas(t, 5, 30)
+	doTestControllerBurstReplicas(t, 5, 12)
+	doTestControllerBurstReplicas(t, 3, 2)
+}
+
+type FakeRSExpectations struct {
+	*controller.ControllerExpectations
+	satisfied    bool
+	expSatisfied func()
+}
+
+func (fe FakeRSExpectations) SatisfiedExpectations(controllerKey string) bool {
+	fe.expSatisfied()
+	return fe.satisfied
+}
+
+// TestRSSyncExpectations tests that a pod cannot sneak in between counting active pods
+// and checking expectations.
+func TestRSSyncExpectations(t *testing.T) {
+	client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
+	fakePodControl := controller.FakePodControl{}
+	manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, 2)
+	manager.podStoreSynced = alwaysReady
+	manager.podControl = &fakePodControl
+
+	labelMap := map[string]string{"foo": "bar"}
+	rsSpec := newReplicaSet(2, labelMap)
+	manager.rsStore.Store.Add(rsSpec)
+	pods := newPodList(nil, 2, api.PodPending, labelMap, rsSpec)
+	manager.podStore.Store.Add(&pods.Items[0])
+	postExpectationsPod := pods.Items[1]
+
+	manager.expectations = FakeRSExpectations{
+		controller.NewControllerExpectations(), true, func() {
+			// If we check active pods before checking expectations, the
+			// ReplicaSet will create a new replica because it doesn't see
+			// this pod, but has fulfilled its expectations.
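+			// Adding the pod here, from inside the expectations check itself,
+			// reproduces exactly that race.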
+			manager.podStore.Store.Add(&postExpectationsPod)
+		},
+	}
+	manager.syncReplicaSet(getKey(rsSpec, t))
+	validateSyncReplicaSet(t, &fakePodControl, 0, 0)
+}
+
+func TestDeleteControllerAndExpectations(t *testing.T) {
+	client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
+	manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, 10)
+	manager.podStoreSynced = alwaysReady
+
+	rs := newReplicaSet(1, map[string]string{"foo": "bar"})
+	manager.rsStore.Store.Add(rs)
+
+	fakePodControl := controller.FakePodControl{}
+	manager.podControl = &fakePodControl
+
+	// This should set expectations for the ReplicaSet
+	manager.syncReplicaSet(getKey(rs, t))
+	validateSyncReplicaSet(t, &fakePodControl, 1, 0)
+	fakePodControl.Clear()
+
+	// Get the ReplicaSet key
+	rsKey, err := controller.KeyFunc(rs)
+	if err != nil {
+		t.Errorf("Couldn't get key for object %+v: %v", rs, err)
+	}
+
+	// This is to simulate a concurrent addPod that has a handle on the expectations
+	// as the controller deletes it.
+	podExp, exists, err := manager.expectations.GetExpectations(rsKey)
+	if !exists || err != nil {
+		t.Errorf("No expectations found for ReplicaSet")
+	}
+	manager.rsStore.Delete(rs)
+	manager.syncReplicaSet(getKey(rs, t))
+
+	if _, exists, err = manager.expectations.GetExpectations(rsKey); exists {
+		t.Errorf("Found expectations, expected none since the ReplicaSet has been deleted.")
+	}
+
+	// This should have no effect, since we've deleted the ReplicaSet.
+	podExp.Seen(1, 0)
+	manager.podStore.Store.Replace(make([]interface{}, 0), "0")
+	manager.syncReplicaSet(getKey(rs, t))
+	validateSyncReplicaSet(t, &fakePodControl, 0, 0)
+}
+
+func TestRSManagerNotReady(t *testing.T) {
+	client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
+	fakePodControl := controller.FakePodControl{}
+	manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, 2)
+	manager.podControl = &fakePodControl
+	manager.podStoreSynced = func() bool { return false }
+
+	// Simulates the ReplicaSet reflector running before the pod reflector. We don't
+	// want to end up creating replicas in this case until the pod reflector
+	// has synced, so the ReplicaSet controller should just requeue the ReplicaSet.
+	rsSpec := newReplicaSet(1, map[string]string{"foo": "bar"})
+	manager.rsStore.Store.Add(rsSpec)
+
+	rsKey := getKey(rsSpec, t)
+	manager.syncReplicaSet(rsKey)
+	validateSyncReplicaSet(t, &fakePodControl, 0, 0)
+	queueRS, _ := manager.queue.Get()
+	if queueRS != rsKey {
+		t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS)
+	}
+
+	manager.podStoreSynced = alwaysReady
+	manager.syncReplicaSet(rsKey)
+	validateSyncReplicaSet(t, &fakePodControl, 1, 0)
+}
+
+// shuffle returns a new shuffled list of ReplicaSets.
+func shuffle(controllers []*extensions.ReplicaSet) []*extensions.ReplicaSet {
+	numControllers := len(controllers)
+	randIndexes := rand.Perm(numControllers)
+	shuffled := make([]*extensions.ReplicaSet, numControllers)
+	for i := 0; i < numControllers; i++ {
+		shuffled[i] = controllers[randIndexes[i]]
+	}
+	return shuffled
+}
+
+func TestOverlappingRSs(t *testing.T) {
+	client := client.NewOrDie(&client.Config{Host: "", ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
+	labelMap := map[string]string{"foo": "bar"}
+
+	for i := 0; i < 5; i++ {
+		manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, 10)
+		manager.podStoreSynced = alwaysReady
+
+		// Create 9 ReplicaSets with staggered creation timestamps, shuffle them
+		// randomly, and insert them into the ReplicaSet controller's store.
+		var controllers []*extensions.ReplicaSet
+		for j := 1; j < 10; j++ {
+			rsSpec := newReplicaSet(1, labelMap)
+			rsSpec.CreationTimestamp = unversioned.Date(2014, time.December, j, 0, 0, 0, 0, time.Local)
+			rsSpec.Name = string(util.NewUUID())
+			controllers = append(controllers, rsSpec)
+		}
+		shuffledControllers := shuffle(controllers)
+		for j := range shuffledControllers {
+			manager.rsStore.Store.Add(shuffledControllers[j])
+		}
+		// Add a pod and make sure only the oldest ReplicaSet is synced
+		pods := newPodList(nil, 1, api.PodPending, labelMap, controllers[0])
+		rsKey := getKey(controllers[0], t)
+
+		manager.addPod(&pods.Items[0])
+		queueRS, _ := manager.queue.Get()
+		if queueRS != rsKey {
+			t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS)
+		}
+	}
+}
diff --git a/pkg/controller/replicaset/replica_set_utils.go b/pkg/controller/replicaset/replica_set_utils.go
new file mode 100644
index 00000000000..c1dcd87c576
--- /dev/null
+++ b/pkg/controller/replicaset/replica_set_utils.go
@@ -0,0 +1,72 @@
+/*
+Copyright 2016 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// If you make changes to this file, you should also make the corresponding change in ReplicationController.
+
+package replicaset
+
+import (
+	"github.com/golang/glog"
+	"k8s.io/kubernetes/pkg/apis/extensions"
+	client "k8s.io/kubernetes/pkg/client/unversioned"
+)
+
+// updateReplicaCount attempts to update the Status.Replicas of the given ReplicaSet, with a single GET/PUT retry.
+func updateReplicaCount(rsClient client.ReplicaSetInterface, rs extensions.ReplicaSet, numReplicas int) (updateErr error) {
+	// This is the steady state. It happens when the ReplicaSet doesn't have any expectations, since
+	// we do a periodic relist every 30s. If the generations differ but the replicas are
+	// the same, a caller might've resized to the same replica count.
+	if rs.Status.Replicas == numReplicas &&
+		rs.Generation == rs.Status.ObservedGeneration {
+		return nil
+	}
+	// Save the generation number we acted on, otherwise we might wrongfully indicate
+	// that we've seen a spec update when we retry.
+	// TODO: This can clobber an update if we allow multiple agents to write to the
+	// same status.
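+	// (The API server bumps Generation on every spec update, so recording
+	// ObservedGeneration tells readers which spec this status reflects.)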
+ generation := rs.Generation + + var getErr error + for i, rs := 0, &rs; ; i++ { + glog.V(4).Infof("Updating replica count for ReplicaSet: %v, %d->%d (need %d), sequence No: %v->%v", + rs.Name, rs.Status.Replicas, numReplicas, rs.Spec.Replicas, rs.Status.ObservedGeneration, generation) + + rs.Status = extensions.ReplicaSetStatus{Replicas: numReplicas, ObservedGeneration: generation} + _, updateErr = rsClient.UpdateStatus(rs) + if updateErr == nil || i >= statusUpdateRetries { + return updateErr + } + // Update the ReplicaSet with the latest resource version for the next poll + if rs, getErr = rsClient.Get(rs.Name); getErr != nil { + // If the GET fails we can't trust status.Replicas anymore. This error + // is bound to be more interesting than the update failure. + return getErr + } + } +} + +// overlappingReplicaSets sorts a list of ReplicaSets by creation timestamp, using their names as a tie breaker. +type overlappingReplicaSets []extensions.ReplicaSet + +func (o overlappingReplicaSets) Len() int { return len(o) } +func (o overlappingReplicaSets) Swap(i, j int) { o[i], o[j] = o[j], o[i] } + +func (o overlappingReplicaSets) Less(i, j int) bool { + if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) { + return o[i].Name < o[j].Name + } + return o[i].CreationTimestamp.Before(o[j].CreationTimestamp) +} diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index 24f50bde317..b363161fcb7 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -14,6 +14,8 @@ See the License for the specific language governing permissions and limitations under the License. */ +// If you make changes to this file, you should also make the corresponding change in ReplicaSet. + package replication import ( diff --git a/pkg/controller/replication/replication_controller_test.go b/pkg/controller/replication/replication_controller_test.go index dc67e883f16..8ebb443c01c 100644 --- a/pkg/controller/replication/replication_controller_test.go +++ b/pkg/controller/replication/replication_controller_test.go @@ -14,6 +14,8 @@ See the License for the specific language governing permissions and limitations under the License. */ +// If you make changes to this file, you should also make the corresponding change in ReplicaSet. + package replication import ( diff --git a/pkg/controller/replication/replication_controller_utils.go b/pkg/controller/replication/replication_controller_utils.go index b300e092205..6334b317ee7 100644 --- a/pkg/controller/replication/replication_controller_utils.go +++ b/pkg/controller/replication/replication_controller_utils.go @@ -14,6 +14,8 @@ See the License for the specific language governing permissions and limitations under the License. */ +// If you make changes to this file, you should also make the corresponding change in ReplicaSet. + package replication import (
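
Usage sketch (not part of the patch): the snippet below shows roughly how the new
controller could be wired into a binary, mirroring the style of the tests above. The
host address, worker count, and 30-second resync period are illustrative assumptions,
and a real caller would also fill in the client's ContentConfig/GroupVersion the way
the tests do.

	package main

	import (
		"time"

		client "k8s.io/kubernetes/pkg/client/unversioned"
		"k8s.io/kubernetes/pkg/controller/replicaset"
	)

	func main() {
		// Illustrative config; see the tests in this patch for a fully
		// populated client.Config.
		kubeClient := client.NewOrDie(&client.Config{Host: "http://127.0.0.1:8080"})

		// Any func() time.Duration satisfies controller.ResyncPeriodFunc;
		// 30 seconds is an assumed value, not one mandated by the patch.
		resyncPeriod := func() time.Duration { return 30 * time.Second }

		rsc := replicaset.NewReplicaSetController(kubeClient, resyncPeriod, replicaset.BurstReplicas)

		// Run five sync workers; Run blocks until stop is closed.
		stop := make(chan struct{})
		rsc.Run(5, stop)
	}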