diff --git a/pkg/controller/replicaset/BUILD b/pkg/controller/replicaset/BUILD index 7375a9e1ad7..3711a751aa5 100644 --- a/pkg/controller/replicaset/BUILD +++ b/pkg/controller/replicaset/BUILD @@ -24,6 +24,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/informers/core/v1:go_default_library", diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index 4161e136659..e761ce0b057 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -14,7 +14,16 @@ See the License for the specific language governing permissions and limitations under the License. */ -// If you make changes to this file, you should also make the corresponding change in ReplicationController. +// ### ATTENTION ### +// +// This code implements both ReplicaSet and ReplicationController. +// +// For RC, the objects are converted on the way in and out (see ../replication/), +// as if ReplicationController were just an older API version of ReplicaSet. +// However, RC and RS still have separate storage and separate instantiations +// of the ReplicaSetController object. +// +// Use rsc.Kind in log messages rather than hard-coding "ReplicaSet". package replicaset @@ -22,6 +31,7 @@ import ( "fmt" "reflect" "sort" + "strings" "sync" "time" @@ -32,6 +42,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" coreinformers "k8s.io/client-go/informers/core/v1" @@ -59,12 +70,14 @@ const ( statusUpdateRetries = 1 ) -// controllerKind contains the schema.GroupVersionKind for this controller type. -var controllerKind = v1beta1.SchemeGroupVersion.WithKind("ReplicaSet") - // ReplicaSetController is responsible for synchronizing ReplicaSet objects stored // in the system with actual running pods. type ReplicaSetController struct { + // GroupVersionKind indicates the controller type. + // Different instances of this struct may handle different GVKs. + // For example, this struct can be used (with adapters) to handle ReplicationController. + schema.GroupVersionKind + kubeClient clientset.Interface podControl controller.PodControlInterface @@ -95,22 +108,35 @@ type ReplicaSetController struct { // NewReplicaSetController configures a replica set controller with the specified event recorder func NewReplicaSetController(rsInformer extensionsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController { - if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil { - metrics.RegisterMetricAndTrackRateLimiterUsage("replicaset_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()) - } eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")}) - - rsc := &ReplicaSetController{ - kubeClient: kubeClient, - podControl: controller.RealPodControl{ + return NewBaseController(rsInformer, podInformer, kubeClient, burstReplicas, + v1beta1.SchemeGroupVersion.WithKind("ReplicaSet"), + "replicaset_controller", + "replicaset", + controller.RealPodControl{ KubeClient: kubeClient, Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}), }, - burstReplicas: burstReplicas, - expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "replicaset"), + ) +} + +// NewBaseController is the implementation of NewReplicaSetController with additional injected +// parameters so that it can also serve as the implementation of NewReplicationController. +func NewBaseController(rsInformer extensionsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int, + gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController { + if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil { + metrics.RegisterMetricAndTrackRateLimiterUsage(metricOwnerName, kubeClient.CoreV1().RESTClient().GetRateLimiter()) + } + + rsc := &ReplicaSetController{ + GroupVersionKind: gvk, + kubeClient: kubeClient, + podControl: podControl, + burstReplicas: burstReplicas, + expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName), } rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -153,10 +179,11 @@ func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer rsc.queue.ShutDown() - glog.Infof("Starting replica set controller") - defer glog.Infof("Shutting down replica set Controller") + controllerName := strings.ToLower(rsc.Kind) + glog.Infof("Starting %v controller", controllerName) + defer glog.Infof("Shutting down %v controller", controllerName) - if !controller.WaitForCacheSync("replica set", stopCh, rsc.podListerSynced, rsc.rsListerSynced) { + if !controller.WaitForCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) { return } @@ -176,7 +203,7 @@ func (rsc *ReplicaSetController) getPodReplicaSets(pod *v1.Pod) []*extensions.Re if len(rss) > 1 { // ControllerRef will ensure we don't do anything crazy, but more than one // item in this list nevertheless constitutes user error. - utilruntime.HandleError(fmt.Errorf("user error! more than one ReplicaSet is selecting pods with labels: %+v", pod.Labels)) + utilruntime.HandleError(fmt.Errorf("user error! more than one %v is selecting pods with labels: %+v", rsc.Kind, pod.Labels)) } return rss } @@ -187,7 +214,7 @@ func (rsc *ReplicaSetController) getPodReplicaSets(pod *v1.Pod) []*extensions.Re func (rsc *ReplicaSetController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *extensions.ReplicaSet { // We can't look up by UID, so look up by Name and then verify UID. // Don't even try to look up by Name if it's the wrong Kind. - if controllerRef.Kind != controllerKind.Kind { + if controllerRef.Kind != rsc.Kind { return nil } rs, err := rsc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name) @@ -220,7 +247,7 @@ func (rsc *ReplicaSetController) updateRS(old, cur interface{}) { // that bad as ReplicaSets that haven't met expectations yet won't // sync, and all the listing is done using local stores. if *(oldRS.Spec.Replicas) != *(curRS.Spec.Replicas) { - glog.V(4).Infof("Replica set %v updated. Desired pod count change: %d->%d", curRS.Name, *(oldRS.Spec.Replicas), *(curRS.Spec.Replicas)) + glog.V(4).Infof("%v %v updated. Desired pod count change: %d->%d", rsc.Kind, curRS.Name, *(oldRS.Spec.Replicas), *(curRS.Spec.Replicas)) } rsc.enqueueReplicaSet(cur) } @@ -319,7 +346,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) { // Note that this still suffers from #29229, we are just moving the problem one level // "closer" to kubelet (from the deployment to the replica set controller). if !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && rs.Spec.MinReadySeconds > 0 { - glog.V(2).Infof("ReplicaSet %q will be enqueued after %ds for availability check", rs.Name, rs.Spec.MinReadySeconds) + glog.V(2).Infof("%v %q will be enqueued after %ds for availability check", rsc.Kind, rs.Name, rs.Spec.MinReadySeconds) // Add a second to avoid milliseconds skew in AddAfter. // See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info. rsc.enqueueReplicaSetAfter(rs, (time.Duration(rs.Spec.MinReadySeconds)*time.Second)+time.Second) @@ -434,7 +461,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *exte diff := len(filteredPods) - int(*(rs.Spec.Replicas)) rsKey, err := controller.KeyFunc(rs) if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for ReplicaSet %#v: %v", rs, err)) + utilruntime.HandleError(fmt.Errorf("Couldn't get key for %v %#v: %v", rsc.Kind, rs, err)) return nil } if diff < 0 { @@ -448,7 +475,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *exte // into a performance bottleneck. We should generate a UID for the pod // beforehand and store it via ExpectCreations. rsc.expectations.ExpectCreations(rsKey, diff) - glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff) + glog.V(2).Infof("Too few replicas for %v %s/%s, need %d, creating %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff) // Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize // and double with each successful iteration in a kind of "slow start". // This handles attempts to start large numbers of pods that would @@ -460,8 +487,8 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *exte successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error { boolPtr := func(b bool) *bool { return &b } controllerRef := &metav1.OwnerReference{ - APIVersion: controllerKind.GroupVersion().String(), - Kind: controllerKind.Kind, + APIVersion: rsc.GroupVersion().String(), + Kind: rsc.Kind, Name: rs.Name, UID: rs.UID, BlockOwnerDeletion: boolPtr(true), @@ -485,7 +512,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *exte // The skipped pods will be retried later. The next controller resync will // retry the slow start process. if skippedPods := diff - successfulCreations; skippedPods > 0 { - glog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for replica set %v/%v", skippedPods, rs.Namespace, rs.Name) + glog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for %v %v/%v", skippedPods, rsc.Kind, rs.Namespace, rs.Name) for i := 0; i < skippedPods; i++ { // Decrement the expected number of creates because the informer won't observe this pod rsc.expectations.CreationObserved(rsKey) @@ -496,7 +523,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *exte if diff > rsc.burstReplicas { diff = rsc.burstReplicas } - glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff) + glog.V(2).Infof("Too many replicas for %v %s/%s, need %d, deleting %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff) // Choose which Pods to delete, preferring those in earlier phases of startup. podsToDelete := getPodsToDelete(filteredPods, diff) @@ -518,7 +545,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *exte if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil { // Decrement the expected number of deletes because the informer won't observe this deletion podKey := controller.PodKey(targetPod) - glog.V(2).Infof("Failed to delete %v, decrementing expectations for controller %q/%q", podKey, rs.Namespace, rs.Name) + glog.V(2).Infof("Failed to delete %v, decrementing expectations for %v %s/%s", podKey, rsc.Kind, rs.Namespace, rs.Name) rsc.expectations.DeletionObserved(rsKey, podKey) errCh <- err } @@ -543,9 +570,10 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *exte // meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be // invoked concurrently with the same key. func (rsc *ReplicaSetController) syncReplicaSet(key string) error { + startTime := time.Now() defer func() { - glog.V(4).Infof("Finished syncing replica set %q (%v)", key, time.Now().Sub(startTime)) + glog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime)) }() namespace, name, err := cache.SplitMetaNamespaceKey(key) @@ -554,7 +582,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error { } rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name) if errors.IsNotFound(err) { - glog.V(4).Infof("ReplicaSet has been deleted %v", key) + glog.V(4).Infof("%v %v has been deleted", rsc.Kind, key) rsc.expectations.DeleteExpectations(key) return nil } @@ -623,11 +651,11 @@ func (rsc *ReplicaSetController) claimPods(rs *extensions.ReplicaSet, selector l return nil, err } if fresh.UID != rs.UID { - return nil, fmt.Errorf("original ReplicaSet %v/%v is gone: got uid %v, wanted %v", rs.Namespace, rs.Name, fresh.UID, rs.UID) + return nil, fmt.Errorf("original %v %v/%v is gone: got uid %v, wanted %v", rsc.Kind, rs.Namespace, rs.Name, fresh.UID, rs.UID) } return fresh, nil }) - cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, controllerKind, canAdoptFunc) + cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, rsc.GroupVersionKind, canAdoptFunc) return cm.ClaimPods(filteredPods) } diff --git a/pkg/controller/replicaset/replica_set_test.go b/pkg/controller/replicaset/replica_set_test.go index 57418b58ddb..da71ea727b6 100644 --- a/pkg/controller/replicaset/replica_set_test.go +++ b/pkg/controller/replicaset/replica_set_test.go @@ -14,8 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -// If you make changes to this file, you should also make the corresponding change in ReplicationController. - package replicaset import ( diff --git a/pkg/controller/replicaset/replica_set_utils.go b/pkg/controller/replicaset/replica_set_utils.go index ec11fd106fa..ad628da9551 100644 --- a/pkg/controller/replicaset/replica_set_utils.go +++ b/pkg/controller/replicaset/replica_set_utils.go @@ -55,7 +55,7 @@ func updateReplicaSetStatus(c unversionedextensions.ReplicaSetInterface, rs *ext var getErr, updateErr error var updatedRS *extensions.ReplicaSet for i, rs := 0, rs; ; i++ { - glog.V(4).Infof(fmt.Sprintf("Updating status for ReplicaSet: %s/%s, ", rs.Namespace, rs.Name) + + glog.V(4).Infof(fmt.Sprintf("Updating status for %v: %s/%s, ", rs.Kind, rs.Namespace, rs.Name) + fmt.Sprintf("replicas %d->%d (need %d), ", rs.Status.Replicas, newStatus.Replicas, *(rs.Spec.Replicas)) + fmt.Sprintf("fullyLabeledReplicas %d->%d, ", rs.Status.FullyLabeledReplicas, newStatus.FullyLabeledReplicas) + fmt.Sprintf("readyReplicas %d->%d, ", rs.Status.ReadyReplicas, newStatus.ReadyReplicas) + diff --git a/pkg/controller/replication/BUILD b/pkg/controller/replication/BUILD index d6dc8d1e2ea..c07d94ec494 100644 --- a/pkg/controller/replication/BUILD +++ b/pkg/controller/replication/BUILD @@ -9,63 +9,45 @@ load( go_library( name = "go_default_library", srcs = [ + "conversion.go", "doc.go", "replication_controller.go", "replication_controller_utils.go", ], importpath = "k8s.io/kubernetes/pkg/controller/replication", deps = [ - "//pkg/api/v1/pod:go_default_library", + "//pkg/api/v1:go_default_library", + "//pkg/apis/extensions:go_default_library", + "//pkg/apis/extensions/v1beta1:go_default_library", "//pkg/controller:go_default_library", - "//pkg/util/metrics:go_default_library", + "//pkg/controller/replicaset:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/api/extensions/v1beta1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", - "//vendor/k8s.io/apiserver/pkg/util/trace:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", "//vendor/k8s.io/client-go/informers/core/v1:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/typed/extensions/v1beta1:go_default_library", "//vendor/k8s.io/client-go/listers/core/v1:go_default_library", + "//vendor/k8s.io/client-go/listers/extensions/v1beta1:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", - "//vendor/k8s.io/client-go/util/integer:go_default_library", - "//vendor/k8s.io/client-go/util/workqueue:go_default_library", ], ) go_test( name = "go_default_test", - srcs = ["replication_controller_test.go"], + srcs = ["replication_controller_utils_test.go"], importpath = "k8s.io/kubernetes/pkg/controller/replication", library = ":go_default_library", - deps = [ - "//pkg/api/legacyscheme:go_default_library", - "//pkg/api/testapi:go_default_library", - "//pkg/controller:go_default_library", - "//pkg/securitycontext:go_default_library", - "//vendor/k8s.io/api/core/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", - "//vendor/k8s.io/client-go/informers:go_default_library", - "//vendor/k8s.io/client-go/informers/core/v1:go_default_library", - "//vendor/k8s.io/client-go/kubernetes:go_default_library", - "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", - "//vendor/k8s.io/client-go/rest:go_default_library", - "//vendor/k8s.io/client-go/testing:go_default_library", - "//vendor/k8s.io/client-go/tools/cache:go_default_library", - "//vendor/k8s.io/client-go/util/testing:go_default_library", - "//vendor/k8s.io/client-go/util/workqueue:go_default_library", - ], + deps = ["//vendor/k8s.io/api/core/v1:go_default_library"], ) filegroup( diff --git a/pkg/controller/replication/conversion.go b/pkg/controller/replication/conversion.go new file mode 100644 index 00000000000..c655d6668f2 --- /dev/null +++ b/pkg/controller/replication/conversion.go @@ -0,0 +1,332 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This file contains adapters that convert between RC and RS, +// as if ReplicationController were an older API version of ReplicaSet. +// It allows ReplicaSetController to directly replace the old ReplicationManager, +// which was previously a manually-maintained copy-paste of RSC. + +package replication + +import ( + "errors" + "fmt" + "time" + + "k8s.io/api/core/v1" + extensionsv1beta1 "k8s.io/api/extensions/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/watch" + coreinformers "k8s.io/client-go/informers/core/v1" + clientset "k8s.io/client-go/kubernetes" + v1client "k8s.io/client-go/kubernetes/typed/core/v1" + extensionsv1beta1client "k8s.io/client-go/kubernetes/typed/extensions/v1beta1" + v1listers "k8s.io/client-go/listers/core/v1" + extensionslisters "k8s.io/client-go/listers/extensions/v1beta1" + "k8s.io/client-go/tools/cache" + apiv1 "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/apis/extensions" + extensionsinternalv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" + "k8s.io/kubernetes/pkg/controller" +) + +// informerAdapter implements ReplicaSetInformer by wrapping ReplicationControllerInformer +// and converting objects. +type informerAdapter struct { + rcInformer coreinformers.ReplicationControllerInformer +} + +func (i informerAdapter) Informer() cache.SharedIndexInformer { + return conversionInformer{i.rcInformer.Informer()} +} + +func (i informerAdapter) Lister() extensionslisters.ReplicaSetLister { + return conversionLister{i.rcInformer.Lister()} +} + +type conversionInformer struct { + cache.SharedIndexInformer +} + +func (i conversionInformer) AddEventHandler(handler cache.ResourceEventHandler) { + i.SharedIndexInformer.AddEventHandler(conversionEventHandler{handler}) +} + +func (i conversionInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, resyncPeriod time.Duration) { + i.SharedIndexInformer.AddEventHandlerWithResyncPeriod(conversionEventHandler{handler}, resyncPeriod) +} + +type conversionLister struct { + rcLister v1listers.ReplicationControllerLister +} + +func (l conversionLister) List(selector labels.Selector) ([]*extensionsv1beta1.ReplicaSet, error) { + rcList, err := l.rcLister.List(selector) + if err != nil { + return nil, err + } + return convertSlice(rcList) +} + +func (l conversionLister) ReplicaSets(namespace string) extensionslisters.ReplicaSetNamespaceLister { + return conversionNamespaceLister{l.rcLister.ReplicationControllers(namespace)} +} + +func (l conversionLister) GetPodReplicaSets(pod *v1.Pod) ([]*extensionsv1beta1.ReplicaSet, error) { + rcList, err := l.rcLister.GetPodControllers(pod) + if err != nil { + return nil, err + } + return convertSlice(rcList) +} + +type conversionNamespaceLister struct { + rcLister v1listers.ReplicationControllerNamespaceLister +} + +func (l conversionNamespaceLister) List(selector labels.Selector) ([]*extensionsv1beta1.ReplicaSet, error) { + rcList, err := l.rcLister.List(selector) + if err != nil { + return nil, err + } + return convertSlice(rcList) +} + +func (l conversionNamespaceLister) Get(name string) (*extensionsv1beta1.ReplicaSet, error) { + rc, err := l.rcLister.Get(name) + if err != nil { + return nil, err + } + return convertRCtoRS(rc, nil) +} + +type conversionEventHandler struct { + handler cache.ResourceEventHandler +} + +func (h conversionEventHandler) OnAdd(obj interface{}) { + rs, err := convertRCtoRS(obj.(*v1.ReplicationController), nil) + if err != nil { + utilruntime.HandleError(fmt.Errorf("dropping RC OnAdd event: can't convert object %#v to RS: %v", obj, err)) + return + } + h.handler.OnAdd(rs) +} + +func (h conversionEventHandler) OnUpdate(oldObj, newObj interface{}) { + oldRS, err := convertRCtoRS(oldObj.(*v1.ReplicationController), nil) + if err != nil { + utilruntime.HandleError(fmt.Errorf("dropping RC OnUpdate event: can't convert old object %#v to RS: %v", oldObj, err)) + return + } + newRS, err := convertRCtoRS(newObj.(*v1.ReplicationController), nil) + if err != nil { + utilruntime.HandleError(fmt.Errorf("dropping RC OnUpdate event: can't convert new object %#v to RS: %v", newObj, err)) + return + } + h.handler.OnUpdate(oldRS, newRS) +} + +func (h conversionEventHandler) OnDelete(obj interface{}) { + rc, ok := obj.(*v1.ReplicationController) + if !ok { + // Convert the Obj inside DeletedFinalStateUnknown. + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleError(fmt.Errorf("dropping RC OnDelete event: couldn't get object from tombstone %+v", obj)) + return + } + rc, ok = tombstone.Obj.(*v1.ReplicationController) + if !ok { + utilruntime.HandleError(fmt.Errorf("dropping RC OnDelete event: tombstone contained object that is not a RC %#v", obj)) + return + } + rs, err := convertRCtoRS(rc, nil) + if err != nil { + utilruntime.HandleError(fmt.Errorf("dropping RC OnDelete event: can't convert object %#v to RS: %v", obj, err)) + return + } + h.handler.OnDelete(cache.DeletedFinalStateUnknown{Key: tombstone.Key, Obj: rs}) + return + } + + // It's a regular RC object. + rs, err := convertRCtoRS(rc, nil) + if err != nil { + utilruntime.HandleError(fmt.Errorf("dropping RC OnDelete event: can't convert object %#v to RS: %v", obj, err)) + return + } + h.handler.OnDelete(rs) +} + +type clientsetAdapter struct { + clientset.Interface +} + +func (c clientsetAdapter) ExtensionsV1beta1() extensionsv1beta1client.ExtensionsV1beta1Interface { + return conversionExtensionsClient{c.Interface, c.Interface.ExtensionsV1beta1()} +} + +func (c clientsetAdapter) Extensions() extensionsv1beta1client.ExtensionsV1beta1Interface { + return conversionExtensionsClient{c.Interface, c.Interface.Extensions()} +} + +type conversionExtensionsClient struct { + clientset clientset.Interface + extensionsv1beta1client.ExtensionsV1beta1Interface +} + +func (c conversionExtensionsClient) ReplicaSets(namespace string) extensionsv1beta1client.ReplicaSetInterface { + return conversionClient{c.clientset.CoreV1().ReplicationControllers(namespace)} +} + +type conversionClient struct { + v1client.ReplicationControllerInterface +} + +func (c conversionClient) Create(rs *extensionsv1beta1.ReplicaSet) (*extensionsv1beta1.ReplicaSet, error) { + return convertCall(c.ReplicationControllerInterface.Create, rs) +} + +func (c conversionClient) Update(rs *extensionsv1beta1.ReplicaSet) (*extensionsv1beta1.ReplicaSet, error) { + return convertCall(c.ReplicationControllerInterface.Update, rs) +} + +func (c conversionClient) UpdateStatus(rs *extensionsv1beta1.ReplicaSet) (*extensionsv1beta1.ReplicaSet, error) { + return convertCall(c.ReplicationControllerInterface.UpdateStatus, rs) +} + +func (c conversionClient) Get(name string, options metav1.GetOptions) (*extensionsv1beta1.ReplicaSet, error) { + rc, err := c.ReplicationControllerInterface.Get(name, options) + if err != nil { + return nil, err + } + return convertRCtoRS(rc, nil) +} + +func (c conversionClient) List(opts metav1.ListOptions) (*extensionsv1beta1.ReplicaSetList, error) { + rcList, err := c.ReplicationControllerInterface.List(opts) + if err != nil { + return nil, err + } + return convertList(rcList) +} + +func (c conversionClient) Watch(opts metav1.ListOptions) (watch.Interface, error) { + // This is not used by RSC because we wrap the shared informer instead. + return nil, errors.New("Watch() is not implemented for conversionClient") +} + +func (c conversionClient) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *extensionsv1beta1.ReplicaSet, err error) { + // This is not used by RSC. + return nil, errors.New("Patch() is not implemented for conversionClient") +} + +func convertSlice(rcList []*v1.ReplicationController) ([]*extensionsv1beta1.ReplicaSet, error) { + rsList := make([]*extensionsv1beta1.ReplicaSet, 0, len(rcList)) + for _, rc := range rcList { + rs, err := convertRCtoRS(rc, nil) + if err != nil { + return nil, err + } + rsList = append(rsList, rs) + } + return rsList, nil +} + +func convertList(rcList *v1.ReplicationControllerList) (*extensionsv1beta1.ReplicaSetList, error) { + rsList := &extensionsv1beta1.ReplicaSetList{Items: make([]extensionsv1beta1.ReplicaSet, len(rcList.Items))} + for i := range rcList.Items { + rc := &rcList.Items[i] + _, err := convertRCtoRS(rc, &rsList.Items[i]) + if err != nil { + return nil, err + } + } + return rsList, nil +} + +func convertCall(fn func(*v1.ReplicationController) (*v1.ReplicationController, error), rs *extensionsv1beta1.ReplicaSet) (*extensionsv1beta1.ReplicaSet, error) { + rc, err := convertRStoRC(rs) + if err != nil { + return nil, err + } + result, err := fn(rc) + if err != nil { + return nil, err + } + return convertRCtoRS(result, nil) +} + +func convertRCtoRS(rc *v1.ReplicationController, out *extensionsv1beta1.ReplicaSet) (*extensionsv1beta1.ReplicaSet, error) { + var rsInternal extensions.ReplicaSet + if err := apiv1.Convert_v1_ReplicationController_to_extensions_ReplicaSet(rc, &rsInternal, nil); err != nil { + return nil, fmt.Errorf("can't convert ReplicationController %v/%v to ReplicaSet: %v", rc.Namespace, rc.Name, err) + } + if out == nil { + out = new(extensionsv1beta1.ReplicaSet) + } + if err := extensionsinternalv1beta1.Convert_extensions_ReplicaSet_To_v1beta1_ReplicaSet(&rsInternal, out, nil); err != nil { + return nil, fmt.Errorf("can't convert ReplicaSet (converted from ReplicationController %v/%v) from internal to extensions/v1beta1: %v", rc.Namespace, rc.Name, err) + } + return out, nil +} + +func convertRStoRC(rs *extensionsv1beta1.ReplicaSet) (*v1.ReplicationController, error) { + var rsInternal extensions.ReplicaSet + if err := extensionsinternalv1beta1.Convert_v1beta1_ReplicaSet_To_extensions_ReplicaSet(rs, &rsInternal, nil); err != nil { + return nil, fmt.Errorf("can't convert ReplicaSet (converting to ReplicationController %v/%v) from extensions/v1beta1 to internal: %v", rs.Namespace, rs.Name, err) + } + var rc v1.ReplicationController + if err := apiv1.Convert_extensions_ReplicaSet_to_v1_ReplicationController(&rsInternal, &rc, nil); err != nil { + return nil, fmt.Errorf("can't convert ReplicaSet to ReplicationController %v/%v: %v", rs.Namespace, rs.Name, err) + } + return &rc, nil +} + +type podControlAdapter struct { + controller.PodControlInterface +} + +func (pc podControlAdapter) CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error { + // This is not used by RSC. + return errors.New("CreatePods() is not implemented for podControlAdapter") +} + +func (pc podControlAdapter) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { + // This is not used by RSC. + return errors.New("CreatePodsOnNode() is not implemented for podControlAdapter") +} + +func (pc podControlAdapter) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { + rc, err := convertRStoRC(object.(*extensionsv1beta1.ReplicaSet)) + if err != nil { + return err + } + return pc.PodControlInterface.CreatePodsWithControllerRef(namespace, template, rc, controllerRef) +} + +func (pc podControlAdapter) DeletePod(namespace string, podID string, object runtime.Object) error { + rc, err := convertRStoRC(object.(*extensionsv1beta1.ReplicaSet)) + if err != nil { + return err + } + return pc.PodControlInterface.DeletePod(namespace, podID, rc) +} diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index 4f7be18143f..b181db0f335 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -14,653 +14,54 @@ See the License for the specific language governing permissions and limitations under the License. */ -// If you make changes to this file, you should also make the corresponding change in ReplicaSet. +// ### ATTENTION ### +// +// ReplicationManager is now just a wrapper around ReplicaSetController, +// with a conversion layer that effectively treats ReplicationController +// as if it were an older API version of ReplicaSet. +// +// However, RC and RS still have separate storage and separate instantiations +// of the ReplicaSetController object. package replication import ( - "fmt" - "reflect" - "sort" - "sync" - "time" - "github.com/golang/glog" "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" - utiltrace "k8s.io/apiserver/pkg/util/trace" coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" - corelisters "k8s.io/client-go/listers/core/v1" - "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/integer" - "k8s.io/client-go/util/workqueue" - podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller" - "k8s.io/kubernetes/pkg/util/metrics" + "k8s.io/kubernetes/pkg/controller/replicaset" ) const ( - // Realistic value of the burstReplica field for the replication manager based off - // performance requirements for kubernetes 1.0. - BurstReplicas = 500 - - // The number of times we retry updating a replication controller's status. - statusUpdateRetries = 1 + BurstReplicas = replicaset.BurstReplicas ) -// controllerKind contains the schema.GroupVersionKind for this controller type. -var controllerKind = v1.SchemeGroupVersion.WithKind("ReplicationController") - // ReplicationManager is responsible for synchronizing ReplicationController objects stored // in the system with actual running pods. -// NOTE: using this name to distinguish this type from API object "ReplicationController"; will -// not fix it right now. Refer to #41459 for more detail. +// It is actually just a wrapper around ReplicaSetController. type ReplicationManager struct { - kubeClient clientset.Interface - podControl controller.PodControlInterface - - // An rc is temporarily suspended after creating/deleting these many replicas. - // It resumes normal action after observing the watch events for them. - burstReplicas int - // To allow injection of syncReplicationController for testing. - syncHandler func(rcKey string) error - - // A TTLCache of pod creates/deletes each rc expects to see. - expectations *controller.UIDTrackingControllerExpectations - - rcLister corelisters.ReplicationControllerLister - rcListerSynced cache.InformerSynced - - podLister corelisters.PodLister - // podListerSynced returns true if the pod store has been synced at least once. - // Added as a member to the struct to allow injection for testing. - podListerSynced cache.InformerSynced - - // Controllers that need to be synced - queue workqueue.RateLimitingInterface + replicaset.ReplicaSetController } // NewReplicationManager configures a replication manager with the specified event recorder func NewReplicationManager(podInformer coreinformers.PodInformer, rcInformer coreinformers.ReplicationControllerInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicationManager { - if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil { - metrics.RegisterMetricAndTrackRateLimiterUsage("replication_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()) - } - eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")}) - - rm := &ReplicationManager{ - kubeClient: kubeClient, - podControl: controller.RealPodControl{ - KubeClient: kubeClient, - Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replication-controller"}), - }, - burstReplicas: burstReplicas, - expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "replicationmanager"), - } - - rcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: rm.enqueueController, - UpdateFunc: rm.updateRC, - // This will enter the sync loop and no-op, because the controller has been deleted from the store. - // Note that deleting a controller immediately after scaling it to 0 will not work. The recommended - // way of achieving this is by performing a `stop` operation on the controller. - DeleteFunc: rm.enqueueController, - }) - rm.rcLister = rcInformer.Lister() - rm.rcListerSynced = rcInformer.Informer().HasSynced - - podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: rm.addPod, - // This invokes the rc for every pod change, eg: host assignment. Though this might seem like overkill - // the most frequent pod update is status, and the associated rc will only list from local storage, so - // it should be ok. - UpdateFunc: rm.updatePod, - DeleteFunc: rm.deletePod, - }) - rm.podLister = podInformer.Lister() - rm.podListerSynced = podInformer.Informer().HasSynced - - rm.syncHandler = rm.syncReplicationController - return rm -} - -// SetEventRecorder replaces the event recorder used by the replication manager -// with the given recorder. Only used for testing. -func (rm *ReplicationManager) SetEventRecorder(recorder record.EventRecorder) { - // TODO: Hack. We can't cleanly shutdown the event recorder, so benchmarks - // need to pass in a fake. - rm.podControl = controller.RealPodControl{KubeClient: rm.kubeClient, Recorder: recorder} -} - -// Run begins watching and syncing. -func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) { - defer utilruntime.HandleCrash() - defer rm.queue.ShutDown() - - glog.Infof("Starting RC controller") - defer glog.Infof("Shutting down RC controller") - - if !controller.WaitForCacheSync("RC", stopCh, rm.podListerSynced, rm.rcListerSynced) { - return - } - - for i := 0; i < workers; i++ { - go wait.Until(rm.worker, time.Second, stopCh) - } - - <-stopCh -} - -// getPodControllers returns a list of ReplicationControllers matching the given pod. -func (rm *ReplicationManager) getPodControllers(pod *v1.Pod) []*v1.ReplicationController { - rcs, err := rm.rcLister.GetPodControllers(pod) - if err != nil { - return nil - } - if len(rcs) > 1 { - // ControllerRef will ensure we don't do anything crazy, but more than one - // item in this list nevertheless constitutes user error. - utilruntime.HandleError(fmt.Errorf("user error! more than one ReplicationController is selecting pods with labels: %+v", pod.Labels)) - } - return rcs -} - -// resolveControllerRef returns the controller referenced by a ControllerRef, -// or nil if the ControllerRef could not be resolved to a matching controller -// of the correct Kind. -func (rm *ReplicationManager) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *v1.ReplicationController { - // We can't look up by UID, so look up by Name and then verify UID. - // Don't even try to look up by Name if it's the wrong Kind. - if controllerRef.Kind != controllerKind.Kind { - return nil - } - rc, err := rm.rcLister.ReplicationControllers(namespace).Get(controllerRef.Name) - if err != nil { - return nil - } - if rc.UID != controllerRef.UID { - // The controller we found with this Name is not the same one that the - // ControllerRef points to. - return nil - } - return rc -} - -// callback when RC is updated -func (rm *ReplicationManager) updateRC(old, cur interface{}) { - oldRC := old.(*v1.ReplicationController) - curRC := cur.(*v1.ReplicationController) - - // You might imagine that we only really need to enqueue the - // controller when Spec changes, but it is safer to sync any - // time this function is triggered. That way a full informer - // resync can requeue any controllers that don't yet have pods - // but whose last attempts at creating a pod have failed (since - // we don't block on creation of pods) instead of those - // controllers stalling indefinitely. Enqueueing every time - // does result in some spurious syncs (like when Status.Replica - // is updated and the watch notification from it retriggers - // this function), but in general extra resyncs shouldn't be - // that bad as rcs that haven't met expectations yet won't - // sync, and all the listing is done using local stores. - if *(oldRC.Spec.Replicas) != *(curRC.Spec.Replicas) { - glog.V(4).Infof("Replication controller %v updated. Desired pod count change: %d->%d", curRC.Name, *(oldRC.Spec.Replicas), *(curRC.Spec.Replicas)) - } - rm.enqueueController(cur) -} - -// When a pod is created, enqueue the ReplicationController that manages it and update its expectations. -func (rm *ReplicationManager) addPod(obj interface{}) { - pod := obj.(*v1.Pod) - - if pod.DeletionTimestamp != nil { - // on a restart of the controller manager, it's possible a new pod shows up in a state that - // is already pending deletion. Prevent the pod from being a creation observation. - rm.deletePod(pod) - return - } - - // If it has a ControllerRef, that's all that matters. - if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil { - rc := rm.resolveControllerRef(pod.Namespace, controllerRef) - if rc == nil { - return - } - rsKey, err := controller.KeyFunc(rc) - if err != nil { - return - } - glog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod) - rm.expectations.CreationObserved(rsKey) - rm.enqueueController(rc) - return - } - - // Otherwise, it's an orphan. Get a list of all matching ReplicationControllers and sync - // them to see if anyone wants to adopt it. - // DO NOT observe creation because no controller should be waiting for an - // orphan. - rcs := rm.getPodControllers(pod) - if len(rcs) == 0 { - return - } - glog.V(4).Infof("Orphan Pod %s created: %#v.", pod.Name, pod) - for _, rc := range rcs { - rm.enqueueController(rc) + return &ReplicationManager{ + *replicaset.NewBaseController(informerAdapter{rcInformer}, podInformer, clientsetAdapter{kubeClient}, burstReplicas, + v1.SchemeGroupVersion.WithKind("ReplicationController"), + "replication_controller", + "replicationmanager", + podControlAdapter{controller.RealPodControl{ + KubeClient: kubeClient, + Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replication-controller"}), + }}, + ), } } - -// When a pod is updated, figure out what ReplicationController/s manage it and wake them -// up. If the labels of the pod have changed we need to awaken both the old -// and new ReplicationController. old and cur must be *v1.Pod types. -func (rm *ReplicationManager) updatePod(old, cur interface{}) { - curPod := cur.(*v1.Pod) - oldPod := old.(*v1.Pod) - if curPod.ResourceVersion == oldPod.ResourceVersion { - // Periodic resync will send update events for all known pods. - // Two different versions of the same pod will always have different RVs. - return - } - - labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels) - if curPod.DeletionTimestamp != nil { - // when a pod is deleted gracefully it's deletion timestamp is first modified to reflect a grace period, - // and after such time has passed, the kubelet actually deletes it from the store. We receive an update - // for modification of the deletion timestamp and expect an rc to create more replicas asap, not wait - // until the kubelet actually deletes the pod. This is different from the Phase of a pod changing, because - // an rc never initiates a phase change, and so is never asleep waiting for the same. - rm.deletePod(curPod) - if labelChanged { - // we don't need to check the oldPod.DeletionTimestamp because DeletionTimestamp cannot be unset. - rm.deletePod(oldPod) - } - return - } - - curControllerRef := metav1.GetControllerOf(curPod) - oldControllerRef := metav1.GetControllerOf(oldPod) - controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef) - if controllerRefChanged && oldControllerRef != nil { - // The ControllerRef was changed. Sync the old controller, if any. - if rc := rm.resolveControllerRef(oldPod.Namespace, oldControllerRef); rc != nil { - rm.enqueueController(rc) - } - } - - // If it has a ControllerRef, that's all that matters. - if curControllerRef != nil { - rc := rm.resolveControllerRef(curPod.Namespace, curControllerRef) - if rc == nil { - return - } - glog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) - rm.enqueueController(rc) - // TODO: MinReadySeconds in the Pod will generate an Available condition to be added in - // the Pod status which in turn will trigger a requeue of the owning ReplicationController thus - // having its status updated with the newly available replica. For now, we can fake the - // update by resyncing the controller MinReadySeconds after the it is requeued because - // a Pod transitioned to Ready. - // Note that this still suffers from #29229, we are just moving the problem one level - // "closer" to kubelet (from the deployment to the ReplicationController controller). - if !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && rc.Spec.MinReadySeconds > 0 { - glog.V(2).Infof("ReplicationController %q will be enqueued after %ds for availability check", rc.Name, rc.Spec.MinReadySeconds) - // Add a second to avoid milliseconds skew in AddAfter. - // See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info. - rm.enqueueControllerAfter(rc, (time.Duration(rc.Spec.MinReadySeconds)*time.Second)+time.Second) - } - return - } - - // Otherwise, it's an orphan. If anything changed, sync matching controllers - // to see if anyone wants to adopt it now. - if labelChanged || controllerRefChanged { - rcs := rm.getPodControllers(curPod) - if len(rcs) == 0 { - return - } - glog.V(4).Infof("Orphan Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) - for _, rc := range rcs { - rm.enqueueController(rc) - } - } -} - -// When a pod is deleted, enqueue the ReplicationController that manages the pod and update its expectations. -// obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item. -func (rm *ReplicationManager) deletePod(obj interface{}) { - pod, ok := obj.(*v1.Pod) - - // When a delete is dropped, the relist will notice a pod in the store not - // in the list, leading to the insertion of a tombstone object which contains - // the deleted key/value. Note that this value might be stale. If the pod - // changed labels the new ReplicationController will not be woken up till the periodic resync. - if !ok { - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj)) - return - } - pod, ok = tombstone.Obj.(*v1.Pod) - if !ok { - utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj)) - return - } - } - - controllerRef := metav1.GetControllerOf(pod) - if controllerRef == nil { - // No controller should care about orphans being deleted. - return - } - rc := rm.resolveControllerRef(pod.Namespace, controllerRef) - if rc == nil { - return - } - rsKey, err := controller.KeyFunc(rc) - if err != nil { - return - } - glog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %#v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod) - rm.expectations.DeletionObserved(rsKey, controller.PodKey(pod)) - rm.enqueueController(rc) -} - -// obj could be an *v1.ReplicationController, or a DeletionFinalStateUnknown marker item. -func (rm *ReplicationManager) enqueueController(obj interface{}) { - key, err := controller.KeyFunc(obj) - if err != nil { - utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err)) - return - } - rm.queue.Add(key) -} - -// obj could be an *v1.ReplicationController, or a DeletionFinalStateUnknown marker item. -func (rm *ReplicationManager) enqueueControllerAfter(obj interface{}, after time.Duration) { - key, err := controller.KeyFunc(obj) - if err != nil { - utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err)) - return - } - rm.queue.AddAfter(key, after) -} - -// worker runs a worker thread that just dequeues items, processes them, and marks them done. -// It enforces that the syncHandler is never invoked concurrently with the same key. -func (rm *ReplicationManager) worker() { - for rm.processNextWorkItem() { - } - glog.Infof("replication controller worker shutting down") -} - -func (rm *ReplicationManager) processNextWorkItem() bool { - key, quit := rm.queue.Get() - if quit { - return false - } - defer rm.queue.Done(key) - - err := rm.syncHandler(key.(string)) - if err == nil { - rm.queue.Forget(key) - return true - } - - rm.queue.AddRateLimited(key) - utilruntime.HandleError(err) - return true -} - -// manageReplicas checks and updates replicas for the given replication controller. -// Does NOT modify . -func (rm *ReplicationManager) manageReplicas(filteredPods []*v1.Pod, rc *v1.ReplicationController) error { - diff := len(filteredPods) - int(*(rc.Spec.Replicas)) - rcKey, err := controller.KeyFunc(rc) - if err != nil { - return err - } - if diff == 0 { - return nil - } - - if diff < 0 { - diff *= -1 - if diff > rm.burstReplicas { - diff = rm.burstReplicas - } - // TODO: Track UIDs of creates just like deletes. The problem currently - // is we'd need to wait on the result of a create to record the pod's - // UID, which would require locking *across* the create, which will turn - // into a performance bottleneck. We should generate a UID for the pod - // beforehand and store it via ExpectCreations. - errCh := make(chan error, diff) - rm.expectations.ExpectCreations(rcKey, diff) - var wg sync.WaitGroup - glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", rc.Namespace, rc.Name, *(rc.Spec.Replicas), diff) - // Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize - // and double with each successful iteration in a kind of "slow start". - // This handles attempts to start large numbers of pods that would - // likely all fail with the same error. For example a project with a - // low quota that attempts to create a large number of pods will be - // prevented from spamming the API service with the pod create requests - // after one of its pods fails. Conveniently, this also prevents the - // event spam that those failures would generate. - for batchSize := integer.IntMin(diff, controller.SlowStartInitialBatchSize); diff > 0; batchSize = integer.IntMin(2*batchSize, diff) { - errorCount := len(errCh) - wg.Add(batchSize) - for i := 0; i < batchSize; i++ { - go func() { - defer wg.Done() - var err error - boolPtr := func(b bool) *bool { return &b } - controllerRef := &metav1.OwnerReference{ - APIVersion: controllerKind.GroupVersion().String(), - Kind: controllerKind.Kind, - Name: rc.Name, - UID: rc.UID, - BlockOwnerDeletion: boolPtr(true), - Controller: boolPtr(true), - } - err = rm.podControl.CreatePodsWithControllerRef(rc.Namespace, rc.Spec.Template, rc, controllerRef) - if err != nil && errors.IsTimeout(err) { - // Pod is created but its initialization has timed out. - // If the initialization is successful eventually, the - // controller will observe the creation via the informer. - // If the initialization fails, or if the pod keeps - // uninitialized for a long time, the informer will not - // receive any update, and the controller will create a new - // pod when the expectation expires. - return - } - if err != nil { - // Decrement the expected number of creates because the informer won't observe this pod - glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name) - rm.expectations.CreationObserved(rcKey) - errCh <- err - utilruntime.HandleError(err) - } - }() - } - wg.Wait() - // any skipped pods that we never attempted to start shouldn't be expected. - skippedPods := diff - batchSize - if errorCount < len(errCh) && skippedPods > 0 { - glog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for controller %q/%q", skippedPods, rc.Namespace, rc.Name) - for i := 0; i < skippedPods; i++ { - // Decrement the expected number of creates because the informer won't observe this pod - rm.expectations.CreationObserved(rcKey) - } - // The skipped pods will be retried later. The next controller resync will - // retry the slow start process. - break - } - diff -= batchSize - } - - select { - case err := <-errCh: - // all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit. - if err != nil { - return err - } - default: - } - - return nil - } - - if diff > rm.burstReplicas { - diff = rm.burstReplicas - } - glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rc.Namespace, rc.Name, *(rc.Spec.Replicas), diff) - // No need to sort pods if we are about to delete all of them - if *(rc.Spec.Replicas) != 0 { - // Sort the pods in the order such that not-ready < ready, unscheduled - // < scheduled, and pending < running. This ensures that we delete pods - // in the earlier stages whenever possible. - sort.Sort(controller.ActivePods(filteredPods)) - } - // Snapshot the UIDs (ns/name) of the pods we're expecting to see - // deleted, so we know to record their expectations exactly once either - // when we see it as an update of the deletion timestamp, or as a delete. - // Note that if the labels on a pod/rc change in a way that the pod gets - // orphaned, the rs will only wake up after the expectations have - // expired even if other pods are deleted. - deletedPodKeys := []string{} - for i := 0; i < diff; i++ { - deletedPodKeys = append(deletedPodKeys, controller.PodKey(filteredPods[i])) - } - // We use pod namespace/name as a UID to wait for deletions, so if the - // labels on a pod/rc change in a way that the pod gets orphaned, the - // rc will only wake up after the expectation has expired. - errCh := make(chan error, diff) - rm.expectations.ExpectDeletions(rcKey, deletedPodKeys) - var wg sync.WaitGroup - wg.Add(diff) - for i := 0; i < diff; i++ { - go func(ix int) { - defer wg.Done() - if err := rm.podControl.DeletePod(rc.Namespace, filteredPods[ix].Name, rc); err != nil { - // Decrement the expected number of deletes because the informer won't observe this deletion - podKey := controller.PodKey(filteredPods[ix]) - glog.V(2).Infof("Failed to delete %v due to %v, decrementing expectations for controller %q/%q", podKey, err, rc.Namespace, rc.Name) - rm.expectations.DeletionObserved(rcKey, podKey) - errCh <- err - utilruntime.HandleError(err) - } - }(i) - } - wg.Wait() - - select { - case err := <-errCh: - // all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit. - if err != nil { - return err - } - default: - } - - return nil - -} - -// syncReplicationController will sync the rc with the given key if it has had its expectations fulfilled, meaning -// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked -// concurrently with the same key. -func (rm *ReplicationManager) syncReplicationController(key string) error { - trace := utiltrace.New("syncReplicationController: " + key) - defer trace.LogIfLong(250 * time.Millisecond) - - startTime := time.Now() - defer func() { - glog.V(4).Infof("Finished syncing controller %q (%v)", key, time.Now().Sub(startTime)) - }() - - namespace, name, err := cache.SplitMetaNamespaceKey(key) - if err != nil { - return err - } - rc, err := rm.rcLister.ReplicationControllers(namespace).Get(name) - if errors.IsNotFound(err) { - glog.Infof("Replication Controller has been deleted %v", key) - rm.expectations.DeleteExpectations(key) - return nil - } - if err != nil { - return err - } - - trace.Step("ReplicationController restored") - rcNeedsSync := rm.expectations.SatisfiedExpectations(key) - trace.Step("Expectations restored") - - // list all pods to include the pods that don't match the rc's selector - // anymore but has the stale controller ref. - // TODO: Do the List and Filter in a single pass, or use an index. - allPods, err := rm.podLister.Pods(rc.Namespace).List(labels.Everything()) - if err != nil { - return err - } - // Ignore inactive pods. - var filteredPods []*v1.Pod - for _, pod := range allPods { - if controller.IsPodActive(pod) { - filteredPods = append(filteredPods, pod) - } - } - // If any adoptions are attempted, we should first recheck for deletion with - // an uncached quorum read sometime after listing Pods (see #42639). - canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) { - fresh, err := rm.kubeClient.CoreV1().ReplicationControllers(rc.Namespace).Get(rc.Name, metav1.GetOptions{}) - if err != nil { - return nil, err - } - if fresh.UID != rc.UID { - return nil, fmt.Errorf("original ReplicationController %v/%v is gone: got uid %v, wanted %v", rc.Namespace, rc.Name, fresh.UID, rc.UID) - } - return fresh, nil - }) - cm := controller.NewPodControllerRefManager(rm.podControl, rc, labels.Set(rc.Spec.Selector).AsSelectorPreValidated(), controllerKind, canAdoptFunc) - // NOTE: filteredPods are pointing to objects from cache - if you need to - // modify them, you need to copy it first. - filteredPods, err = cm.ClaimPods(filteredPods) - if err != nil { - return err - } - - var manageReplicasErr error - if rcNeedsSync && rc.DeletionTimestamp == nil { - manageReplicasErr = rm.manageReplicas(filteredPods, rc) - } - trace.Step("manageReplicas done") - - rc = rc.DeepCopy() - - newStatus := calculateStatus(rc, filteredPods, manageReplicasErr) - - // Always updates status as pods come up or die. - updatedRC, err := updateReplicationControllerStatus(rm.kubeClient.CoreV1().ReplicationControllers(rc.Namespace), *rc, newStatus) - if err != nil { - // Multiple things could lead to this update failing. Returning an error causes a requeue without forcing a hotloop - return err - } - // Resync the ReplicationController after MinReadySeconds as a last line of defense to guard against clock-skew. - if manageReplicasErr == nil && updatedRC.Spec.MinReadySeconds > 0 && - updatedRC.Status.ReadyReplicas == *(updatedRC.Spec.Replicas) && - updatedRC.Status.AvailableReplicas != *(updatedRC.Spec.Replicas) { - rm.enqueueControllerAfter(updatedRC, time.Duration(updatedRC.Spec.MinReadySeconds)*time.Second) - } - return manageReplicasErr -} diff --git a/pkg/controller/replication/replication_controller_test.go b/pkg/controller/replication/replication_controller_test.go deleted file mode 100644 index 151aa96b93e..00000000000 --- a/pkg/controller/replication/replication_controller_test.go +++ /dev/null @@ -1,1719 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// If you make changes to this file, you should also make the corresponding change in ReplicaSet. - -package replication - -import ( - "errors" - "fmt" - "math/rand" - "net/http/httptest" - "net/url" - "reflect" - "strings" - "testing" - "time" - - "k8s.io/api/core/v1" - apiequality "k8s.io/apimachinery/pkg/api/equality" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/uuid" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/informers" - coreinformers "k8s.io/client-go/informers/core/v1" - clientset "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/fake" - fakeclientset "k8s.io/client-go/kubernetes/fake" - restclient "k8s.io/client-go/rest" - core "k8s.io/client-go/testing" - "k8s.io/client-go/tools/cache" - utiltesting "k8s.io/client-go/util/testing" - "k8s.io/client-go/util/workqueue" - "k8s.io/kubernetes/pkg/api/legacyscheme" - "k8s.io/kubernetes/pkg/api/testapi" - "k8s.io/kubernetes/pkg/controller" - "k8s.io/kubernetes/pkg/securitycontext" -) - -var alwaysReady = func() bool { return true } - -func getKey(rc *v1.ReplicationController, t *testing.T) string { - if key, err := controller.KeyFunc(rc); err != nil { - t.Errorf("Unexpected error getting key for rc %v: %v", rc.Name, err) - return "" - } else { - return key - } -} - -func newReplicationController(replicas int) *v1.ReplicationController { - rc := &v1.ReplicationController{ - TypeMeta: metav1.TypeMeta{APIVersion: legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion.String()}, - ObjectMeta: metav1.ObjectMeta{ - UID: uuid.NewUUID(), - Name: "foobar", - Namespace: metav1.NamespaceDefault, - ResourceVersion: "18", - }, - Spec: v1.ReplicationControllerSpec{ - Replicas: func() *int32 { i := int32(replicas); return &i }(), - Selector: map[string]string{"foo": "bar"}, - Template: &v1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "name": "foo", - "type": "production", - }, - }, - Spec: v1.PodSpec{ - Containers: []v1.Container{ - { - Image: "foo/bar", - TerminationMessagePath: v1.TerminationMessagePathDefault, - ImagePullPolicy: v1.PullIfNotPresent, - SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults(), - }, - }, - RestartPolicy: v1.RestartPolicyAlways, - DNSPolicy: v1.DNSDefault, - NodeSelector: map[string]string{ - "baz": "blah", - }, - }, - }, - }, - } - return rc -} - -// create a pod with the given phase for the given rc (same selectors and namespace). -func newPod(name string, rc *v1.ReplicationController, status v1.PodPhase, lastTransitionTime *metav1.Time, properlyOwned bool) *v1.Pod { - var conditions []v1.PodCondition - if status == v1.PodRunning { - condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue} - if lastTransitionTime != nil { - condition.LastTransitionTime = *lastTransitionTime - } - conditions = append(conditions, condition) - } - var controllerReference metav1.OwnerReference - if properlyOwned { - var trueVar = true - controllerReference = metav1.OwnerReference{UID: rc.UID, APIVersion: "v1beta1", Kind: "ReplicaSet", Name: rc.Name, Controller: &trueVar} - } - - return &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Labels: rc.Spec.Selector, - Namespace: rc.Namespace, - OwnerReferences: []metav1.OwnerReference{controllerReference}, - }, - Status: v1.PodStatus{Phase: status, Conditions: conditions}, - } -} - -// create count pods with the given phase for the given rc (same selectors and namespace), and add them to the store. -func newPodList(store cache.Store, count int, status v1.PodPhase, rc *v1.ReplicationController, name string) *v1.PodList { - pods := []v1.Pod{} - var trueVar = true - controllerReference := metav1.OwnerReference{UID: rc.UID, APIVersion: "v1", Kind: "ReplicationController", Name: rc.Name, Controller: &trueVar} - for i := 0; i < count; i++ { - pod := newPod(fmt.Sprintf("%s%d", name, i), rc, status, nil, false) - pod.OwnerReferences = []metav1.OwnerReference{controllerReference} - if store != nil { - store.Add(pod) - } - pods = append(pods, *pod) - } - return &v1.PodList{ - Items: pods, - } -} - -// processSync initiates a sync via processNextWorkItem() to test behavior that -// depends on both functions (such as re-queueing on sync error). -func processSync(rm *ReplicationManager, key string) error { - // Save old syncHandler and replace with one that captures the error. - oldSyncHandler := rm.syncHandler - defer func() { - rm.syncHandler = oldSyncHandler - }() - var syncErr error - rm.syncHandler = func(key string) error { - syncErr = oldSyncHandler(key) - return syncErr - } - rm.queue.Add(key) - rm.processNextWorkItem() - return syncErr -} - -func validateSyncReplication(t *testing.T, fakePodControl *controller.FakePodControl, expectedCreates, expectedDeletes, expectedPatches int) { - if e, a := expectedCreates, len(fakePodControl.Templates); e != a { - t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", e, a) - } - if e, a := expectedDeletes, len(fakePodControl.DeletePodName); e != a { - t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", e, a) - } - if e, a := expectedPatches, len(fakePodControl.Patches); e != a { - t.Errorf("Unexpected number of patches. Expected %d, saw %d\n", e, a) - } -} - -func replicationControllerResourceName() string { - return "replicationcontrollers" -} - -type serverResponse struct { - statusCode int - obj interface{} -} - -func newReplicationManagerFromClient(kubeClient clientset.Interface, burstReplicas int) (*ReplicationManager, coreinformers.PodInformer, coreinformers.ReplicationControllerInformer) { - informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) - podInformer := informerFactory.Core().V1().Pods() - rcInformer := informerFactory.Core().V1().ReplicationControllers() - rm := NewReplicationManager(podInformer, rcInformer, kubeClient, burstReplicas) - rm.podListerSynced = alwaysReady - rm.rcListerSynced = alwaysReady - return rm, podInformer, rcInformer -} - -func TestSyncReplicationControllerDoesNothing(t *testing.T) { - c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - fakePodControl := controller.FakePodControl{} - manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) - - // 2 running pods, a controller with 2 replicas, sync is a no-op - controllerSpec := newReplicationController(2) - rcInformer.Informer().GetIndexer().Add(controllerSpec) - newPodList(podInformer.Informer().GetIndexer(), 2, v1.PodRunning, controllerSpec, "pod") - - manager.podControl = &fakePodControl - manager.syncReplicationController(getKey(controllerSpec, t)) - validateSyncReplication(t, &fakePodControl, 0, 0, 0) -} - -func TestSyncReplicationControllerDeletes(t *testing.T) { - controllerSpec := newReplicationController(1) - - c := fake.NewSimpleClientset(controllerSpec) - fakePodControl := controller.FakePodControl{} - manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) - manager.podControl = &fakePodControl - - // 2 running pods and a controller with 1 replica, one pod delete expected - rcInformer.Informer().GetIndexer().Add(controllerSpec) - newPodList(podInformer.Informer().GetIndexer(), 2, v1.PodRunning, controllerSpec, "pod") - - err := manager.syncReplicationController(getKey(controllerSpec, t)) - if err != nil { - t.Fatalf("syncReplicationController() error: %v", err) - } - validateSyncReplication(t, &fakePodControl, 0, 1, 0) -} - -func TestDeleteFinalStateUnknown(t *testing.T) { - c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - fakePodControl := controller.FakePodControl{} - manager, _, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) - manager.podControl = &fakePodControl - - received := make(chan string) - manager.syncHandler = func(key string) error { - received <- key - return nil - } - - // The DeletedFinalStateUnknown object should cause the rc manager to insert - // the controller matching the selectors of the deleted pod into the work queue. - controllerSpec := newReplicationController(1) - rcInformer.Informer().GetIndexer().Add(controllerSpec) - pods := newPodList(nil, 1, v1.PodRunning, controllerSpec, "pod") - manager.deletePod(cache.DeletedFinalStateUnknown{Key: "foo", Obj: &pods.Items[0]}) - - go manager.worker() - - expected := getKey(controllerSpec, t) - select { - case key := <-received: - if key != expected { - t.Errorf("Unexpected sync all for rc %v, expected %v", key, expected) - } - case <-time.After(wait.ForeverTestTimeout): - t.Errorf("Processing DeleteFinalStateUnknown took longer than expected") - } -} - -func TestSyncReplicationControllerCreates(t *testing.T) { - rc := newReplicationController(2) - c := fake.NewSimpleClientset(rc) - manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) - - // A controller with 2 replicas and no active pods in the store. - // Inactive pods should be ignored. 2 creates expected. - rcInformer.Informer().GetIndexer().Add(rc) - failedPod := newPod("failed-pod", rc, v1.PodFailed, nil, true) - deletedPod := newPod("deleted-pod", rc, v1.PodRunning, nil, true) - deletedPod.DeletionTimestamp = &metav1.Time{Time: time.Now()} - podInformer.Informer().GetIndexer().Add(failedPod) - podInformer.Informer().GetIndexer().Add(deletedPod) - - fakePodControl := controller.FakePodControl{} - manager.podControl = &fakePodControl - manager.syncReplicationController(getKey(rc, t)) - validateSyncReplication(t, &fakePodControl, 2, 0, 0) -} - -// Tell the controller to create 100 replicas, but simulate a limit (like a quota limit) -// of 10, and verify that the controller doesn't make 100 create calls per sync pass -func TestSyncReplicationControllerCreateFailures(t *testing.T) { - fakePodControl := controller.FakePodControl{} - fakePodControl.CreateLimit = 10 - - rc := newReplicationController(fakePodControl.CreateLimit * 10) - c := fake.NewSimpleClientset(rc) - manager, _ /*podInformer*/, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) - - rcInformer.Informer().GetIndexer().Add(rc) - - manager.podControl = &fakePodControl - manager.syncReplicationController(getKey(rc, t)) - validateSyncReplication(t, &fakePodControl, fakePodControl.CreateLimit, 0, 0) - expectedLimit := 0 - for pass := uint8(0); expectedLimit <= fakePodControl.CreateLimit; pass++ { - expectedLimit += controller.SlowStartInitialBatchSize << pass - } - if fakePodControl.CreateCallCount > expectedLimit { - t.Errorf("Unexpected number of create calls. Expected <= %d, saw %d\n", fakePodControl.CreateLimit*2, fakePodControl.CreateCallCount) - } -} - -func TestStatusUpdatesWithoutReplicasChange(t *testing.T) { - // Setup a fake server to listen for requests, and run the rc manager in steady state - fakeHandler := utiltesting.FakeHandler{ - StatusCode: 200, - ResponseBody: "", - SkipRequestFn: func(verb string, url url.URL) bool { - if verb == "GET" { - // Ignore refetch to check DeletionTimestamp. - return true - } - return false - }, - } - testServer := httptest.NewServer(&fakeHandler) - defer testServer.Close() - c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) - - // Steady state for the replication controller, no Status.Replicas updates expected - activePods := 5 - rc := newReplicationController(activePods) - rcInformer.Informer().GetIndexer().Add(rc) - rc.Status = v1.ReplicationControllerStatus{Replicas: int32(activePods), ReadyReplicas: int32(activePods), AvailableReplicas: int32(activePods)} - newPodList(podInformer.Informer().GetIndexer(), activePods, v1.PodRunning, rc, "pod") - - fakePodControl := controller.FakePodControl{} - manager.podControl = &fakePodControl - manager.syncReplicationController(getKey(rc, t)) - - validateSyncReplication(t, &fakePodControl, 0, 0, 0) - if fakeHandler.RequestReceived != nil { - t.Errorf("Unexpected update when pods and rcs are in a steady state") - } - - // This response body is just so we don't err out decoding the http response, all - // we care about is the request body sent below. - response := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.ReplicationController{}) - fakeHandler.ResponseBody = response - - rc.Generation = rc.Generation + 1 - manager.syncReplicationController(getKey(rc, t)) - - rc.Status.ObservedGeneration = rc.Generation - updatedRc := runtime.EncodeOrDie(testapi.Default.Codec(), rc) - fakeHandler.ValidateRequest(t, testapi.Default.ResourcePath(replicationControllerResourceName(), rc.Namespace, rc.Name)+"/status", "PUT", &updatedRc) -} - -func TestControllerUpdateReplicas(t *testing.T) { - // This is a happy server just to record the PUT request we expect for status.Replicas - fakeHandler := utiltesting.FakeHandler{ - StatusCode: 200, - ResponseBody: "", - } - testServer := httptest.NewServer(&fakeHandler) - defer testServer.Close() - c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) - - // Insufficient number of pods in the system, and Status.Replicas is wrong; - // Status.Replica should update to match number of pods in system, 1 new pod should be created. - rc := newReplicationController(5) - rcInformer.Informer().GetIndexer().Add(rc) - rc.Status = v1.ReplicationControllerStatus{Replicas: 2, FullyLabeledReplicas: 6, ReadyReplicas: 2, AvailableReplicas: 2, ObservedGeneration: 0} - rc.Generation = 1 - newPodList(podInformer.Informer().GetIndexer(), 2, v1.PodRunning, rc, "pod") - rcCopy := *rc - extraLabelMap := map[string]string{"foo": "bar", "extraKey": "extraValue"} - rcCopy.Spec.Selector = extraLabelMap - newPodList(podInformer.Informer().GetIndexer(), 2, v1.PodRunning, &rcCopy, "podWithExtraLabel") - - // This response body is just so we don't err out decoding the http response - response := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.ReplicationController{}) - fakeHandler.ResponseBody = response - - fakePodControl := controller.FakePodControl{} - manager.podControl = &fakePodControl - - manager.syncReplicationController(getKey(rc, t)) - - // 1. Status.Replicas should go up from 2->4 even though we created 5-4=1 pod. - // 2. Status.FullyLabeledReplicas should equal to the number of pods that - // has the extra labels, i.e., 2. - // 3. Every update to the status should include the Generation of the spec. - rc.Status = v1.ReplicationControllerStatus{Replicas: 4, ReadyReplicas: 4, AvailableReplicas: 4, ObservedGeneration: 1} - - decRc := runtime.EncodeOrDie(testapi.Default.Codec(), rc) - fakeHandler.ValidateRequest(t, testapi.Default.ResourcePath(replicationControllerResourceName(), rc.Namespace, rc.Name)+"/status", "PUT", &decRc) - validateSyncReplication(t, &fakePodControl, 1, 0, 0) -} - -func TestSyncReplicationControllerDormancy(t *testing.T) { - controllerSpec := newReplicationController(2) - c := fake.NewSimpleClientset(controllerSpec) - fakePodControl := controller.FakePodControl{} - manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) - manager.podControl = &fakePodControl - - rcInformer.Informer().GetIndexer().Add(controllerSpec) - newPodList(podInformer.Informer().GetIndexer(), 1, v1.PodRunning, controllerSpec, "pod") - - // Creates a replica and sets expectations - controllerSpec.Status.Replicas = 1 - controllerSpec.Status.ReadyReplicas = 1 - controllerSpec.Status.AvailableReplicas = 1 - manager.syncReplicationController(getKey(controllerSpec, t)) - validateSyncReplication(t, &fakePodControl, 1, 0, 0) - - // Expectations prevents replicas but not an update on status - controllerSpec.Status.Replicas = 0 - controllerSpec.Status.ReadyReplicas = 0 - controllerSpec.Status.AvailableReplicas = 0 - fakePodControl.Clear() - manager.syncReplicationController(getKey(controllerSpec, t)) - validateSyncReplication(t, &fakePodControl, 0, 0, 0) - - // Get the key for the controller - rcKey, err := controller.KeyFunc(controllerSpec) - if err != nil { - t.Errorf("Couldn't get key for object %#v: %v", controllerSpec, err) - } - - // Lowering expectations should lead to a sync that creates a replica, however the - // fakePodControl error will prevent this, leaving expectations at 0, 0. - manager.expectations.CreationObserved(rcKey) - controllerSpec.Status.Replicas = 1 - controllerSpec.Status.ReadyReplicas = 1 - controllerSpec.Status.AvailableReplicas = 1 - fakePodControl.Clear() - fakePodControl.Err = fmt.Errorf("Fake Error") - - manager.syncReplicationController(getKey(controllerSpec, t)) - validateSyncReplication(t, &fakePodControl, 1, 0, 0) - - // This replica should not need a Lowering of expectations, since the previous create failed - fakePodControl.Clear() - fakePodControl.Err = nil - manager.syncReplicationController(getKey(controllerSpec, t)) - validateSyncReplication(t, &fakePodControl, 1, 0, 0) -} - -func TestPodControllerLookup(t *testing.T) { - manager, _, rcInformer := newReplicationManagerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion}}), BurstReplicas) - testCases := []struct { - inRCs []*v1.ReplicationController - pod *v1.Pod - outRCName string - }{ - // pods without labels don't match any rcs - { - inRCs: []*v1.ReplicationController{ - {ObjectMeta: metav1.ObjectMeta{Name: "basic"}}}, - pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo1", Namespace: metav1.NamespaceAll}}, - outRCName: "", - }, - // Matching labels, not namespace - { - inRCs: []*v1.ReplicationController{ - { - ObjectMeta: metav1.ObjectMeta{Name: "foo"}, - Spec: v1.ReplicationControllerSpec{ - Selector: map[string]string{"foo": "bar"}, - }, - }, - }, - pod: &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foo2", Namespace: "ns", Labels: map[string]string{"foo": "bar"}}}, - outRCName: "", - }, - // Matching ns and labels returns the key to the rc, not the rc name - { - inRCs: []*v1.ReplicationController{ - { - ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "ns"}, - Spec: v1.ReplicationControllerSpec{ - Selector: map[string]string{"foo": "bar"}, - }, - }, - }, - pod: &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foo3", Namespace: "ns", Labels: map[string]string{"foo": "bar"}}}, - outRCName: "bar", - }, - } - for _, c := range testCases { - for _, r := range c.inRCs { - rcInformer.Informer().GetIndexer().Add(r) - } - if rcs := manager.getPodControllers(c.pod); rcs != nil { - if len(rcs) != 1 { - t.Errorf("len(rcs) = %v, want %v", len(rcs), 1) - continue - } - rc := rcs[0] - if c.outRCName != rc.Name { - t.Errorf("Got controller %+v expected %+v", rc.Name, c.outRCName) - } - } else if c.outRCName != "" { - t.Errorf("Expected a controller %v pod %v, found none", c.outRCName, c.pod.Name) - } - } -} - -func TestWatchControllers(t *testing.T) { - fakeWatch := watch.NewFake() - c := &fake.Clientset{} - c.AddWatchReactor("replicationcontrollers", core.DefaultWatchReactor(fakeWatch, nil)) - stopCh := make(chan struct{}) - defer close(stopCh) - informers := informers.NewSharedInformerFactory(c, controller.NoResyncPeriodFunc()) - podInformer := informers.Core().V1().Pods() - rcInformer := informers.Core().V1().ReplicationControllers() - manager := NewReplicationManager(podInformer, rcInformer, c, BurstReplicas) - informers.Start(stopCh) - - var testControllerSpec v1.ReplicationController - received := make(chan string) - - // The update sent through the fakeWatcher should make its way into the workqueue, - // and eventually into the syncHandler. The handler validates the received controller - // and closes the received channel to indicate that the test can finish. - manager.syncHandler = func(key string) error { - obj, exists, err := rcInformer.Informer().GetIndexer().GetByKey(key) - if !exists || err != nil { - t.Errorf("Expected to find controller under key %v", key) - } - controllerSpec := *obj.(*v1.ReplicationController) - if !apiequality.Semantic.DeepDerivative(controllerSpec, testControllerSpec) { - t.Errorf("Expected %#v, but got %#v", testControllerSpec, controllerSpec) - } - close(received) - return nil - } - - // Start only the rc watcher and the workqueue, send a watch event, - // and make sure it hits the sync method. - go wait.Until(manager.worker, 10*time.Millisecond, stopCh) - - testControllerSpec.Name = "foo" - fakeWatch.Add(&testControllerSpec) - - select { - case <-received: - case <-time.After(wait.ForeverTestTimeout): - t.Errorf("unexpected timeout from result channel") - } -} - -func TestWatchPods(t *testing.T) { - fakeWatch := watch.NewFake() - c := &fake.Clientset{} - c.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil)) - manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) - - // Put one rc and one pod into the controller's stores - testControllerSpec := newReplicationController(1) - rcInformer.Informer().GetIndexer().Add(testControllerSpec) - received := make(chan string) - // The pod update sent through the fakeWatcher should figure out the managing rc and - // send it into the syncHandler. - manager.syncHandler = func(key string) error { - - obj, exists, err := rcInformer.Informer().GetIndexer().GetByKey(key) - if !exists || err != nil { - t.Errorf("Expected to find controller under key %v", key) - } - controllerSpec := obj.(*v1.ReplicationController) - if !apiequality.Semantic.DeepDerivative(controllerSpec, testControllerSpec) { - t.Errorf("\nExpected %#v,\nbut got %#v", testControllerSpec, controllerSpec) - } - close(received) - return nil - } - // Start only the pod watcher and the workqueue, send a watch event, - // and make sure it hits the sync method for the right rc. - stopCh := make(chan struct{}) - defer close(stopCh) - go podInformer.Informer().Run(stopCh) - go wait.Until(manager.worker, 10*time.Millisecond, stopCh) - - pods := newPodList(nil, 1, v1.PodRunning, testControllerSpec, "pod") - testPod := pods.Items[0] - testPod.Status.Phase = v1.PodFailed - fakeWatch.Add(&testPod) - - select { - case <-received: - case <-time.After(wait.ForeverTestTimeout): - t.Errorf("unexpected timeout from result channel") - } -} - -func TestUpdatePods(t *testing.T) { - manager, podInformer, rcInformer := newReplicationManagerFromClient(fake.NewSimpleClientset(), BurstReplicas) - - received := make(chan string) - - manager.syncHandler = func(key string) error { - obj, exists, err := rcInformer.Informer().GetIndexer().GetByKey(key) - if !exists || err != nil { - t.Errorf("Expected to find controller under key %v", key) - } - received <- obj.(*v1.ReplicationController).Name - return nil - } - - stopCh := make(chan struct{}) - defer close(stopCh) - go wait.Until(manager.worker, 10*time.Millisecond, stopCh) - - // Put 2 rcs and one pod into the controller's stores - labelMap1 := map[string]string{"foo": "bar"} - testControllerSpec1 := newReplicationController(1) - testControllerSpec1.Spec.Selector = labelMap1 - rcInformer.Informer().GetIndexer().Add(testControllerSpec1) - labelMap2 := map[string]string{"bar": "foo"} - testControllerSpec2 := *testControllerSpec1 - testControllerSpec2.Spec.Selector = labelMap2 - testControllerSpec2.Name = "barfoo" - rcInformer.Informer().GetIndexer().Add(&testControllerSpec2) - - isController := true - controllerRef1 := metav1.OwnerReference{UID: testControllerSpec1.UID, APIVersion: "v1", Kind: "ReplicationController", Name: testControllerSpec1.Name, Controller: &isController} - controllerRef2 := metav1.OwnerReference{UID: testControllerSpec2.UID, APIVersion: "v1", Kind: "ReplicationController", Name: testControllerSpec2.Name, Controller: &isController} - - // case 1: Pod with a ControllerRef - pod1 := newPodList(podInformer.Informer().GetIndexer(), 1, v1.PodRunning, testControllerSpec1, "pod").Items[0] - pod1.OwnerReferences = []metav1.OwnerReference{controllerRef1} - pod1.ResourceVersion = "1" - pod2 := pod1 - pod2.Labels = labelMap2 - pod2.ResourceVersion = "2" - manager.updatePod(&pod1, &pod2) - expected := sets.NewString(testControllerSpec1.Name) - for _, name := range expected.List() { - t.Logf("Expecting update for %+v", name) - select { - case got := <-received: - if !expected.Has(got) { - t.Errorf("Expected keys %#v got %v", expected, got) - } - case <-time.After(wait.ForeverTestTimeout): - t.Errorf("Expected update notifications for ReplicationControllers") - } - } - - // case 2: Remove ControllerRef (orphan). Expect to sync label-matching RC. - pod1 = newPodList(podInformer.Informer().GetIndexer(), 1, v1.PodRunning, testControllerSpec1, "pod").Items[0] - pod1.ResourceVersion = "1" - pod1.Labels = labelMap2 - pod1.OwnerReferences = []metav1.OwnerReference{controllerRef2} - pod2 = pod1 - pod2.OwnerReferences = nil - pod2.ResourceVersion = "2" - manager.updatePod(&pod1, &pod2) - expected = sets.NewString(testControllerSpec2.Name) - for _, name := range expected.List() { - t.Logf("Expecting update for %+v", name) - select { - case got := <-received: - if !expected.Has(got) { - t.Errorf("Expected keys %#v got %v", expected, got) - } - case <-time.After(wait.ForeverTestTimeout): - t.Errorf("Expected update notifications for ReplicationControllers") - } - } - - // case 2: Remove ControllerRef (orphan). Expect to sync both former owner and - // any label-matching RC. - pod1 = newPodList(podInformer.Informer().GetIndexer(), 1, v1.PodRunning, testControllerSpec1, "pod").Items[0] - pod1.ResourceVersion = "1" - pod1.Labels = labelMap2 - pod1.OwnerReferences = []metav1.OwnerReference{controllerRef1} - pod2 = pod1 - pod2.OwnerReferences = nil - pod2.ResourceVersion = "2" - manager.updatePod(&pod1, &pod2) - expected = sets.NewString(testControllerSpec1.Name, testControllerSpec2.Name) - for _, name := range expected.List() { - t.Logf("Expecting update for %+v", name) - select { - case got := <-received: - if !expected.Has(got) { - t.Errorf("Expected keys %#v got %v", expected, got) - } - case <-time.After(wait.ForeverTestTimeout): - t.Errorf("Expected update notifications for ReplicationControllers") - } - } - - // case 4: Keep ControllerRef, change labels. Expect to sync owning RC. - pod1 = newPodList(podInformer.Informer().GetIndexer(), 1, v1.PodRunning, testControllerSpec1, "pod").Items[0] - pod1.ResourceVersion = "1" - pod1.Labels = labelMap1 - pod1.OwnerReferences = []metav1.OwnerReference{controllerRef2} - pod2 = pod1 - pod2.Labels = labelMap2 - pod2.ResourceVersion = "2" - manager.updatePod(&pod1, &pod2) - expected = sets.NewString(testControllerSpec2.Name) - for _, name := range expected.List() { - t.Logf("Expecting update for %+v", name) - select { - case got := <-received: - if !expected.Has(got) { - t.Errorf("Expected keys %#v got %v", expected, got) - } - case <-time.After(wait.ForeverTestTimeout): - t.Errorf("Expected update notifications for ReplicationControllers") - } - } -} - -func TestControllerUpdateRequeue(t *testing.T) { - // This server should force a requeue of the controller because it fails to update status.Replicas. - rc := newReplicationController(1) - c := fake.NewSimpleClientset(rc) - c.PrependReactor("update", "replicationcontrollers", - func(action core.Action) (bool, runtime.Object, error) { - if action.GetSubresource() != "status" { - return false, nil, nil - } - return true, nil, errors.New("failed to update status") - }) - manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) - - rcInformer.Informer().GetIndexer().Add(rc) - rc.Status = v1.ReplicationControllerStatus{Replicas: 2} - newPodList(podInformer.Informer().GetIndexer(), 1, v1.PodRunning, rc, "pod") - - fakePodControl := controller.FakePodControl{} - manager.podControl = &fakePodControl - - // Enqueue once. Then process it. Disable rate-limiting for this. - manager.queue = workqueue.NewRateLimitingQueue(workqueue.NewMaxOfRateLimiter()) - manager.enqueueController(rc) - manager.processNextWorkItem() - // It should have been requeued. - if got, want := manager.queue.Len(), 1; got != want { - t.Errorf("queue.Len() = %v, want %v", got, want) - } -} - -func TestControllerUpdateStatusWithFailure(t *testing.T) { - rc := newReplicationController(1) - c := &fake.Clientset{} - c.AddReactor("get", "replicationcontrollers", func(action core.Action) (bool, runtime.Object, error) { - return true, rc, nil - }) - c.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) { - return true, &v1.ReplicationController{}, fmt.Errorf("Fake error") - }) - fakeRCClient := c.Core().ReplicationControllers("default") - numReplicas := int32(10) - status := v1.ReplicationControllerStatus{Replicas: numReplicas} - updateReplicationControllerStatus(fakeRCClient, *rc, status) - updates, gets := 0, 0 - for _, a := range c.Actions() { - if a.GetResource().Resource != "replicationcontrollers" { - t.Errorf("Unexpected action %+v", a) - continue - } - - switch action := a.(type) { - case core.GetAction: - gets++ - // Make sure the get is for the right rc even though the update failed. - if action.GetName() != rc.Name { - t.Errorf("Expected get for rc %v, got %+v instead", rc.Name, action.GetName()) - } - case core.UpdateAction: - updates++ - // Confirm that the update has the right status.Replicas even though the Get - // returned an rc with replicas=1. - if c, ok := action.GetObject().(*v1.ReplicationController); !ok { - t.Errorf("Expected an rc as the argument to update, got %T", c) - } else if c.Status.Replicas != numReplicas { - t.Errorf("Expected update for rc to contain replicas %v, got %v instead", - numReplicas, c.Status.Replicas) - } - default: - t.Errorf("Unexpected action %+v", a) - break - } - } - if gets != 1 || updates != 2 { - t.Errorf("Expected 1 get and 2 updates, got %d gets %d updates", gets, updates) - } -} - -// TODO: This test is too hairy for a unittest. It should be moved to an E2E suite. -func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) { - controllerSpec := newReplicationController(numReplicas) - c := fake.NewSimpleClientset(controllerSpec) - fakePodControl := controller.FakePodControl{} - manager, podInformer, rcInformer := newReplicationManagerFromClient(c, burstReplicas) - manager.podControl = &fakePodControl - - rcInformer.Informer().GetIndexer().Add(controllerSpec) - - expectedPods := 0 - pods := newPodList(nil, numReplicas, v1.PodPending, controllerSpec, "pod") - - rcKey, err := controller.KeyFunc(controllerSpec) - if err != nil { - t.Errorf("Couldn't get key for object %#v: %v", controllerSpec, err) - } - - // Size up the controller, then size it down, and confirm the expected create/delete pattern - for _, replicas := range []int{numReplicas, 0} { - - *(controllerSpec.Spec.Replicas) = int32(replicas) - rcInformer.Informer().GetIndexer().Add(controllerSpec) - - for i := 0; i < numReplicas; i += burstReplicas { - manager.syncReplicationController(getKey(controllerSpec, t)) - - // The store accrues active pods. It's also used by the rc to determine how many - // replicas to create. - activePods := len(podInformer.Informer().GetIndexer().List()) - if replicas != 0 { - // This is the number of pods currently "in flight". They were created by the rc manager above, - // which then puts the rc to sleep till all of them have been observed. - expectedPods = replicas - activePods - if expectedPods > burstReplicas { - expectedPods = burstReplicas - } - // This validates the rc manager sync actually created pods - validateSyncReplication(t, &fakePodControl, expectedPods, 0, 0) - - // This simulates the watch events for all but 1 of the expected pods. - // None of these should wake the controller because it has expectations==BurstReplicas. - for i := 0; i < expectedPods-1; i++ { - podInformer.Informer().GetIndexer().Add(&pods.Items[i]) - manager.addPod(&pods.Items[i]) - } - - podExp, exists, err := manager.expectations.GetExpectations(rcKey) - if !exists || err != nil { - t.Fatalf("Did not find expectations for rc.") - } - if add, _ := podExp.GetExpectations(); add != 1 { - t.Fatalf("Expectations are wrong %v", podExp) - } - } else { - expectedPods = (replicas - activePods) * -1 - if expectedPods > burstReplicas { - expectedPods = burstReplicas - } - validateSyncReplication(t, &fakePodControl, 0, expectedPods, 0) - - // To accurately simulate a watch we must delete the exact pods - // the rc is waiting for. - expectedDels := manager.expectations.GetUIDs(getKey(controllerSpec, t)) - podsToDelete := []*v1.Pod{} - isController := true - for _, key := range expectedDels.List() { - nsName := strings.Split(key, "/") - podsToDelete = append(podsToDelete, &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: nsName[1], - Namespace: nsName[0], - Labels: controllerSpec.Spec.Selector, - OwnerReferences: []metav1.OwnerReference{ - {UID: controllerSpec.UID, APIVersion: "v1", Kind: "ReplicationController", Name: controllerSpec.Name, Controller: &isController}, - }, - }, - }) - } - // Don't delete all pods because we confirm that the last pod - // has exactly one expectation at the end, to verify that we - // don't double delete. - for i := range podsToDelete[1:] { - podInformer.Informer().GetIndexer().Delete(podsToDelete[i]) - manager.deletePod(podsToDelete[i]) - } - podExp, exists, err := manager.expectations.GetExpectations(rcKey) - if !exists || err != nil { - t.Fatalf("Did not find expectations for rc.") - } - if _, del := podExp.GetExpectations(); del != 1 { - t.Fatalf("Expectations are wrong %v", podExp) - } - } - - // Check that the rc didn't take any action for all the above pods - fakePodControl.Clear() - manager.syncReplicationController(getKey(controllerSpec, t)) - validateSyncReplication(t, &fakePodControl, 0, 0, 0) - - // Create/Delete the last pod - // The last add pod will decrease the expectation of the rc to 0, - // which will cause it to create/delete the remaining replicas up to burstReplicas. - if replicas != 0 { - podInformer.Informer().GetIndexer().Add(&pods.Items[expectedPods-1]) - manager.addPod(&pods.Items[expectedPods-1]) - } else { - expectedDel := manager.expectations.GetUIDs(getKey(controllerSpec, t)) - if expectedDel.Len() != 1 { - t.Fatalf("Waiting on unexpected number of deletes.") - } - nsName := strings.Split(expectedDel.List()[0], "/") - isController := true - lastPod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: nsName[1], - Namespace: nsName[0], - Labels: controllerSpec.Spec.Selector, - OwnerReferences: []metav1.OwnerReference{ - {UID: controllerSpec.UID, APIVersion: "v1", Kind: "ReplicationController", Name: controllerSpec.Name, Controller: &isController}, - }, - }, - } - podInformer.Informer().GetIndexer().Delete(lastPod) - manager.deletePod(lastPod) - } - pods.Items = pods.Items[expectedPods:] - } - - // Confirm that we've created the right number of replicas - activePods := int32(len(podInformer.Informer().GetIndexer().List())) - if activePods != *(controllerSpec.Spec.Replicas) { - t.Fatalf("Unexpected number of active pods, expected %d, got %d", *(controllerSpec.Spec.Replicas), activePods) - } - // Replenish the pod list, since we cut it down sizing up - pods = newPodList(nil, replicas, v1.PodRunning, controllerSpec, "pod") - } -} - -func TestControllerBurstReplicas(t *testing.T) { - doTestControllerBurstReplicas(t, 5, 30) - doTestControllerBurstReplicas(t, 5, 12) - doTestControllerBurstReplicas(t, 3, 2) -} - -type FakeRCExpectations struct { - *controller.ControllerExpectations - satisfied bool - expSatisfied func() -} - -func (fe FakeRCExpectations) SatisfiedExpectations(controllerKey string) bool { - fe.expSatisfied() - return fe.satisfied -} - -// TestRCSyncExpectations tests that a pod cannot sneak in between counting active pods -// and checking expectations. -func TestRCSyncExpectations(t *testing.T) { - c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - fakePodControl := controller.FakePodControl{} - manager, podInformer, rcInformer := newReplicationManagerFromClient(c, 2) - manager.podControl = &fakePodControl - - controllerSpec := newReplicationController(2) - rcInformer.Informer().GetIndexer().Add(controllerSpec) - pods := newPodList(nil, 2, v1.PodPending, controllerSpec, "pod") - podInformer.Informer().GetIndexer().Add(&pods.Items[0]) - postExpectationsPod := pods.Items[1] - - manager.expectations = controller.NewUIDTrackingControllerExpectations(FakeRCExpectations{ - controller.NewControllerExpectations(), true, func() { - // If we check active pods before checking expectataions, the rc - // will create a new replica because it doesn't see this pod, but - // has fulfilled its expectations. - podInformer.Informer().GetIndexer().Add(&postExpectationsPod) - }, - }) - manager.syncReplicationController(getKey(controllerSpec, t)) - validateSyncReplication(t, &fakePodControl, 0, 0, 0) -} - -func TestDeleteControllerAndExpectations(t *testing.T) { - rc := newReplicationController(1) - c := fake.NewSimpleClientset(rc) - manager, podInformer, rcInformer := newReplicationManagerFromClient(c, 10) - - rcInformer.Informer().GetIndexer().Add(rc) - - fakePodControl := controller.FakePodControl{} - manager.podControl = &fakePodControl - - // This should set expectations for the rc - manager.syncReplicationController(getKey(rc, t)) - validateSyncReplication(t, &fakePodControl, 1, 0, 0) - fakePodControl.Clear() - - // Get the RC key - rcKey, err := controller.KeyFunc(rc) - if err != nil { - t.Errorf("Couldn't get key for object %#v: %v", rc, err) - } - - // This is to simulate a concurrent addPod, that has a handle on the expectations - // as the controller deletes it. - podExp, exists, err := manager.expectations.GetExpectations(rcKey) - if !exists || err != nil { - t.Errorf("No expectations found for rc") - } - rcInformer.Informer().GetIndexer().Delete(rc) - manager.syncReplicationController(getKey(rc, t)) - - if _, exists, err = manager.expectations.GetExpectations(rcKey); exists { - t.Errorf("Found expectaions, expected none since the rc has been deleted.") - } - - // This should have no effect, since we've deleted the rc. - podExp.Add(-1, 0) - podInformer.Informer().GetIndexer().Replace(make([]interface{}, 0), "0") - manager.syncReplicationController(getKey(rc, t)) - validateSyncReplication(t, &fakePodControl, 0, 0, 0) -} - -// shuffle returns a new shuffled list of container controllers. -func shuffle(controllers []*v1.ReplicationController) []*v1.ReplicationController { - numControllers := len(controllers) - randIndexes := rand.Perm(numControllers) - shuffled := make([]*v1.ReplicationController, numControllers) - for i := 0; i < numControllers; i++ { - shuffled[i] = controllers[randIndexes[i]] - } - return shuffled -} - -func TestOverlappingRCs(t *testing.T) { - c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - - manager, _, rcInformer := newReplicationManagerFromClient(c, 10) - - // Create 10 rcs, shuffled them randomly and insert them into the - // rc manager's store. - // All use the same CreationTimestamp since ControllerRef should be able - // to handle that. - var controllers []*v1.ReplicationController - timestamp := metav1.Date(2014, time.December, 0, 0, 0, 0, 0, time.Local) - for j := 1; j < 10; j++ { - controllerSpec := newReplicationController(1) - controllerSpec.CreationTimestamp = timestamp - controllerSpec.Name = fmt.Sprintf("rc%d", j) - controllers = append(controllers, controllerSpec) - } - shuffledControllers := shuffle(controllers) - for j := range shuffledControllers { - rcInformer.Informer().GetIndexer().Add(shuffledControllers[j]) - } - // Add a pod with a ControllerRef and make sure only the corresponding - // ReplicationController is synced. Pick a RC in the middle since the old code - // used to sort by name if all timestamps were equal. - rc := controllers[3] - pods := newPodList(nil, 1, v1.PodPending, rc, "pod") - pod := &pods.Items[0] - isController := true - pod.OwnerReferences = []metav1.OwnerReference{ - {UID: rc.UID, APIVersion: "v1", Kind: "ReplicationController", Name: rc.Name, Controller: &isController}, - } - rcKey := getKey(rc, t) - - manager.addPod(pod) - queueRC, _ := manager.queue.Get() - if queueRC != rcKey { - t.Fatalf("Expected to find key %v in queue, found %v", rcKey, queueRC) - } -} - -func TestDeletionTimestamp(t *testing.T) { - c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - manager, _, rcInformer := newReplicationManagerFromClient(c, 10) - - controllerSpec := newReplicationController(1) - rcInformer.Informer().GetIndexer().Add(controllerSpec) - rcKey, err := controller.KeyFunc(controllerSpec) - if err != nil { - t.Errorf("Couldn't get key for object %#v: %v", controllerSpec, err) - } - pod := newPodList(nil, 1, v1.PodPending, controllerSpec, "pod").Items[0] - pod.DeletionTimestamp = &metav1.Time{Time: time.Now()} - pod.ResourceVersion = "1" - manager.expectations.ExpectDeletions(rcKey, []string{controller.PodKey(&pod)}) - - // A pod added with a deletion timestamp should decrement deletions, not creations. - manager.addPod(&pod) - - queueRC, _ := manager.queue.Get() - if queueRC != rcKey { - t.Fatalf("Expected to find key %v in queue, found %v", rcKey, queueRC) - } - manager.queue.Done(rcKey) - - podExp, exists, err := manager.expectations.GetExpectations(rcKey) - if !exists || err != nil || !podExp.Fulfilled() { - t.Fatalf("Wrong expectations %#v", podExp) - } - - // An update from no deletion timestamp to having one should be treated - // as a deletion. - oldPod := newPodList(nil, 1, v1.PodPending, controllerSpec, "pod").Items[0] - oldPod.ResourceVersion = "2" - manager.expectations.ExpectDeletions(rcKey, []string{controller.PodKey(&pod)}) - manager.updatePod(&oldPod, &pod) - - queueRC, _ = manager.queue.Get() - if queueRC != rcKey { - t.Fatalf("Expected to find key %v in queue, found %v", rcKey, queueRC) - } - manager.queue.Done(rcKey) - - podExp, exists, err = manager.expectations.GetExpectations(rcKey) - if !exists || err != nil || !podExp.Fulfilled() { - t.Fatalf("Wrong expectations %#v", podExp) - } - - // An update to the pod (including an update to the deletion timestamp) - // should not be counted as a second delete. - isController := true - secondPod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: pod.Namespace, - Name: "secondPod", - Labels: pod.Labels, - OwnerReferences: []metav1.OwnerReference{ - {UID: controllerSpec.UID, APIVersion: "v1", Kind: "ReplicationController", Name: controllerSpec.Name, Controller: &isController}, - }, - }, - } - manager.expectations.ExpectDeletions(rcKey, []string{controller.PodKey(secondPod)}) - oldPod.DeletionTimestamp = &metav1.Time{Time: time.Now()} - oldPod.ResourceVersion = "2" - manager.updatePod(&oldPod, &pod) - - podExp, exists, err = manager.expectations.GetExpectations(rcKey) - if !exists || err != nil || podExp.Fulfilled() { - t.Fatalf("Wrong expectations %#v", podExp) - } - - // A pod with a non-nil deletion timestamp should also be ignored by the - // delete handler, because it's already been counted in the update. - manager.deletePod(&pod) - podExp, exists, err = manager.expectations.GetExpectations(rcKey) - if !exists || err != nil || podExp.Fulfilled() { - t.Fatalf("Wrong expectations %#v", podExp) - } - - // Deleting the second pod should clear expectations. - manager.deletePod(secondPod) - - queueRC, _ = manager.queue.Get() - if queueRC != rcKey { - t.Fatalf("Expected to find key %v in queue, found %v", rcKey, queueRC) - } - manager.queue.Done(rcKey) - - podExp, exists, err = manager.expectations.GetExpectations(rcKey) - if !exists || err != nil || !podExp.Fulfilled() { - t.Fatalf("Wrong expectations %#v", podExp) - } -} - -func BenchmarkGetPodControllerMultiNS(b *testing.B) { - client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - manager, _, rcInformer := newReplicationManagerFromClient(client, BurstReplicas) - - const nsNum = 1000 - - pods := []v1.Pod{} - for i := 0; i < nsNum; i++ { - ns := fmt.Sprintf("ns-%d", i) - for j := 0; j < 10; j++ { - rcName := fmt.Sprintf("rc-%d", j) - for k := 0; k < 10; k++ { - podName := fmt.Sprintf("pod-%d-%d", j, k) - pods = append(pods, v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: podName, - Namespace: ns, - Labels: map[string]string{"rcName": rcName}, - }, - }) - } - } - } - - for i := 0; i < nsNum; i++ { - ns := fmt.Sprintf("ns-%d", i) - for j := 0; j < 10; j++ { - rcName := fmt.Sprintf("rc-%d", j) - rcInformer.Informer().GetIndexer().Add(&v1.ReplicationController{ - ObjectMeta: metav1.ObjectMeta{Name: rcName, Namespace: ns}, - Spec: v1.ReplicationControllerSpec{ - Selector: map[string]string{"rcName": rcName}, - }, - }) - } - } - - b.ResetTimer() - - for i := 0; i < b.N; i++ { - for _, pod := range pods { - manager.getPodControllers(&pod) - } - } -} - -func BenchmarkGetPodControllerSingleNS(b *testing.B) { - client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - manager, _, rcInformer := newReplicationManagerFromClient(client, BurstReplicas) - - const rcNum = 1000 - const replicaNum = 3 - - pods := []v1.Pod{} - for i := 0; i < rcNum; i++ { - rcName := fmt.Sprintf("rc-%d", i) - for j := 0; j < replicaNum; j++ { - podName := fmt.Sprintf("pod-%d-%d", i, j) - pods = append(pods, v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: podName, - Namespace: "foo", - Labels: map[string]string{"rcName": rcName}, - }, - }) - } - } - - for i := 0; i < rcNum; i++ { - rcName := fmt.Sprintf("rc-%d", i) - rcInformer.Informer().GetIndexer().Add(&v1.ReplicationController{ - ObjectMeta: metav1.ObjectMeta{Name: rcName, Namespace: "foo"}, - Spec: v1.ReplicationControllerSpec{ - Selector: map[string]string{"rcName": rcName}, - }, - }) - } - b.ResetTimer() - - for i := 0; i < b.N; i++ { - for _, pod := range pods { - manager.getPodControllers(&pod) - } - } -} - -// setupManagerWithGCEnabled creates a RC manager with a fakePodControl -func setupManagerWithGCEnabled(objs ...runtime.Object) (manager *ReplicationManager, fakePodControl *controller.FakePodControl, podInformer coreinformers.PodInformer, rcInformer coreinformers.ReplicationControllerInformer) { - c := fakeclientset.NewSimpleClientset(objs...) - fakePodControl = &controller.FakePodControl{} - manager, podInformer, rcInformer = newReplicationManagerFromClient(c, BurstReplicas) - manager.podControl = fakePodControl - return manager, fakePodControl, podInformer, rcInformer -} - -func TestDoNotPatchPodWithOtherControlRef(t *testing.T) { - rc := newReplicationController(2) - manager, fakePodControl, podInformer, rcInformer := setupManagerWithGCEnabled(rc) - rcInformer.Informer().GetIndexer().Add(rc) - var trueVar = true - otherControllerReference := metav1.OwnerReference{UID: uuid.NewUUID(), APIVersion: "v1", Kind: "ReplicationController", Name: "AnotherRC", Controller: &trueVar} - // add to podLister a matching Pod controlled by another controller. Expect no patch. - pod := newPod("pod", rc, v1.PodRunning, nil, false) - pod.OwnerReferences = []metav1.OwnerReference{otherControllerReference} - podInformer.Informer().GetIndexer().Add(pod) - err := manager.syncReplicationController(getKey(rc, t)) - if err != nil { - t.Fatal(err) - } - // because the matching pod already has a controller, so 2 pods should be created. - validateSyncReplication(t, fakePodControl, 2, 0, 0) -} - -func TestPatchPodWithOtherOwnerRef(t *testing.T) { - rc := newReplicationController(2) - manager, fakePodControl, podInformer, rcInformer := setupManagerWithGCEnabled(rc) - rcInformer.Informer().GetIndexer().Add(rc) - // add to podLister one more matching pod that doesn't have a controller - // ref, but has an owner ref pointing to other object. Expect a patch to - // take control of it. - unrelatedOwnerReference := metav1.OwnerReference{UID: uuid.NewUUID(), APIVersion: "batch/v1", Kind: "Job", Name: "Job"} - pod := newPod("pod", rc, v1.PodRunning, nil, false) - pod.OwnerReferences = []metav1.OwnerReference{unrelatedOwnerReference} - podInformer.Informer().GetIndexer().Add(pod) - - err := manager.syncReplicationController(getKey(rc, t)) - if err != nil { - t.Fatal(err) - } - // 1 patch to take control of pod, and 1 create of new pod. - validateSyncReplication(t, fakePodControl, 1, 0, 1) -} - -func TestPatchPodWithCorrectOwnerRef(t *testing.T) { - rc := newReplicationController(2) - manager, fakePodControl, podInformer, rcInformer := setupManagerWithGCEnabled(rc) - rcInformer.Informer().GetIndexer().Add(rc) - // add to podLister a matching pod that has an ownerRef pointing to the rc, - // but ownerRef.Controller is false. Expect a patch to take control it. - rcOwnerReference := metav1.OwnerReference{UID: rc.UID, APIVersion: "v1", Kind: "ReplicationController", Name: rc.Name} - pod := newPod("pod", rc, v1.PodRunning, nil, false) - pod.OwnerReferences = []metav1.OwnerReference{rcOwnerReference} - podInformer.Informer().GetIndexer().Add(pod) - - err := manager.syncReplicationController(getKey(rc, t)) - if err != nil { - t.Fatal(err) - } - // 1 patch to take control of pod, and 1 create of new pod. - validateSyncReplication(t, fakePodControl, 1, 0, 1) -} - -func TestPatchPodFails(t *testing.T) { - rc := newReplicationController(2) - manager, fakePodControl, podInformer, rcInformer := setupManagerWithGCEnabled(rc) - rcInformer.Informer().GetIndexer().Add(rc) - // add to podLister two matching pods. Expect two patches to take control - // them. - podInformer.Informer().GetIndexer().Add(newPod("pod1", rc, v1.PodRunning, nil, false)) - podInformer.Informer().GetIndexer().Add(newPod("pod2", rc, v1.PodRunning, nil, false)) - // let both patches fail. The rc manager will assume it fails to take - // control of the pods and requeue to try again. - fakePodControl.Err = fmt.Errorf("Fake Error") - rcKey := getKey(rc, t) - err := processSync(manager, rcKey) - if err == nil || !strings.Contains(err.Error(), "Fake Error") { - t.Fatalf("expected Fake Error, got %v", err) - } - // 2 patches to take control of pod1 and pod2 (both fail). - validateSyncReplication(t, fakePodControl, 0, 0, 2) - // RC should requeue itself. - queueRC, _ := manager.queue.Get() - if queueRC != rcKey { - t.Fatalf("Expected to find key %v in queue, found %v", rcKey, queueRC) - } -} - -func TestPatchExtraPodsThenDelete(t *testing.T) { - rc := newReplicationController(2) - manager, fakePodControl, podInformer, rcInformer := setupManagerWithGCEnabled(rc) - rcInformer.Informer().GetIndexer().Add(rc) - // add to podLister three matching pods. Expect three patches to take control - // them, and later delete one of them. - podInformer.Informer().GetIndexer().Add(newPod("pod1", rc, v1.PodRunning, nil, false)) - podInformer.Informer().GetIndexer().Add(newPod("pod2", rc, v1.PodRunning, nil, false)) - podInformer.Informer().GetIndexer().Add(newPod("pod3", rc, v1.PodRunning, nil, false)) - err := manager.syncReplicationController(getKey(rc, t)) - if err != nil { - t.Fatal(err) - } - // 3 patches to take control of the pods, and 1 deletion because there is an extra pod. - validateSyncReplication(t, fakePodControl, 0, 1, 3) -} - -func TestUpdateLabelsRemoveControllerRef(t *testing.T) { - rc := newReplicationController(2) - manager, fakePodControl, podInformer, rcInformer := setupManagerWithGCEnabled(rc) - rcInformer.Informer().GetIndexer().Add(rc) - // put one pod in the podLister - pod := newPod("pod", rc, v1.PodRunning, nil, false) - pod.ResourceVersion = "1" - var trueVar = true - rcOwnerReference := metav1.OwnerReference{UID: rc.UID, APIVersion: "v1", Kind: "ReplicationController", Name: rc.Name, Controller: &trueVar} - pod.OwnerReferences = []metav1.OwnerReference{rcOwnerReference} - updatedPod := *pod - // reset the labels - updatedPod.Labels = make(map[string]string) - updatedPod.ResourceVersion = "2" - // add the updatedPod to the store. This is consistent with the behavior of - // the Informer: Informer updates the store before call the handler - // (updatePod() in this case). - podInformer.Informer().GetIndexer().Add(&updatedPod) - // send a update of the same pod with modified labels - manager.updatePod(pod, &updatedPod) - // verifies that rc is added to the queue - rcKey := getKey(rc, t) - queueRC, _ := manager.queue.Get() - if queueRC != rcKey { - t.Fatalf("Expected to find key %v in queue, found %v", rcKey, queueRC) - } - manager.queue.Done(queueRC) - err := manager.syncReplicationController(rcKey) - if err != nil { - t.Fatal(err) - } - // expect 1 patch to be sent to remove the controllerRef for the pod. - // expect 2 creates because the *(rc.Spec.Replicas)=2 and there exists no - // matching pod. - validateSyncReplication(t, fakePodControl, 2, 0, 1) - fakePodControl.Clear() -} - -func TestUpdateSelectorControllerRef(t *testing.T) { - rc := newReplicationController(2) - manager, fakePodControl, podInformer, rcInformer := setupManagerWithGCEnabled(rc) - // put 2 pods in the podLister - newPodList(podInformer.Informer().GetIndexer(), 2, v1.PodRunning, rc, "pod") - // update the RC so that its selector no longer matches the pods - updatedRC := *rc - updatedRC.Spec.Selector = map[string]string{"foo": "baz"} - // put the updatedRC into the store. This is consistent with the behavior of - // the Informer: Informer updates the store before call the handler - // (updateRC() in this case). - rcInformer.Informer().GetIndexer().Add(&updatedRC) - manager.updateRC(rc, &updatedRC) - // verifies that the rc is added to the queue - rcKey := getKey(rc, t) - queueRC, _ := manager.queue.Get() - if queueRC != rcKey { - t.Fatalf("Expected to find key %v in queue, found %v", rcKey, queueRC) - } - manager.queue.Done(queueRC) - err := manager.syncReplicationController(rcKey) - if err != nil { - t.Fatal(err) - } - // expect 2 patches to be sent to remove the controllerRef for the pods. - // expect 2 creates because the *(rc.Spec.Replicas)=2 and there exists no - // matching pod. - validateSyncReplication(t, fakePodControl, 2, 0, 2) - fakePodControl.Clear() -} - -// RC manager shouldn't adopt or create more pods if the rc is about to be -// deleted. -func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) { - rc := newReplicationController(2) - now := metav1.Now() - rc.DeletionTimestamp = &now - manager, fakePodControl, podInformer, rcInformer := setupManagerWithGCEnabled(rc) - rcInformer.Informer().GetIndexer().Add(rc) - pod1 := newPod("pod1", rc, v1.PodRunning, nil, false) - podInformer.Informer().GetIndexer().Add(pod1) - - // no patch, no create - err := manager.syncReplicationController(getKey(rc, t)) - if err != nil { - t.Fatal(err) - } - validateSyncReplication(t, fakePodControl, 0, 0, 0) -} - -func TestDoNotAdoptOrCreateIfBeingDeletedRace(t *testing.T) { - // Bare client says it IS deleted. - rc := newReplicationController(2) - now := metav1.Now() - rc.DeletionTimestamp = &now - manager, fakePodControl, podInformer, rcInformer := setupManagerWithGCEnabled(rc) - // Lister (cache) says it's NOT deleted. - rc2 := *rc - rc2.DeletionTimestamp = nil - rcInformer.Informer().GetIndexer().Add(&rc2) - - // Recheck occurs if a matching orphan is present. - pod1 := newPod("pod1", rc, v1.PodRunning, nil, false) - podInformer.Informer().GetIndexer().Add(pod1) - - // sync should abort. - err := manager.syncReplicationController(getKey(rc, t)) - if err == nil { - t.Error("syncReplicationController() err = nil, expected non-nil") - } - // no patch, no create. - validateSyncReplication(t, fakePodControl, 0, 0, 0) -} - -func TestReadyReplicas(t *testing.T) { - // This is a happy server just to record the PUT request we expect for status.Replicas - fakeHandler := utiltesting.FakeHandler{ - StatusCode: 200, - ResponseBody: "{}", - } - testServer := httptest.NewServer(&fakeHandler) - defer testServer.Close() - - c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) - - // Status.Replica should update to match number of pods in system, 1 new pod should be created. - rc := newReplicationController(2) - rc.Status = v1.ReplicationControllerStatus{Replicas: 2, ReadyReplicas: 0, AvailableReplicas: 0, ObservedGeneration: 1} - rc.Generation = 1 - rcInformer.Informer().GetIndexer().Add(rc) - - newPodList(podInformer.Informer().GetIndexer(), 2, v1.PodPending, rc, "pod") - newPodList(podInformer.Informer().GetIndexer(), 2, v1.PodRunning, rc, "pod") - - // This response body is just so we don't err out decoding the http response - response := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.ReplicationController{}) - fakeHandler.ResponseBody = response - - fakePodControl := controller.FakePodControl{} - manager.podControl = &fakePodControl - - manager.syncReplicationController(getKey(rc, t)) - - // ReadyReplicas should go from 0 to 2. - rc.Status = v1.ReplicationControllerStatus{Replicas: 2, ReadyReplicas: 2, AvailableReplicas: 2, ObservedGeneration: 1} - - decRc := runtime.EncodeOrDie(testapi.Default.Codec(), rc) - fakeHandler.ValidateRequest(t, testapi.Default.ResourcePath(replicationControllerResourceName(), rc.Namespace, rc.Name)+"/status", "PUT", &decRc) - validateSyncReplication(t, &fakePodControl, 0, 0, 0) -} - -func TestAvailableReplicas(t *testing.T) { - // This is a happy server just to record the PUT request we expect for status.Replicas - fakeHandler := utiltesting.FakeHandler{ - StatusCode: 200, - ResponseBody: "{}", - } - testServer := httptest.NewServer(&fakeHandler) - defer testServer.Close() - - c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) - - // Status.Replica should update to match number of pods in system, 1 new pod should be created. - rc := newReplicationController(2) - rc.Status = v1.ReplicationControllerStatus{Replicas: 2, ReadyReplicas: 0, ObservedGeneration: 1} - rc.Generation = 1 - // minReadySeconds set to 15s - rc.Spec.MinReadySeconds = 15 - rcInformer.Informer().GetIndexer().Add(rc) - - // First pod becomes ready 20s ago - moment := metav1.Time{Time: time.Now().Add(-2e10)} - pod := newPod("pod", rc, v1.PodRunning, &moment, true) - podInformer.Informer().GetIndexer().Add(pod) - - // Second pod becomes ready now - otherMoment := metav1.Now() - otherPod := newPod("otherPod", rc, v1.PodRunning, &otherMoment, true) - podInformer.Informer().GetIndexer().Add(otherPod) - - // This response body is just so we don't err out decoding the http response - response := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.ReplicationController{}) - fakeHandler.ResponseBody = response - - fakePodControl := controller.FakePodControl{} - manager.podControl = &fakePodControl - - // The controller should see only one available pod. - manager.syncReplicationController(getKey(rc, t)) - - rc.Status = v1.ReplicationControllerStatus{Replicas: 2, ReadyReplicas: 2, AvailableReplicas: 1, ObservedGeneration: 1} - - decRc := runtime.EncodeOrDie(testapi.Default.Codec(), rc) - fakeHandler.ValidateRequest(t, testapi.Default.ResourcePath(replicationControllerResourceName(), rc.Namespace, rc.Name)+"/status", "PUT", &decRc) - validateSyncReplication(t, &fakePodControl, 0, 0, 0) -} - -var ( - imagePullBackOff v1.ReplicationControllerConditionType = "ImagePullBackOff" - - condImagePullBackOff = func() v1.ReplicationControllerCondition { - return v1.ReplicationControllerCondition{ - Type: imagePullBackOff, - Status: v1.ConditionTrue, - Reason: "NonExistentImage", - } - } - - condReplicaFailure = func() v1.ReplicationControllerCondition { - return v1.ReplicationControllerCondition{ - Type: v1.ReplicationControllerReplicaFailure, - Status: v1.ConditionTrue, - Reason: "OtherFailure", - } - } - - condReplicaFailure2 = func() v1.ReplicationControllerCondition { - return v1.ReplicationControllerCondition{ - Type: v1.ReplicationControllerReplicaFailure, - Status: v1.ConditionTrue, - Reason: "AnotherFailure", - } - } - - status = func() *v1.ReplicationControllerStatus { - return &v1.ReplicationControllerStatus{ - Conditions: []v1.ReplicationControllerCondition{condReplicaFailure()}, - } - } -) - -func TestGetCondition(t *testing.T) { - exampleStatus := status() - - tests := []struct { - name string - - status v1.ReplicationControllerStatus - condType v1.ReplicationControllerConditionType - condStatus v1.ConditionStatus - condReason string - - expected bool - }{ - { - name: "condition exists", - - status: *exampleStatus, - condType: v1.ReplicationControllerReplicaFailure, - - expected: true, - }, - { - name: "condition does not exist", - - status: *exampleStatus, - condType: imagePullBackOff, - - expected: false, - }, - } - - for _, test := range tests { - cond := GetCondition(test.status, test.condType) - exists := cond != nil - if exists != test.expected { - t.Errorf("%s: expected condition to exist: %t, got: %t", test.name, test.expected, exists) - } - } -} - -func TestSetCondition(t *testing.T) { - tests := []struct { - name string - - status *v1.ReplicationControllerStatus - cond v1.ReplicationControllerCondition - - expectedStatus *v1.ReplicationControllerStatus - }{ - { - name: "set for the first time", - - status: &v1.ReplicationControllerStatus{}, - cond: condReplicaFailure(), - - expectedStatus: &v1.ReplicationControllerStatus{Conditions: []v1.ReplicationControllerCondition{condReplicaFailure()}}, - }, - { - name: "simple set", - - status: &v1.ReplicationControllerStatus{Conditions: []v1.ReplicationControllerCondition{condImagePullBackOff()}}, - cond: condReplicaFailure(), - - expectedStatus: &v1.ReplicationControllerStatus{Conditions: []v1.ReplicationControllerCondition{condImagePullBackOff(), condReplicaFailure()}}, - }, - { - name: "overwrite", - - status: &v1.ReplicationControllerStatus{Conditions: []v1.ReplicationControllerCondition{condReplicaFailure()}}, - cond: condReplicaFailure2(), - - expectedStatus: &v1.ReplicationControllerStatus{Conditions: []v1.ReplicationControllerCondition{condReplicaFailure2()}}, - }, - } - - for _, test := range tests { - SetCondition(test.status, test.cond) - if !reflect.DeepEqual(test.status, test.expectedStatus) { - t.Errorf("%s: expected status: %v, got: %v", test.name, test.expectedStatus, test.status) - } - } -} - -func TestRemoveCondition(t *testing.T) { - tests := []struct { - name string - - status *v1.ReplicationControllerStatus - condType v1.ReplicationControllerConditionType - - expectedStatus *v1.ReplicationControllerStatus - }{ - { - name: "remove from empty status", - - status: &v1.ReplicationControllerStatus{}, - condType: v1.ReplicationControllerReplicaFailure, - - expectedStatus: &v1.ReplicationControllerStatus{}, - }, - { - name: "simple remove", - - status: &v1.ReplicationControllerStatus{Conditions: []v1.ReplicationControllerCondition{condReplicaFailure()}}, - condType: v1.ReplicationControllerReplicaFailure, - - expectedStatus: &v1.ReplicationControllerStatus{}, - }, - { - name: "doesn't remove anything", - - status: status(), - condType: imagePullBackOff, - - expectedStatus: status(), - }, - } - - for _, test := range tests { - RemoveCondition(test.status, test.condType) - if !reflect.DeepEqual(test.status, test.expectedStatus) { - t.Errorf("%s: expected status: %v, got: %v", test.name, test.expectedStatus, test.status) - } - } -} diff --git a/pkg/controller/replication/replication_controller_utils.go b/pkg/controller/replication/replication_controller_utils.go index 625317becb3..506074b83e4 100644 --- a/pkg/controller/replication/replication_controller_utils.go +++ b/pkg/controller/replication/replication_controller_utils.go @@ -19,123 +19,10 @@ limitations under the License. package replication import ( - "fmt" - "reflect" - - "github.com/golang/glog" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - v1core "k8s.io/client-go/kubernetes/typed/core/v1" - podutil "k8s.io/kubernetes/pkg/api/v1/pod" ) -// updateReplicationControllerStatus attempts to update the Status.Replicas of the given controller, with a single GET/PUT retry. -func updateReplicationControllerStatus(c v1core.ReplicationControllerInterface, rc v1.ReplicationController, newStatus v1.ReplicationControllerStatus) (*v1.ReplicationController, error) { - // This is the steady state. It happens when the rc doesn't have any expectations, since - // we do a periodic relist every 30s. If the generations differ but the replicas are - // the same, a caller might've resized to the same replica count. - if rc.Status.Replicas == newStatus.Replicas && - rc.Status.FullyLabeledReplicas == newStatus.FullyLabeledReplicas && - rc.Status.ReadyReplicas == newStatus.ReadyReplicas && - rc.Status.AvailableReplicas == newStatus.AvailableReplicas && - rc.Generation == rc.Status.ObservedGeneration && - reflect.DeepEqual(rc.Status.Conditions, newStatus.Conditions) { - return &rc, nil - } - // Save the generation number we acted on, otherwise we might wrongfully indicate - // that we've seen a spec update when we retry. - // TODO: This can clobber an update if we allow multiple agents to write to the - // same status. - newStatus.ObservedGeneration = rc.Generation - - var getErr, updateErr error - var updatedRC *v1.ReplicationController - for i, rc := 0, &rc; ; i++ { - glog.V(4).Infof(fmt.Sprintf("Updating status for rc: %s/%s, ", rc.Namespace, rc.Name) + - fmt.Sprintf("replicas %d->%d (need %d), ", rc.Status.Replicas, newStatus.Replicas, *(rc.Spec.Replicas)) + - fmt.Sprintf("fullyLabeledReplicas %d->%d, ", rc.Status.FullyLabeledReplicas, newStatus.FullyLabeledReplicas) + - fmt.Sprintf("readyReplicas %d->%d, ", rc.Status.ReadyReplicas, newStatus.ReadyReplicas) + - fmt.Sprintf("availableReplicas %d->%d, ", rc.Status.AvailableReplicas, newStatus.AvailableReplicas) + - fmt.Sprintf("sequence No: %v->%v", rc.Status.ObservedGeneration, newStatus.ObservedGeneration)) - - rc.Status = newStatus - updatedRC, updateErr = c.UpdateStatus(rc) - if updateErr == nil { - return updatedRC, nil - } - // Stop retrying if we exceed statusUpdateRetries - the replicationController will be requeued with a rate limit. - if i >= statusUpdateRetries { - break - } - // Update the controller with the latest resource version for the next poll - if rc, getErr = c.Get(rc.Name, metav1.GetOptions{}); getErr != nil { - // If the GET fails we can't trust status.Replicas anymore. This error - // is bound to be more interesting than the update failure. - return nil, getErr - } - } - - return nil, updateErr -} - -// OverlappingControllers sorts a list of controllers by creation timestamp, using their names as a tie breaker. -type OverlappingControllers []*v1.ReplicationController - -func (o OverlappingControllers) Len() int { return len(o) } -func (o OverlappingControllers) Swap(i, j int) { o[i], o[j] = o[j], o[i] } - -func (o OverlappingControllers) Less(i, j int) bool { - if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) { - return o[i].Name < o[j].Name - } - return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp) -} - -func calculateStatus(rc *v1.ReplicationController, filteredPods []*v1.Pod, manageReplicasErr error) v1.ReplicationControllerStatus { - newStatus := rc.Status - // Count the number of pods that have labels matching the labels of the pod - // template of the replication controller, the matching pods may have more - // labels than are in the template. Because the label of podTemplateSpec is - // a superset of the selector of the replication controller, so the possible - // matching pods must be part of the filteredPods. - fullyLabeledReplicasCount := 0 - readyReplicasCount := 0 - availableReplicasCount := 0 - templateLabel := labels.Set(rc.Spec.Template.Labels).AsSelectorPreValidated() - for _, pod := range filteredPods { - if templateLabel.Matches(labels.Set(pod.Labels)) { - fullyLabeledReplicasCount++ - } - if podutil.IsPodReady(pod) { - readyReplicasCount++ - if podutil.IsPodAvailable(pod, rc.Spec.MinReadySeconds, metav1.Now()) { - availableReplicasCount++ - } - } - } - - failureCond := GetCondition(rc.Status, v1.ReplicationControllerReplicaFailure) - if manageReplicasErr != nil && failureCond == nil { - var reason string - if diff := len(filteredPods) - int(*(rc.Spec.Replicas)); diff < 0 { - reason = "FailedCreate" - } else if diff > 0 { - reason = "FailedDelete" - } - cond := NewReplicationControllerCondition(v1.ReplicationControllerReplicaFailure, v1.ConditionTrue, reason, manageReplicasErr.Error()) - SetCondition(&newStatus, cond) - } else if manageReplicasErr == nil && failureCond != nil { - RemoveCondition(&newStatus, v1.ReplicationControllerReplicaFailure) - } - - newStatus.Replicas = int32(len(filteredPods)) - newStatus.FullyLabeledReplicas = int32(fullyLabeledReplicasCount) - newStatus.ReadyReplicas = int32(readyReplicasCount) - newStatus.AvailableReplicas = int32(availableReplicasCount) - return newStatus -} - // NewReplicationControllerCondition creates a new replication controller condition. func NewReplicationControllerCondition(condType v1.ReplicationControllerConditionType, status v1.ConditionStatus, reason, msg string) v1.ReplicationControllerCondition { return v1.ReplicationControllerCondition{ diff --git a/pkg/controller/replication/replication_controller_utils_test.go b/pkg/controller/replication/replication_controller_utils_test.go new file mode 100644 index 00000000000..0acaf0b0b20 --- /dev/null +++ b/pkg/controller/replication/replication_controller_utils_test.go @@ -0,0 +1,184 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package replication + +import ( + "reflect" + "testing" + + "k8s.io/api/core/v1" +) + +var ( + imagePullBackOff v1.ReplicationControllerConditionType = "ImagePullBackOff" + + condImagePullBackOff = func() v1.ReplicationControllerCondition { + return v1.ReplicationControllerCondition{ + Type: imagePullBackOff, + Status: v1.ConditionTrue, + Reason: "NonExistentImage", + } + } + + condReplicaFailure = func() v1.ReplicationControllerCondition { + return v1.ReplicationControllerCondition{ + Type: v1.ReplicationControllerReplicaFailure, + Status: v1.ConditionTrue, + Reason: "OtherFailure", + } + } + + condReplicaFailure2 = func() v1.ReplicationControllerCondition { + return v1.ReplicationControllerCondition{ + Type: v1.ReplicationControllerReplicaFailure, + Status: v1.ConditionTrue, + Reason: "AnotherFailure", + } + } + + status = func() *v1.ReplicationControllerStatus { + return &v1.ReplicationControllerStatus{ + Conditions: []v1.ReplicationControllerCondition{condReplicaFailure()}, + } + } +) + +func TestGetCondition(t *testing.T) { + exampleStatus := status() + + tests := []struct { + name string + + status v1.ReplicationControllerStatus + condType v1.ReplicationControllerConditionType + condStatus v1.ConditionStatus + condReason string + + expected bool + }{ + { + name: "condition exists", + + status: *exampleStatus, + condType: v1.ReplicationControllerReplicaFailure, + + expected: true, + }, + { + name: "condition does not exist", + + status: *exampleStatus, + condType: imagePullBackOff, + + expected: false, + }, + } + + for _, test := range tests { + cond := GetCondition(test.status, test.condType) + exists := cond != nil + if exists != test.expected { + t.Errorf("%s: expected condition to exist: %t, got: %t", test.name, test.expected, exists) + } + } +} + +func TestSetCondition(t *testing.T) { + tests := []struct { + name string + + status *v1.ReplicationControllerStatus + cond v1.ReplicationControllerCondition + + expectedStatus *v1.ReplicationControllerStatus + }{ + { + name: "set for the first time", + + status: &v1.ReplicationControllerStatus{}, + cond: condReplicaFailure(), + + expectedStatus: &v1.ReplicationControllerStatus{Conditions: []v1.ReplicationControllerCondition{condReplicaFailure()}}, + }, + { + name: "simple set", + + status: &v1.ReplicationControllerStatus{Conditions: []v1.ReplicationControllerCondition{condImagePullBackOff()}}, + cond: condReplicaFailure(), + + expectedStatus: &v1.ReplicationControllerStatus{Conditions: []v1.ReplicationControllerCondition{condImagePullBackOff(), condReplicaFailure()}}, + }, + { + name: "overwrite", + + status: &v1.ReplicationControllerStatus{Conditions: []v1.ReplicationControllerCondition{condReplicaFailure()}}, + cond: condReplicaFailure2(), + + expectedStatus: &v1.ReplicationControllerStatus{Conditions: []v1.ReplicationControllerCondition{condReplicaFailure2()}}, + }, + } + + for _, test := range tests { + SetCondition(test.status, test.cond) + if !reflect.DeepEqual(test.status, test.expectedStatus) { + t.Errorf("%s: expected status: %v, got: %v", test.name, test.expectedStatus, test.status) + } + } +} + +func TestRemoveCondition(t *testing.T) { + tests := []struct { + name string + + status *v1.ReplicationControllerStatus + condType v1.ReplicationControllerConditionType + + expectedStatus *v1.ReplicationControllerStatus + }{ + { + name: "remove from empty status", + + status: &v1.ReplicationControllerStatus{}, + condType: v1.ReplicationControllerReplicaFailure, + + expectedStatus: &v1.ReplicationControllerStatus{}, + }, + { + name: "simple remove", + + status: &v1.ReplicationControllerStatus{Conditions: []v1.ReplicationControllerCondition{condReplicaFailure()}}, + condType: v1.ReplicationControllerReplicaFailure, + + expectedStatus: &v1.ReplicationControllerStatus{}, + }, + { + name: "doesn't remove anything", + + status: status(), + condType: imagePullBackOff, + + expectedStatus: status(), + }, + } + + for _, test := range tests { + RemoveCondition(test.status, test.condType) + if !reflect.DeepEqual(test.status, test.expectedStatus) { + t.Errorf("%s: expected status: %v, got: %v", test.name, test.expectedStatus, test.status) + } + } +}