From ca13b9e532149b61fc5d08d7bc05902e499a071c Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Thu, 23 Feb 2017 08:58:28 -0800 Subject: [PATCH] RC/RS: Use ControllerRef to route watch events. This is part of the completion of ControllerRef, as described here: https://github.com/kubernetes/community/blob/master/contributors/design-proposals/controller-ref.md#watches This also removes the need for the Pod->Controller mapping cache in RC and RS. This mapping is now persisted in the Pod's ControllerRef instead. --- cmd/kube-controller-manager/app/core.go | 1 - cmd/kube-controller-manager/app/extensions.go | 1 - .../app/options/options.go | 4 +- pkg/controller/replicaset/BUILD | 1 - pkg/controller/replicaset/replica_set.go | 233 +++++++--------- pkg/controller/replicaset/replica_set_test.go | 192 +++++++++---- pkg/controller/replication/BUILD | 1 - .../replication/replication_controller.go | 259 ++++++++---------- .../replication_controller_test.go | 203 +++++++++----- test/integration/framework/master_utils.go | 2 +- test/integration/quota/quota_test.go | 2 - .../integration/replicaset/replicaset_test.go | 1 - .../replicationcontroller_test.go | 2 +- 13 files changed, 486 insertions(+), 416 deletions(-) diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index cab11f6d47b..8de3eb9fae0 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -59,7 +59,6 @@ func startReplicationController(ctx ControllerContext) (bool, error) { ctx.InformerFactory.Core().V1().ReplicationControllers(), ctx.ClientBuilder.ClientOrDie("replication-controller"), replicationcontroller.BurstReplicas, - int(ctx.Options.LookupCacheSizeForRC), ).Run(int(ctx.Options.ConcurrentRCSyncs), ctx.Stop) return true, nil } diff --git a/cmd/kube-controller-manager/app/extensions.go b/cmd/kube-controller-manager/app/extensions.go index c4ea15401f3..f349ecbe212 100644 --- a/cmd/kube-controller-manager/app/extensions.go +++ b/cmd/kube-controller-manager/app/extensions.go @@ -63,7 +63,6 @@ func startReplicaSetController(ctx ControllerContext) (bool, error) { ctx.InformerFactory.Core().V1().Pods(), ctx.ClientBuilder.ClientOrDie("replicaset-controller"), replicaset.BurstReplicas, - int(ctx.Options.LookupCacheSizeForRS), ).Run(int(ctx.Options.ConcurrentRSSyncs), ctx.Stop) return true, nil } diff --git a/cmd/kube-controller-manager/app/options/options.go b/cmd/kube-controller-manager/app/options/options.go index bc7f8e9fde6..196ba2742bd 100644 --- a/cmd/kube-controller-manager/app/options/options.go +++ b/cmd/kube-controller-manager/app/options/options.go @@ -132,8 +132,8 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet, allControllers []string, disabled fs.Int32Var(&s.ConcurrentDeploymentSyncs, "concurrent-deployment-syncs", s.ConcurrentDeploymentSyncs, "The number of deployment objects that are allowed to sync concurrently. Larger number = more responsive deployments, but more CPU (and network) load") fs.Int32Var(&s.ConcurrentNamespaceSyncs, "concurrent-namespace-syncs", s.ConcurrentNamespaceSyncs, "The number of namespace objects that are allowed to sync concurrently. Larger number = more responsive namespace termination, but more CPU (and network) load") fs.Int32Var(&s.ConcurrentSATokenSyncs, "concurrent-serviceaccount-token-syncs", s.ConcurrentSATokenSyncs, "The number of service account token objects that are allowed to sync concurrently. 
Larger number = more responsive token generation, but more CPU (and network) load") - fs.Int32Var(&s.LookupCacheSizeForRC, "replication-controller-lookup-cache-size", s.LookupCacheSizeForRC, "The the size of lookup cache for replication controllers. Larger number = more responsive replica management, but more MEM load.") - fs.Int32Var(&s.LookupCacheSizeForRS, "replicaset-lookup-cache-size", s.LookupCacheSizeForRS, "The the size of lookup cache for replicatsets. Larger number = more responsive replica management, but more MEM load.") + fs.Int32Var(&s.LookupCacheSizeForRC, "replication-controller-lookup-cache-size", s.LookupCacheSizeForRC, "This flag is deprecated and will be removed in future releases. ReplicationController no longer requires a lookup cache.") + fs.Int32Var(&s.LookupCacheSizeForRS, "replicaset-lookup-cache-size", s.LookupCacheSizeForRS, "This flag is deprecated and will be removed in future releases. ReplicaSet no longer requires a lookup cache.") fs.Int32Var(&s.LookupCacheSizeForDaemonSet, "daemonset-lookup-cache-size", s.LookupCacheSizeForDaemonSet, "The the size of lookup cache for daemonsets. Larger number = more responsive daemonsets, but more MEM load.") fs.DurationVar(&s.ServiceSyncPeriod.Duration, "service-sync-period", s.ServiceSyncPeriod.Duration, "The period for syncing services with their external load balancers") fs.DurationVar(&s.NodeSyncPeriod.Duration, "node-sync-period", 0, ""+ diff --git a/pkg/controller/replicaset/BUILD b/pkg/controller/replicaset/BUILD index 0a93111b2e5..57e78d44f4f 100644 --- a/pkg/controller/replicaset/BUILD +++ b/pkg/controller/replicaset/BUILD @@ -32,7 +32,6 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/api/errors", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/labels", - "//vendor:k8s.io/apimachinery/pkg/runtime/schema", "//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/client-go/kubernetes/typed/core/v1", diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index e165a9e5c56..d7b8f802838 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -29,7 +29,6 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" v1core "k8s.io/client-go/kubernetes/typed/core/v1" @@ -59,9 +58,8 @@ const ( statusUpdateRetries = 1 ) -func getRSKind() schema.GroupVersionKind { - return v1beta1.SchemeGroupVersion.WithKind("ReplicaSet") -} +// controllerKind contains the schema.GroupVersionKind for this controller type. +var controllerKind = v1beta1.SchemeGroupVersion.WithKind("ReplicaSet") // ReplicaSetController is responsible for synchronizing ReplicaSet objects stored // in the system with actual running pods. @@ -90,14 +88,12 @@ type ReplicaSetController struct { // Added as a member to the struct to allow injection for testing. 
podListerSynced cache.InformerSynced - lookupCache *controller.MatchingCache - // Controllers that need to be synced queue workqueue.RateLimitingInterface } // NewReplicaSetController configures a replica set controller with the specified event recorder -func NewReplicaSetController(rsInformer extensionsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int, lookupCacheSize int) *ReplicaSetController { +func NewReplicaSetController(rsInformer extensionsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController { if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil { metrics.RegisterMetricAndTrackRateLimiterUsage("replicaset_controller", kubeClient.Core().RESTClient().GetRateLimiter()) } @@ -139,7 +135,6 @@ func NewReplicaSetController(rsInformer extensionsinformers.ReplicaSetInformer, rsc.podListerSynced = podInformer.Informer().HasSynced rsc.syncHandler = rsc.syncReplicaSet - rsc.lookupCache = controller.NewMatchingCache(lookupCacheSize) return rsc } @@ -172,46 +167,19 @@ func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) { glog.Infof("Shutting down ReplicaSet Controller") } -// getPodReplicaSet returns the replica set managing the given pod. -// TODO: Surface that we are ignoring multiple replica sets for a single pod. -// TODO: use ownerReference.Controller to determine if the rs controls the pod. -func (rsc *ReplicaSetController) getPodReplicaSet(pod *v1.Pod) *extensions.ReplicaSet { - // look up in the cache, if cached and the cache is valid, just return cached value - if obj, cached := rsc.lookupCache.GetMatchingObject(pod); cached { - rs, ok := obj.(*extensions.ReplicaSet) - if !ok { - // This should not happen - utilruntime.HandleError(fmt.Errorf("lookup cache does not return a ReplicaSet object")) - return nil - } - if cached && rsc.isCacheValid(pod, rs) { - return rs - } - } - - // if not cached or cached value is invalid, search all the rs to find the matching one, and update cache +// getPodReplicaSets returns a list of ReplicaSets matching the given pod. +func (rsc *ReplicaSetController) getPodReplicaSets(pod *v1.Pod) []*extensions.ReplicaSet { rss, err := rsc.rsLister.GetPodReplicaSets(pod) if err != nil { glog.V(4).Infof("No ReplicaSets found for pod %v, ReplicaSet controller will avoid syncing", pod.Name) return nil } - // In theory, overlapping ReplicaSets is user error. This sorting will not prevent - // oscillation of replicas in all cases, eg: - // rs1 (older rs): [(k1=v1)], replicas=1 rs2: [(k2=v2)], replicas=2 - // pod: [(k1:v1), (k2:v2)] will wake both rs1 and rs2, and we will sync rs1. - // pod: [(k2:v2)] will wake rs2 which creates a new replica. if len(rss) > 1 { - // More than two items in this list indicates user error. If two replicasets - // overlap, sort by creation timestamp, subsort by name, then pick - // the first. + // ControllerRef will ensure we don't do anything crazy, but more than one + // item in this list nevertheless constitutes user error. utilruntime.HandleError(fmt.Errorf("user error! 
more than one ReplicaSet is selecting pods with labels: %+v", pod.Labels)) - sort.Sort(controller.ReplicaSetsByCreationTimestamp(rss)) } - - // update lookup cache - rsc.lookupCache.Update(pod, rss[0]) - - return rss[0] + return rss } // callback when RS is updated @@ -219,21 +187,6 @@ func (rsc *ReplicaSetController) updateRS(old, cur interface{}) { oldRS := old.(*extensions.ReplicaSet) curRS := cur.(*extensions.ReplicaSet) - // We should invalidate the whole lookup cache if a RS's selector has been updated. - // - // Imagine that you have two RSs: - // * old RS1 - // * new RS2 - // You also have a pod that is attached to RS2 (because it doesn't match RS1 selector). - // Now imagine that you are changing RS1 selector so that it is now matching that pod, - // in such case we must invalidate the whole cache so that pod could be adopted by RS1 - // - // This makes the lookup cache less helpful, but selector update does not happen often, - // so it's not a big problem - if !reflect.DeepEqual(oldRS.Spec.Selector, curRS.Spec.Selector) { - rsc.lookupCache.InvalidateAll() - } - // You might imagine that we only really need to enqueue the // replica set when Spec changes, but it is safer to sync any // time this function is triggered. That way a full informer @@ -252,57 +205,44 @@ func (rsc *ReplicaSetController) updateRS(old, cur interface{}) { rsc.enqueueReplicaSet(cur) } -// isCacheValid check if the cache is valid -func (rsc *ReplicaSetController) isCacheValid(pod *v1.Pod, cachedRS *extensions.ReplicaSet) bool { - _, err := rsc.rsLister.ReplicaSets(cachedRS.Namespace).Get(cachedRS.Name) - // rs has been deleted or updated, cache is invalid - if err != nil || !isReplicaSetMatch(pod, cachedRS) { - return false - } - return true -} - -// isReplicaSetMatch take a Pod and ReplicaSet, return whether the Pod and ReplicaSet are matching -// TODO(mqliang): This logic is a copy from GetPodReplicaSets(), remove the duplication -func isReplicaSetMatch(pod *v1.Pod, rs *extensions.ReplicaSet) bool { - if rs.Namespace != pod.Namespace { - return false - } - selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector) - if err != nil { - err = fmt.Errorf("invalid selector: %v", err) - return false - } - - // If a ReplicaSet with a nil or empty selector creeps in, it should match nothing, not everything. - if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) { - return false - } - return true -} - -// When a pod is created, enqueue the replica set that manages it and update it's expectations. +// When a pod is created, enqueue the replica set that manages it and update its expectations. func (rsc *ReplicaSetController) addPod(obj interface{}) { pod := obj.(*v1.Pod) glog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod) - rs := rsc.getPodReplicaSet(pod) - if rs == nil { - return - } - rsKey, err := controller.KeyFunc(rs) - if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for replica set %#v: %v", rs, err)) - return - } if pod.DeletionTimestamp != nil { // on a restart of the controller manager, it's possible a new pod shows up in a state that // is already pending deletion. Prevent the pod from being a creation observation. rsc.deletePod(pod) return } - rsc.expectations.CreationObserved(rsKey) - rsc.enqueueReplicaSet(rs) + + // If it has a ControllerRef, that's all that matters. + if controllerRef := controller.GetControllerOf(pod); controllerRef != nil { + if controllerRef.Kind != controllerKind.Kind { + // It's controlled by a different type of controller. 
+ return + } + rs, err := rsc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name) + if err != nil { + return + } + rsKey, err := controller.KeyFunc(rs) + if err != nil { + return + } + rsc.expectations.CreationObserved(rsKey) + rsc.enqueueReplicaSet(rs) + return + } + + // Otherwise, it's an orphan. Get a list of all matching ReplicaSets and sync + // them to see if anyone wants to adopt it. + // DO NOT observe creation because no controller should be waiting for an + // orphan. + for _, rs := range rsc.getPodReplicaSets(pod) { + rsc.enqueueReplicaSet(rs) + } } // When a pod is updated, figure out what replica set/s manage it and wake them @@ -317,6 +257,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) { return } glog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) + labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels) if curPod.DeletionTimestamp != nil { // when a pod is deleted gracefully it's deletion timestamp is first modified to reflect a grace period, @@ -332,18 +273,29 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) { return } - // Enqueue the oldRC before the curRC to give curRC a chance to adopt the oldPod. - if labelChanged { - // If the old and new ReplicaSet are the same, the first one that syncs - // will set expectations preventing any damage from the second. - if oldRS := rsc.getPodReplicaSet(oldPod); oldRS != nil { - rsc.enqueueReplicaSet(oldRS) + curControllerRef := controller.GetControllerOf(curPod) + oldControllerRef := controller.GetControllerOf(oldPod) + controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef) + if controllerRefChanged && + oldControllerRef != nil && oldControllerRef.Kind == controllerKind.Kind { + // The ControllerRef was changed. Sync the old controller, if any. + rs, err := rsc.rsLister.ReplicaSets(oldPod.Namespace).Get(oldControllerRef.Name) + if err == nil { + rsc.enqueueReplicaSet(rs) } } - changedToReady := !v1.IsPodReady(oldPod) && v1.IsPodReady(curPod) - if curRS := rsc.getPodReplicaSet(curPod); curRS != nil { - rsc.enqueueReplicaSet(curRS) + // If it has a ControllerRef, that's all that matters. + if curControllerRef != nil { + if curControllerRef.Kind != controllerKind.Kind { + // It's controlled by a different type of controller. + return + } + rs, err := rsc.rsLister.ReplicaSets(curPod.Namespace).Get(curControllerRef.Name) + if err != nil { + return + } + rsc.enqueueReplicaSet(rs) // TODO: MinReadySeconds in the Pod will generate an Available condition to be added in // the Pod status which in turn will trigger a requeue of the owning replica set thus // having its status updated with the newly available replica. For now, we can fake the @@ -351,9 +303,18 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) { // a Pod transitioned to Ready. // Note that this still suffers from #29229, we are just moving the problem one level // "closer" to kubelet (from the deployment to the replica set controller). 
- if changedToReady && curRS.Spec.MinReadySeconds > 0 { - glog.V(2).Infof("ReplicaSet %q will be enqueued after %ds for availability check", curRS.Name, curRS.Spec.MinReadySeconds) - rsc.enqueueReplicaSetAfter(curRS, time.Duration(curRS.Spec.MinReadySeconds)*time.Second) + if !v1.IsPodReady(oldPod) && v1.IsPodReady(curPod) && rs.Spec.MinReadySeconds > 0 { + glog.V(2).Infof("ReplicaSet %q will be enqueued after %ds for availability check", rs.Name, rs.Spec.MinReadySeconds) + rsc.enqueueReplicaSetAfter(rs, time.Duration(rs.Spec.MinReadySeconds)*time.Second) + } + return + } + + // Otherwise, it's an orphan. If anything changed, sync matching controllers + // to see if anyone wants to adopt it now. + if labelChanged || controllerRefChanged { + for _, rs := range rsc.getPodReplicaSets(curPod) { + rsc.enqueueReplicaSet(rs) } } } @@ -370,41 +331,46 @@ func (rsc *ReplicaSetController) deletePod(obj interface{}) { if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %+v", obj)) + utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj)) return } pod, ok = tombstone.Obj.(*v1.Pod) if !ok { - utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v", obj)) + utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj)) return } } glog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %#v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod) - if rs := rsc.getPodReplicaSet(pod); rs != nil { - rsKey, err := controller.KeyFunc(rs) - if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for ReplicaSet %#v: %v", rs, err)) - return - } - rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod)) - rsc.enqueueReplicaSet(rs) + + controllerRef := controller.GetControllerOf(pod) + if controllerRef == nil { + // No controller should care about orphans being deleted. + return } + if controllerRef.Kind != controllerKind.Kind { + // It's controlled by a different type of controller. + return + } + + rs, err := rsc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name) + if err != nil { + return + } + rsKey, err := controller.KeyFunc(rs) + if err != nil { + return + } + rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod)) + rsc.enqueueReplicaSet(rs) } // obj could be an *extensions.ReplicaSet, or a DeletionFinalStateUnknown marker item. func (rsc *ReplicaSetController) enqueueReplicaSet(obj interface{}) { key, err := controller.KeyFunc(obj) if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) + utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err)) return } - - // TODO: Handle overlapping replica sets better. Either disallow them at admission time or - // deterministically avoid syncing replica sets that fight over pods. Currently, we only - // ensure that the same replica set is synced for a given pod. When we periodically relist - // all replica sets there will still be some replica instability. One way to handle this is - // by querying the store for all replica sets that this replica set overlaps, as well as all - // replica sets that overlap this ReplicaSet, and sorting them. 
rsc.queue.Add(key) } @@ -412,16 +378,9 @@ func (rsc *ReplicaSetController) enqueueReplicaSet(obj interface{}) { func (rsc *ReplicaSetController) enqueueReplicaSetAfter(obj interface{}, after time.Duration) { key, err := controller.KeyFunc(obj) if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) + utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err)) return } - - // TODO: Handle overlapping replica sets better. Either disallow them at admission time or - // deterministically avoid syncing replica sets that fight over pods. Currently, we only - // ensure that the same replica set is synced for a given pod. When we periodically relist - // all replica sets there will still be some replica instability. One way to handle this is - // by querying the store for all replica sets that this replica set overlaps, as well as all - // replica sets that overlap this ReplicaSet, and sorting them. rsc.queue.AddAfter(key, after) } @@ -483,8 +442,8 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *exte var err error boolPtr := func(b bool) *bool { return &b } controllerRef := &metav1.OwnerReference{ - APIVersion: getRSKind().GroupVersion().String(), - Kind: getRSKind().Kind, + APIVersion: controllerKind.GroupVersion().String(), + Kind: controllerKind.Kind, Name: rs.Name, UID: rs.UID, BlockOwnerDeletion: boolPtr(true), @@ -592,7 +551,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error { if err != nil { return err } - cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, getRSKind()) + cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, controllerKind) filteredPods, err = cm.ClaimPods(pods) if err != nil { // Something went wrong with adoption or release. 
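The addPod, updatePod, and deletePod handlers above all route a watch event the same way: read the Pod's ControllerRef, ignore Pods owned by a different kind of controller, and fetch the owner by name from the lister rather than matching the Pod's labels against every ReplicaSet. A minimal sketch of that shared pattern follows. It is not part of the patch: the helper name resolveControllingRS is hypothetical, and it is written as if it lived in the replicaset package so it can reuse that file's existing imports (v1, extensions, controller) and the controllerKind variable introduced above.

// resolveControllingRS returns the ReplicaSet named by the Pod's ControllerRef,
// or nil if the Pod is an orphan, is owned by another kind of controller, or
// the referenced ReplicaSet is not in the lister.
func (rsc *ReplicaSetController) resolveControllingRS(pod *v1.Pod) *extensions.ReplicaSet {
	controllerRef := controller.GetControllerOf(pod)
	if controllerRef == nil {
		// Orphan: callers fall back to getPodReplicaSets (label matching) so a
		// ReplicaSet that wants to adopt the Pod can be woken up and claim it.
		return nil
	}
	if controllerRef.Kind != controllerKind.Kind {
		// Owned by a different kind of controller; ignore the event.
		return nil
	}
	rs, err := rsc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name)
	if err != nil {
		// The named owner no longer exists, so there is nothing to enqueue.
		return nil
	}
	return rs
}

Because the owner is resolved directly from the persisted ControllerRef, the per-pod lookup cache (controller.MatchingCache) removed by this patch is no longer needed; label matching is now used only for orphans, to give matching ReplicaSets a chance to adopt them.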
diff --git a/pkg/controller/replicaset/replica_set_test.go b/pkg/controller/replicaset/replica_set_test.go index a7fbb5cb51a..80e3e672288 100644 --- a/pkg/controller/replicaset/replica_set_test.go +++ b/pkg/controller/replicaset/replica_set_test.go @@ -51,7 +51,7 @@ import ( "k8s.io/kubernetes/pkg/securitycontext" ) -func testNewReplicaSetControllerFromClient(client clientset.Interface, stopCh chan struct{}, burstReplicas int, lookupCacheSize int) (*ReplicaSetController, informers.SharedInformerFactory) { +func testNewReplicaSetControllerFromClient(client clientset.Interface, stopCh chan struct{}, burstReplicas int) (*ReplicaSetController, informers.SharedInformerFactory) { informers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) ret := NewReplicaSetController( @@ -59,7 +59,6 @@ func testNewReplicaSetControllerFromClient(client clientset.Interface, stopCh ch informers.Core().V1().Pods(), client, burstReplicas, - lookupCacheSize, ) ret.podListerSynced = alwaysReady @@ -216,7 +215,7 @@ func TestSyncReplicaSetDoesNothing(t *testing.T) { fakePodControl := controller.FakePodControl{} stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) // 2 running pods, a controller with 2 replicas, sync is a no-op labelMap := map[string]string{"foo": "bar"} @@ -234,7 +233,7 @@ func TestSyncReplicaSetDeletes(t *testing.T) { fakePodControl := controller.FakePodControl{} stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) manager.podControl = &fakePodControl // 2 running pods and a controller with 1 replica, one pod delete expected @@ -252,7 +251,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) { fakePodControl := controller.FakePodControl{} stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) manager.podControl = &fakePodControl received := make(chan string) @@ -286,7 +285,7 @@ func TestSyncReplicaSetCreates(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) // A controller with 2 replicas and no pods in the store, 2 creates expected labelMap := map[string]string{"foo": "bar"} @@ -311,7 +310,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) // Steady state for the ReplicaSet, no Status.Replicas updates expected activePods 
:= 5 @@ -356,7 +355,7 @@ func TestControllerUpdateReplicas(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) // Insufficient number of pods in the system, and Status.Replicas is wrong; // Status.Replica should update to match number of pods in system, 1 new pod should be created. @@ -405,7 +404,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) { fakePodControl := controller.FakePodControl{} stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) manager.podControl = &fakePodControl @@ -461,7 +460,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) { func TestPodControllerLookup(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}), stopCh, BurstReplicas, 0) + manager, informers := testNewReplicaSetControllerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}), stopCh, BurstReplicas) testCases := []struct { inRSs []*extensions.ReplicaSet pod *v1.Pod @@ -509,7 +508,12 @@ func TestPodControllerLookup(t *testing.T) { for _, r := range c.inRSs { informers.Extensions().V1beta1().ReplicaSets().Informer().GetIndexer().Add(r) } - if rs := manager.getPodReplicaSet(c.pod); rs != nil { + if rss := manager.getPodReplicaSets(c.pod); rss != nil { + if len(rss) != 1 { + t.Errorf("len(rss) = %v, want %v", len(rss), 1) + continue + } + rs := rss[0] if c.outRSName != rs.Name { t.Errorf("Got replica set %+v expected %+v", rs.Name, c.outRSName) } @@ -536,7 +540,6 @@ func TestWatchControllers(t *testing.T) { informers.Core().V1().Pods(), client, BurstReplicas, - 0, ) informers.Start(stopCh) @@ -581,7 +584,7 @@ func TestWatchPods(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) // Put one ReplicaSet into the shared informer labelMap := map[string]string{"foo": "bar"} @@ -627,7 +630,7 @@ func TestWatchPods(t *testing.T) { func TestUpdatePods(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(fake.NewSimpleClientset(), stopCh, BurstReplicas, 0) + manager, informers := testNewReplicaSetControllerFromClient(fake.NewSimpleClientset(), stopCh, BurstReplicas) received := make(chan string) @@ -656,16 +659,19 @@ func TestUpdatePods(t *testing.T) { testRSSpec2.Name = "barfoo" informers.Extensions().V1beta1().ReplicaSets().Informer().GetIndexer().Add(&testRSSpec2) - // case 1: We put in the podLister a pod with labels matching testRSSpec1, - // then update its labels to match testRSSpec2. 
We expect to receive a sync - // request for both replica sets. + isController := true + controllerRef1 := metav1.OwnerReference{UID: testRSSpec1.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: testRSSpec1.Name, Controller: &isController} + controllerRef2 := metav1.OwnerReference{UID: testRSSpec2.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: testRSSpec2.Name, Controller: &isController} + + // case 1: Pod with a ControllerRef pod1 := newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 1, v1.PodRunning, labelMap1, testRSSpec1, "pod").Items[0] + pod1.OwnerReferences = []metav1.OwnerReference{controllerRef1} pod1.ResourceVersion = "1" pod2 := pod1 pod2.Labels = labelMap2 pod2.ResourceVersion = "2" manager.updatePod(&pod1, &pod2) - expected := sets.NewString(testRSSpec1.Name, testRSSpec2.Name) + expected := sets.NewString(testRSSpec1.Name) for _, name := range expected.List() { t.Logf("Expecting update for %+v", name) select { @@ -674,17 +680,20 @@ func TestUpdatePods(t *testing.T) { t.Errorf("Expected keys %#v got %v", expected, got) } case <-time.After(wait.ForeverTestTimeout): - t.Errorf("Expected update notifications for replica sets within 100ms each") + t.Errorf("Expected update notifications for replica sets") } } - // case 2: pod1 in the podLister has labels matching testRSSpec1. We update - // its labels to match no replica set. We expect to receive a sync request - // for testRSSpec1. - pod2.Labels = make(map[string]string) + // case 2: Remove ControllerRef (orphan). Expect to sync label-matching RS. + pod1 = newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 1, v1.PodRunning, labelMap1, testRSSpec1, "pod").Items[0] + pod1.ResourceVersion = "1" + pod1.Labels = labelMap2 + pod1.OwnerReferences = []metav1.OwnerReference{controllerRef2} + pod2 = pod1 + pod2.OwnerReferences = nil pod2.ResourceVersion = "2" manager.updatePod(&pod1, &pod2) - expected = sets.NewString(testRSSpec1.Name) + expected = sets.NewString(testRSSpec2.Name) for _, name := range expected.List() { t.Logf("Expecting update for %+v", name) select { @@ -693,7 +702,52 @@ func TestUpdatePods(t *testing.T) { t.Errorf("Expected keys %#v got %v", expected, got) } case <-time.After(wait.ForeverTestTimeout): - t.Errorf("Expected update notifications for replica sets within 100ms each") + t.Errorf("Expected update notifications for replica sets") + } + } + + // case 2: Remove ControllerRef (orphan). Expect to sync both former owner and + // any label-matching RS. + pod1 = newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 1, v1.PodRunning, labelMap1, testRSSpec1, "pod").Items[0] + pod1.ResourceVersion = "1" + pod1.Labels = labelMap2 + pod1.OwnerReferences = []metav1.OwnerReference{controllerRef1} + pod2 = pod1 + pod2.OwnerReferences = nil + pod2.ResourceVersion = "2" + manager.updatePod(&pod1, &pod2) + expected = sets.NewString(testRSSpec1.Name, testRSSpec2.Name) + for _, name := range expected.List() { + t.Logf("Expecting update for %+v", name) + select { + case got := <-received: + if !expected.Has(got) { + t.Errorf("Expected keys %#v got %v", expected, got) + } + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("Expected update notifications for replica sets") + } + } + + // case 4: Keep ControllerRef, change labels. Expect to sync owning RS. 
+ pod1 = newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 1, v1.PodRunning, labelMap1, testRSSpec1, "pod").Items[0] + pod1.ResourceVersion = "1" + pod1.Labels = labelMap1 + pod1.OwnerReferences = []metav1.OwnerReference{controllerRef2} + pod2 = pod1 + pod2.Labels = labelMap2 + pod2.ResourceVersion = "2" + manager.updatePod(&pod1, &pod2) + expected = sets.NewString(testRSSpec2.Name) + for _, name := range expected.List() { + t.Logf("Expecting update for %+v", name) + select { + case got := <-received: + if !expected.Has(got) { + t.Errorf("Expected keys %#v got %v", expected, got) + } + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("Expected update notifications for replica sets") } } } @@ -711,7 +765,7 @@ func TestControllerUpdateRequeue(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(1, labelMap) @@ -782,7 +836,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) fakePodControl := controller.FakePodControl{} stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, burstReplicas, 0) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, burstReplicas) manager.podControl = &fakePodControl labelMap := map[string]string{"foo": "bar"} @@ -845,6 +899,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) // the rs is waiting for. 
expectedDels := manager.expectations.GetUIDs(getKey(rsSpec, t)) podsToDelete := []*v1.Pod{} + isController := true for _, key := range expectedDels.List() { nsName := strings.Split(key, "/") podsToDelete = append(podsToDelete, &v1.Pod{ @@ -852,6 +907,9 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) Name: nsName[1], Namespace: nsName[0], Labels: rsSpec.Spec.Selector.MatchLabels, + OwnerReferences: []metav1.OwnerReference{ + {UID: rsSpec.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: rsSpec.Name, Controller: &isController}, + }, }, }) } @@ -888,11 +946,15 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) t.Fatalf("Waiting on unexpected number of deletes.") } nsName := strings.Split(expectedDel.List()[0], "/") + isController := true lastPod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: nsName[1], Namespace: nsName[0], Labels: rsSpec.Spec.Selector.MatchLabels, + OwnerReferences: []metav1.OwnerReference{ + {UID: rsSpec.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: rsSpec.Name, Controller: &isController}, + }, }, } informers.Core().V1().Pods().Informer().GetIndexer().Delete(lastPod) @@ -935,7 +997,7 @@ func TestRSSyncExpectations(t *testing.T) { fakePodControl := controller.FakePodControl{} stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, 2, 0) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, 2) manager.podControl = &fakePodControl labelMap := map[string]string{"foo": "bar"} @@ -961,7 +1023,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, 10, 0) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, 10) rs := newReplicaSet(1, map[string]string{"foo": "bar"}) informers.Extensions().V1beta1().ReplicaSets().Informer().GetIndexer().Add(rs) @@ -1015,34 +1077,42 @@ func TestOverlappingRSs(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) labelMap := map[string]string{"foo": "bar"} - for i := 0; i < 5; i++ { - func() { - stopCh := make(chan struct{}) - defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, 10, 0) + stopCh := make(chan struct{}) + defer close(stopCh) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, 10) - // Create 10 ReplicaSets, shuffled them randomly and insert them into the ReplicaSet controller's store - var controllers []*extensions.ReplicaSet - for j := 1; j < 10; j++ { - rsSpec := newReplicaSet(1, labelMap) - rsSpec.CreationTimestamp = metav1.Date(2014, time.December, j, 0, 0, 0, 0, time.Local) - rsSpec.Name = string(uuid.NewUUID()) - controllers = append(controllers, rsSpec) - } - shuffledControllers := shuffle(controllers) - for j := range shuffledControllers { - informers.Extensions().V1beta1().ReplicaSets().Informer().GetIndexer().Add(shuffledControllers[j]) - } - // Add a pod and make sure only the oldest ReplicaSet is synced - pods := newPodList(nil, 1, v1.PodPending, labelMap, controllers[0], "pod") - rsKey := getKey(controllers[0], t) + // 
Create 10 ReplicaSets, shuffled them randomly and insert them into the + // ReplicaSet controller's store. + // All use the same CreationTimestamp since ControllerRef should be able + // to handle that. + timestamp := metav1.Date(2014, time.December, 0, 0, 0, 0, 0, time.Local) + var controllers []*extensions.ReplicaSet + for j := 1; j < 10; j++ { + rsSpec := newReplicaSet(1, labelMap) + rsSpec.CreationTimestamp = timestamp + rsSpec.Name = fmt.Sprintf("rs%d", j) + controllers = append(controllers, rsSpec) + } + shuffledControllers := shuffle(controllers) + for j := range shuffledControllers { + informers.Extensions().V1beta1().ReplicaSets().Informer().GetIndexer().Add(shuffledControllers[j]) + } + // Add a pod with a ControllerRef and make sure only the corresponding + // ReplicaSet is synced. Pick a RS in the middle since the old code used to + // sort by name if all timestamps were equal. + rs := controllers[3] + pods := newPodList(nil, 1, v1.PodPending, labelMap, rs, "pod") + pod := &pods.Items[0] + isController := true + pod.OwnerReferences = []metav1.OwnerReference{ + {UID: rs.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: rs.Name, Controller: &isController}, + } + rsKey := getKey(rs, t) - manager.addPod(&pods.Items[0]) - queueRS, _ := manager.queue.Get() - if queueRS != rsKey { - t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS) - } - }() + manager.addPod(pod) + queueRS, _ := manager.queue.Get() + if queueRS != rsKey { + t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS) } } @@ -1051,7 +1121,7 @@ func TestDeletionTimestamp(t *testing.T) { labelMap := map[string]string{"foo": "bar"} stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(c, stopCh, 10, 0) + manager, informers := testNewReplicaSetControllerFromClient(c, stopCh, 10) rs := newReplicaSet(1, labelMap) informers.Extensions().V1beta1().ReplicaSets().Informer().GetIndexer().Add(rs) @@ -1098,11 +1168,15 @@ func TestDeletionTimestamp(t *testing.T) { // An update to the pod (including an update to the deletion timestamp) // should not be counted as a second delete. + isController := true secondPod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Namespace: pod.Namespace, Name: "secondPod", Labels: pod.Labels, + OwnerReferences: []metav1.OwnerReference{ + {UID: rs.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: rs.Name, Controller: &isController}, + }, }, } manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(secondPod)}) @@ -1142,7 +1216,7 @@ func TestDeletionTimestamp(t *testing.T) { func setupManagerWithGCEnabled(stopCh chan struct{}, objs ...runtime.Object) (manager *ReplicaSetController, fakePodControl *controller.FakePodControl, informers informers.SharedInformerFactory) { c := fakeclientset.NewSimpleClientset(objs...) 
fakePodControl = &controller.FakePodControl{} - manager, informers = testNewReplicaSetControllerFromClient(c, stopCh, BurstReplicas, 0) + manager, informers = testNewReplicaSetControllerFromClient(c, stopCh, BurstReplicas) manager.podControl = fakePodControl return manager, fakePodControl, informers @@ -1372,7 +1446,7 @@ func TestReadyReplicas(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) // Status.Replica should update to match number of pods in system, 1 new pod should be created. labelMap := map[string]string{"foo": "bar"} @@ -1414,7 +1488,7 @@ func TestAvailableReplicas(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) // Status.Replica should update to match number of pods in system, 1 new pod should be created. labelMap := map[string]string{"foo": "bar"} diff --git a/pkg/controller/replication/BUILD b/pkg/controller/replication/BUILD index f0e4a0c8e02..9a920a39e6c 100644 --- a/pkg/controller/replication/BUILD +++ b/pkg/controller/replication/BUILD @@ -29,7 +29,6 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/api/errors", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/labels", - "//vendor:k8s.io/apimachinery/pkg/runtime/schema", "//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/apiserver/pkg/util/trace", diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index fba38f0df82..5c23a0b0974 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -29,7 +29,6 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" utiltrace "k8s.io/apiserver/pkg/util/trace" @@ -56,9 +55,8 @@ const ( statusUpdateRetries = 1 ) -func getRCKind() schema.GroupVersionKind { - return v1.SchemeGroupVersion.WithKind("ReplicationController") -} +// controllerKind contains the schema.GroupVersionKind for this controller type. +var controllerKind = v1.SchemeGroupVersion.WithKind("ReplicationController") // ReplicationManager is responsible for synchronizing ReplicationController objects stored // in the system with actual running pods. @@ -85,14 +83,12 @@ type ReplicationManager struct { // Added as a member to the struct to allow injection for testing. 
podListerSynced cache.InformerSynced - lookupCache *controller.MatchingCache - // Controllers that need to be synced queue workqueue.RateLimitingInterface } // NewReplicationManager configures a replication manager with the specified event recorder -func NewReplicationManager(podInformer coreinformers.PodInformer, rcInformer coreinformers.ReplicationControllerInformer, kubeClient clientset.Interface, burstReplicas int, lookupCacheSize int) *ReplicationManager { +func NewReplicationManager(podInformer coreinformers.PodInformer, rcInformer coreinformers.ReplicationControllerInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicationManager { if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil { metrics.RegisterMetricAndTrackRateLimiterUsage("replication_controller", kubeClient.Core().RESTClient().GetRateLimiter()) } @@ -135,7 +131,6 @@ func NewReplicationManager(podInformer coreinformers.PodInformer, rcInformer cor rm.podListerSynced = podInformer.Informer().HasSynced rm.syncHandler = rm.syncReplicationController - rm.lookupCache = controller.NewMatchingCache(lookupCacheSize) return rm } @@ -167,71 +162,19 @@ func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) { glog.Infof("Shutting down RC Manager") } -// getPodController returns the controller managing the given pod. -// TODO: Surface that we are ignoring multiple controllers for a single pod. -// TODO: use ownerReference.Controller to determine if the rc controls the pod. -func (rm *ReplicationManager) getPodController(pod *v1.Pod) *v1.ReplicationController { - // look up in the cache, if cached and the cache is valid, just return cached value - if obj, cached := rm.lookupCache.GetMatchingObject(pod); cached { - controller, ok := obj.(*v1.ReplicationController) - if !ok { - // This should not happen - utilruntime.HandleError(fmt.Errorf("lookup cache does not return a ReplicationController object")) - return nil - } - if cached && rm.isCacheValid(pod, controller) { - return controller - } - } - - // if not cached or cached value is invalid, search all the rc to find the matching one, and update cache - controllers, err := rm.rcLister.GetPodControllers(pod) +// getPodControllers returns a list of ReplicationControllers matching the given pod. +func (rm *ReplicationManager) getPodControllers(pod *v1.Pod) []*v1.ReplicationController { + rcs, err := rm.rcLister.GetPodControllers(pod) if err != nil { - glog.V(4).Infof("No controllers found for pod %v, replication manager will avoid syncing", pod.Name) + glog.V(4).Infof("No ReplicationControllers found for pod %v, controller will avoid syncing", pod.Name) return nil } - // In theory, overlapping controllers is user error. This sorting will not prevent - // oscillation of replicas in all cases, eg: - // rc1 (older rc): [(k1=v1)], replicas=1 rc2: [(k2=v2)], replicas=2 - // pod: [(k1:v1), (k2:v2)] will wake both rc1 and rc2, and we will sync rc1. - // pod: [(k2:v2)] will wake rc2 which creates a new replica. - if len(controllers) > 1 { - // More than two items in this list indicates user error. If two replication-controller - // overlap, sort by creation timestamp, subsort by name, then pick - // the first. - utilruntime.HandleError(fmt.Errorf("user error! 
more than one replication controller is selecting pods with labels: %+v", pod.Labels)) - sort.Sort(OverlappingControllers(controllers)) + if len(rcs) > 1 { + // ControllerRef will ensure we don't do anything crazy, but more than one + // item in this list nevertheless constitutes user error. + utilruntime.HandleError(fmt.Errorf("user error! more than one ReplicationController is selecting pods with labels: %+v", pod.Labels)) } - - // update lookup cache - rm.lookupCache.Update(pod, controllers[0]) - - return controllers[0] -} - -// isCacheValid check if the cache is valid -func (rm *ReplicationManager) isCacheValid(pod *v1.Pod, cachedRC *v1.ReplicationController) bool { - _, err := rm.rcLister.ReplicationControllers(cachedRC.Namespace).Get(cachedRC.Name) - // rc has been deleted or updated, cache is invalid - if err != nil || !isControllerMatch(pod, cachedRC) { - return false - } - return true -} - -// isControllerMatch take a Pod and ReplicationController, return whether the Pod and ReplicationController are matching -// TODO(mqliang): This logic is a copy from GetPodControllers(), remove the duplication -func isControllerMatch(pod *v1.Pod, rc *v1.ReplicationController) bool { - if rc.Namespace != pod.Namespace { - return false - } - selector := labels.Set(rc.Spec.Selector).AsSelectorPreValidated() - - // If an rc with a nil or empty selector creeps in, it should match nothing, not everything. - if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) { - return false - } - return true + return rcs } // callback when RC is updated @@ -239,20 +182,6 @@ func (rm *ReplicationManager) updateRC(old, cur interface{}) { oldRC := old.(*v1.ReplicationController) curRC := cur.(*v1.ReplicationController) - // We should invalidate the whole lookup cache if a RC's selector has been updated. - // - // Imagine that you have two RCs: - // * old RC1 - // * new RC2 - // You also have a pod that is attached to RC2 (because it doesn't match RC1 selector). - // Now imagine that you are changing RC1 selector so that it is now matching that pod, - // in such case, we must invalidate the whole cache so that pod could be adopted by RC1 - // - // This makes the lookup cache less helpful, but selector update does not happen often, - // so it's not a big problem - if !reflect.DeepEqual(oldRC.Spec.Selector, curRC.Spec.Selector) { - rm.lookupCache.InvalidateAll() - } // TODO: Remove when #31981 is resolved! glog.Infof("Observed updated replication controller %v. Desired pod count change: %d->%d", curRC.Name, *(oldRC.Spec.Replicas), *(curRC.Spec.Replicas)) @@ -275,19 +204,10 @@ func (rm *ReplicationManager) updateRC(old, cur interface{}) { rm.enqueueController(cur) } -// When a pod is created, enqueue the controller that manages it and update it's expectations. +// When a pod is created, enqueue the ReplicationController that manages it and update its expectations. 
func (rm *ReplicationManager) addPod(obj interface{}) { pod := obj.(*v1.Pod) - - rc := rm.getPodController(pod) - if rc == nil { - return - } - rcKey, err := controller.KeyFunc(rc) - if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for replication controller %#v: %v", rc, err)) - return - } + glog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod) if pod.DeletionTimestamp != nil { // on a restart of the controller manager, it's possible a new pod shows up in a state that @@ -295,13 +215,38 @@ func (rm *ReplicationManager) addPod(obj interface{}) { rm.deletePod(pod) return } - rm.expectations.CreationObserved(rcKey) - rm.enqueueController(rc) + + // If it has a ControllerRef, that's all that matters. + if controllerRef := controller.GetControllerOf(pod); controllerRef != nil { + if controllerRef.Kind != controllerKind.Kind { + // It's controlled by a different type of controller. + return + } + rc, err := rm.rcLister.ReplicationControllers(pod.Namespace).Get(controllerRef.Name) + if err != nil { + return + } + rsKey, err := controller.KeyFunc(rc) + if err != nil { + return + } + rm.expectations.CreationObserved(rsKey) + rm.enqueueController(rc) + return + } + + // Otherwise, it's an orphan. Get a list of all matching ReplicationControllers and sync + // them to see if anyone wants to adopt it. + // DO NOT observe creation because no controller should be waiting for an + // orphan. + for _, rc := range rm.getPodControllers(pod) { + rm.enqueueController(rc) + } } -// When a pod is updated, figure out what controller/s manage it and wake them +// When a pod is updated, figure out what ReplicationController/s manage it and wake them // up. If the labels of the pod have changed we need to awaken both the old -// and new controller. old and cur must be *v1.Pod types. +// and new ReplicationController. old and cur must be *v1.Pod types. func (rm *ReplicationManager) updatePod(old, cur interface{}) { curPod := cur.(*v1.Pod) oldPod := old.(*v1.Pod) @@ -311,6 +256,7 @@ func (rm *ReplicationManager) updatePod(old, cur interface{}) { return } glog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) + labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels) if curPod.DeletionTimestamp != nil { // when a pod is deleted gracefully it's deletion timestamp is first modified to reflect a grace period, @@ -326,34 +272,53 @@ func (rm *ReplicationManager) updatePod(old, cur interface{}) { return } - // Only need to get the old controller if the labels changed. - // Enqueue the oldRC before the curRC to give curRC a chance to adopt the oldPod. - if labelChanged { - // If the old and new rc are the same, the first one that syncs - // will set expectations preventing any damage from the second. - if oldRC := rm.getPodController(oldPod); oldRC != nil { - rm.enqueueController(oldRC) + curControllerRef := controller.GetControllerOf(curPod) + oldControllerRef := controller.GetControllerOf(oldPod) + controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef) + if controllerRefChanged && + oldControllerRef != nil && oldControllerRef.Kind == controllerKind.Kind { + // The ControllerRef was changed. Sync the old controller, if any. 
+ rc, err := rm.rcLister.ReplicationControllers(oldPod.Namespace).Get(oldControllerRef.Name) + if err == nil { + rm.enqueueController(rc) } } - changedToReady := !v1.IsPodReady(oldPod) && v1.IsPodReady(curPod) - if curRC := rm.getPodController(curPod); curRC != nil { - rm.enqueueController(curRC) + // If it has a ControllerRef, that's all that matters. + if curControllerRef != nil { + if curControllerRef.Kind != controllerKind.Kind { + // It's controlled by a different type of controller. + return + } + rc, err := rm.rcLister.ReplicationControllers(curPod.Namespace).Get(curControllerRef.Name) + if err != nil { + return + } + rm.enqueueController(rc) // TODO: MinReadySeconds in the Pod will generate an Available condition to be added in - // the Pod status which in turn will trigger a requeue of the owning replication controller - // thus having its status updated with the newly available replica. For now, we can fake the - // update by resyncing the controller MinReadySeconds after the it is requeued because a Pod - // transitioned to Ready. + // the Pod status which in turn will trigger a requeue of the owning ReplicationController thus + // having its status updated with the newly available replica. For now, we can fake the + // update by resyncing the controller MinReadySeconds after the it is requeued because + // a Pod transitioned to Ready. // Note that this still suffers from #29229, we are just moving the problem one level - // "closer" to kubelet (from the deployment to the replication controller manager). - if changedToReady && curRC.Spec.MinReadySeconds > 0 { - glog.V(2).Infof("ReplicationController %q will be enqueued after %ds for availability check", curRC.Name, curRC.Spec.MinReadySeconds) - rm.enqueueControllerAfter(curRC, time.Duration(curRC.Spec.MinReadySeconds)*time.Second) + // "closer" to kubelet (from the deployment to the ReplicationController controller). + if !v1.IsPodReady(oldPod) && v1.IsPodReady(curPod) && rc.Spec.MinReadySeconds > 0 { + glog.V(2).Infof("ReplicationController %q will be enqueued after %ds for availability check", rc.Name, rc.Spec.MinReadySeconds) + rm.enqueueControllerAfter(rc, time.Duration(rc.Spec.MinReadySeconds)*time.Second) + } + return + } + + // Otherwise, it's an orphan. If anything changed, sync matching controllers + // to see if anyone wants to adopt it now. + if labelChanged || controllerRefChanged { + for _, rc := range rm.getPodControllers(curPod) { + rm.enqueueController(rc) } } } -// When a pod is deleted, enqueue the controller that manages the pod and update its expectations. +// When a pod is deleted, enqueue the ReplicationController that manages the pod and update its expectations. // obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item. func (rm *ReplicationManager) deletePod(obj interface{}) { pod, ok := obj.(*v1.Pod) @@ -361,45 +326,50 @@ func (rm *ReplicationManager) deletePod(obj interface{}) { // When a delete is dropped, the relist will notice a pod in the store not // in the list, leading to the insertion of a tombstone object which contains // the deleted key/value. Note that this value might be stale. If the pod - // changed labels the new rc will not be woken up till the periodic resync. + // changed labels the new ReplicationController will not be woken up till the periodic resync. 
if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) + utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj)) return } pod, ok = tombstone.Obj.(*v1.Pod) if !ok { - utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v", obj)) + utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj)) return } } - glog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v, labels %+v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod.Labels) - if rc := rm.getPodController(pod); rc != nil { - rcKey, err := controller.KeyFunc(rc) - if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for replication controller %#v: %v", rc, err)) - return - } - rm.expectations.DeletionObserved(rcKey, controller.PodKey(pod)) - rm.enqueueController(rc) + glog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %#v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod) + + controllerRef := controller.GetControllerOf(pod) + if controllerRef == nil { + // No controller should care about orphans being deleted. + return } + if controllerRef.Kind != controllerKind.Kind { + // It's controlled by a different type of controller. + return + } + + rc, err := rm.rcLister.ReplicationControllers(pod.Namespace).Get(controllerRef.Name) + if err != nil { + return + } + rsKey, err := controller.KeyFunc(rc) + if err != nil { + return + } + rm.expectations.DeletionObserved(rsKey, controller.PodKey(pod)) + rm.enqueueController(rc) } // obj could be an *v1.ReplicationController, or a DeletionFinalStateUnknown marker item. func (rm *ReplicationManager) enqueueController(obj interface{}) { key, err := controller.KeyFunc(obj) if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) + utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err)) return } - - // TODO: Handle overlapping controllers better. Either disallow them at admission time or - // deterministically avoid syncing controllers that fight over pods. Currently, we only - // ensure that the same controller is synced for a given pod. When we periodically relist - // all controllers there will still be some replica instability. One way to handle this is - // by querying the store for all controllers that this rc overlaps, as well as all - // controllers that overlap this rc, and sorting them. rm.queue.Add(key) } @@ -407,16 +377,9 @@ func (rm *ReplicationManager) enqueueController(obj interface{}) { func (rm *ReplicationManager) enqueueControllerAfter(obj interface{}, after time.Duration) { key, err := controller.KeyFunc(obj) if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) + utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err)) return } - - // TODO: Handle overlapping controllers better. Either disallow them at admission time or - // deterministically avoid syncing controllers that fight over pods. Currently, we only - // ensure that the same controller is synced for a given pod. When we periodically relist - // all controllers there will still be some replica instability. One way to handle this is - // by querying the store for all controllers that this rc overlaps, as well as all - // controllers that overlap this rc, and sorting them. 
rm.queue.AddAfter(key, after) } @@ -481,8 +444,8 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*v1.Pod, rc *v1.Repl var err error boolPtr := func(b bool) *bool { return &b } controllerRef := &metav1.OwnerReference{ - APIVersion: getRCKind().GroupVersion().String(), - Kind: getRCKind().Kind, + APIVersion: controllerKind.GroupVersion().String(), + Kind: controllerKind.Kind, Name: rc.Name, UID: rc.UID, BlockOwnerDeletion: boolPtr(true), @@ -610,7 +573,7 @@ func (rm *ReplicationManager) syncReplicationController(key string) error { rm.queue.Add(key) return err } - cm := controller.NewPodControllerRefManager(rm.podControl, rc, labels.Set(rc.Spec.Selector).AsSelectorPreValidated(), getRCKind()) + cm := controller.NewPodControllerRefManager(rm.podControl, rc, labels.Set(rc.Spec.Selector).AsSelectorPreValidated(), controllerKind) filteredPods, err = cm.ClaimPods(pods) if err != nil { // Something went wrong with adoption or release. diff --git a/pkg/controller/replication/replication_controller_test.go b/pkg/controller/replication/replication_controller_test.go index 27fac3aa5c1..dda9435f9db 100644 --- a/pkg/controller/replication/replication_controller_test.go +++ b/pkg/controller/replication/replication_controller_test.go @@ -167,11 +167,11 @@ type serverResponse struct { obj interface{} } -func NewReplicationManagerFromClient(kubeClient clientset.Interface, burstReplicas int, lookupCacheSize int) (*ReplicationManager, coreinformers.PodInformer, coreinformers.ReplicationControllerInformer) { +func newReplicationManagerFromClient(kubeClient clientset.Interface, burstReplicas int) (*ReplicationManager, coreinformers.PodInformer, coreinformers.ReplicationControllerInformer) { informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) podInformer := informerFactory.Core().V1().Pods() rcInformer := informerFactory.Core().V1().ReplicationControllers() - rm := NewReplicationManager(podInformer, rcInformer, kubeClient, burstReplicas, lookupCacheSize) + rm := NewReplicationManager(podInformer, rcInformer, kubeClient, burstReplicas) rm.podListerSynced = alwaysReady rm.rcListerSynced = alwaysReady return rm, podInformer, rcInformer @@ -180,7 +180,7 @@ func NewReplicationManagerFromClient(kubeClient clientset.Interface, burstReplic func TestSyncReplicationControllerDoesNothing(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) fakePodControl := controller.FakePodControl{} - manager, podInformer, rcInformer := NewReplicationManagerFromClient(c, BurstReplicas, 0) + manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) // 2 running pods, a controller with 2 replicas, sync is a no-op controllerSpec := newReplicationController(2) @@ -195,7 +195,7 @@ func TestSyncReplicationControllerDoesNothing(t *testing.T) { func TestSyncReplicationControllerDeletes(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) fakePodControl := controller.FakePodControl{} - manager, podInformer, rcInformer := NewReplicationManagerFromClient(c, BurstReplicas, 0) + manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) manager.podControl = &fakePodControl // 2 running pods and a controller with 1 replica, one pod delete expected @@ -210,7 +210,7 @@ func 
TestSyncReplicationControllerDeletes(t *testing.T) { func TestDeleteFinalStateUnknown(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) fakePodControl := controller.FakePodControl{} - manager, _, rcInformer := NewReplicationManagerFromClient(c, BurstReplicas, 0) + manager, _, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) manager.podControl = &fakePodControl received := make(chan string) @@ -241,7 +241,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) { func TestSyncReplicationControllerCreates(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - manager, _, rcInformer := NewReplicationManagerFromClient(c, BurstReplicas, 0) + manager, _, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) // A controller with 2 replicas and no pods in the store, 2 creates expected rc := newReplicationController(2) @@ -262,7 +262,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) { testServer := httptest.NewServer(&fakeHandler) defer testServer.Close() c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - manager, podInformer, rcInformer := NewReplicationManagerFromClient(c, BurstReplicas, 0) + manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) // Steady state for the replication controller, no Status.Replicas updates expected activePods := 5 @@ -302,7 +302,7 @@ func TestControllerUpdateReplicas(t *testing.T) { testServer := httptest.NewServer(&fakeHandler) defer testServer.Close() c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - manager, podInformer, rcInformer := NewReplicationManagerFromClient(c, BurstReplicas, 0) + manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) // Insufficient number of pods in the system, and Status.Replicas is wrong; // Status.Replica should update to match number of pods in system, 1 new pod should be created. 
@@ -347,7 +347,7 @@ func TestSyncReplicationControllerDormancy(t *testing.T) { defer testServer.Close() c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) fakePodControl := controller.FakePodControl{} - manager, podInformer, rcInformer := NewReplicationManagerFromClient(c, BurstReplicas, 0) + manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) manager.podControl = &fakePodControl controllerSpec := newReplicationController(2) @@ -399,7 +399,7 @@ func TestSyncReplicationControllerDormancy(t *testing.T) { } func TestPodControllerLookup(t *testing.T) { - manager, _, rcInformer := NewReplicationManagerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}), BurstReplicas, 0) + manager, _, rcInformer := newReplicationManagerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}), BurstReplicas) testCases := []struct { inRCs []*v1.ReplicationController pod *v1.Pod @@ -447,7 +447,12 @@ func TestPodControllerLookup(t *testing.T) { for _, r := range c.inRCs { rcInformer.Informer().GetIndexer().Add(r) } - if rc := manager.getPodController(c.pod); rc != nil { + if rcs := manager.getPodControllers(c.pod); rcs != nil { + if len(rcs) != 1 { + t.Errorf("len(rcs) = %v, want %v", len(rcs), 1) + continue + } + rc := rcs[0] if c.outRCName != rc.Name { t.Errorf("Got controller %+v expected %+v", rc.Name, c.outRCName) } @@ -466,7 +471,7 @@ func TestWatchControllers(t *testing.T) { informers := informers.NewSharedInformerFactory(c, controller.NoResyncPeriodFunc()) podInformer := informers.Core().V1().Pods() rcInformer := informers.Core().V1().ReplicationControllers() - manager := NewReplicationManager(podInformer, rcInformer, c, BurstReplicas, 0) + manager := NewReplicationManager(podInformer, rcInformer, c, BurstReplicas) informers.Start(stopCh) var testControllerSpec v1.ReplicationController @@ -506,7 +511,7 @@ func TestWatchPods(t *testing.T) { fakeWatch := watch.NewFake() c := &fake.Clientset{} c.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil)) - manager, podInformer, rcInformer := NewReplicationManagerFromClient(c, BurstReplicas, 0) + manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) // Put one rc and one pod into the controller's stores testControllerSpec := newReplicationController(1) @@ -547,7 +552,7 @@ func TestWatchPods(t *testing.T) { } func TestUpdatePods(t *testing.T) { - manager, podInformer, rcInformer := NewReplicationManagerFromClient(fake.NewSimpleClientset(), BurstReplicas, 0) + manager, podInformer, rcInformer := newReplicationManagerFromClient(fake.NewSimpleClientset(), BurstReplicas) received := make(chan string) @@ -565,23 +570,29 @@ func TestUpdatePods(t *testing.T) { go wait.Until(manager.worker, 10*time.Millisecond, stopCh) // Put 2 rcs and one pod into the controller's stores + labelMap1 := map[string]string{"foo": "bar"} testControllerSpec1 := newReplicationController(1) + testControllerSpec1.Spec.Selector = labelMap1 rcInformer.Informer().GetIndexer().Add(testControllerSpec1) + labelMap2 := map[string]string{"bar": "foo"} testControllerSpec2 := *testControllerSpec1 - testControllerSpec2.Spec.Selector = map[string]string{"bar": "foo"} 
+ testControllerSpec2.Spec.Selector = labelMap2 testControllerSpec2.Name = "barfoo" rcInformer.Informer().GetIndexer().Add(&testControllerSpec2) - // case 1: We put in the podLister a pod with labels matching - // testControllerSpec1, then update its labels to match testControllerSpec2. - // We expect to receive a sync request for both controllers. + isController := true + controllerRef1 := metav1.OwnerReference{UID: testControllerSpec1.UID, APIVersion: "v1", Kind: "ReplicationController", Name: testControllerSpec1.Name, Controller: &isController} + controllerRef2 := metav1.OwnerReference{UID: testControllerSpec2.UID, APIVersion: "v1", Kind: "ReplicationController", Name: testControllerSpec2.Name, Controller: &isController} + + // case 1: Pod with a ControllerRef pod1 := newPodList(podInformer.Informer().GetIndexer(), 1, v1.PodRunning, testControllerSpec1, "pod").Items[0] + pod1.OwnerReferences = []metav1.OwnerReference{controllerRef1} pod1.ResourceVersion = "1" pod2 := pod1 - pod2.Labels = testControllerSpec2.Spec.Selector + pod2.Labels = labelMap2 pod2.ResourceVersion = "2" manager.updatePod(&pod1, &pod2) - expected := sets.NewString(testControllerSpec1.Name, testControllerSpec2.Name) + expected := sets.NewString(testControllerSpec1.Name) for _, name := range expected.List() { t.Logf("Expecting update for %+v", name) select { @@ -590,17 +601,20 @@ func TestUpdatePods(t *testing.T) { t.Errorf("Expected keys %#v got %v", expected, got) } case <-time.After(wait.ForeverTestTimeout): - t.Errorf("Expected update notifications for controllers within 100ms each") + t.Errorf("Expected update notifications for ReplicationControllers") } } - // case 2: pod1 in the podLister has labels matching testControllerSpec1. - // We update its labels to match no replication controller. We expect to - // receive a sync request for testControllerSpec1. - pod2.Labels = make(map[string]string) + // case 2: Remove ControllerRef (orphan). Expect to sync label-matching RC. + pod1 = newPodList(podInformer.Informer().GetIndexer(), 1, v1.PodRunning, testControllerSpec1, "pod").Items[0] + pod1.ResourceVersion = "1" + pod1.Labels = labelMap2 + pod1.OwnerReferences = []metav1.OwnerReference{controllerRef2} + pod2 = pod1 + pod2.OwnerReferences = nil + pod2.ResourceVersion = "2" manager.updatePod(&pod1, &pod2) - expected = sets.NewString(testControllerSpec1.Name) + expected = sets.NewString(testControllerSpec2.Name) for _, name := range expected.List() { t.Logf("Expecting update for %+v", name) select { @@ -609,7 +623,52 @@ func TestUpdatePods(t *testing.T) { t.Errorf("Expected keys %#v got %v", expected, got) } case <-time.After(wait.ForeverTestTimeout): - t.Errorf("Expected update notifications for controllers within 100ms each") + t.Errorf("Expected update notifications for ReplicationControllers") + } + } + + // case 3: Remove ControllerRef (orphan). Expect to sync both former owner and + // any label-matching RC.
+ pod1 = newPodList(podInformer.Informer().GetIndexer(), 1, v1.PodRunning, testControllerSpec1, "pod").Items[0] + pod1.ResourceVersion = "1" + pod1.Labels = labelMap2 + pod1.OwnerReferences = []metav1.OwnerReference{controllerRef1} + pod2 = pod1 + pod2.OwnerReferences = nil + pod2.ResourceVersion = "2" + manager.updatePod(&pod1, &pod2) + expected = sets.NewString(testControllerSpec1.Name, testControllerSpec2.Name) + for _, name := range expected.List() { + t.Logf("Expecting update for %+v", name) + select { + case got := <-received: + if !expected.Has(got) { + t.Errorf("Expected keys %#v got %v", expected, got) + } + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("Expected update notifications for ReplicationControllers") + } + } + + // case 4: Keep ControllerRef, change labels. Expect to sync owning RC. + pod1 = newPodList(podInformer.Informer().GetIndexer(), 1, v1.PodRunning, testControllerSpec1, "pod").Items[0] + pod1.ResourceVersion = "1" + pod1.Labels = labelMap1 + pod1.OwnerReferences = []metav1.OwnerReference{controllerRef2} + pod2 = pod1 + pod2.Labels = labelMap2 + pod2.ResourceVersion = "2" + manager.updatePod(&pod1, &pod2) + expected = sets.NewString(testControllerSpec2.Name) + for _, name := range expected.List() { + t.Logf("Expecting update for %+v", name) + select { + case got := <-received: + if !expected.Has(got) { + t.Errorf("Expected keys %#v got %v", expected, got) + } + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("Expected update notifications for ReplicationControllers") } } } @@ -624,7 +683,7 @@ func TestControllerUpdateRequeue(t *testing.T) { defer testServer.Close() c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - manager, podInformer, rcInformer := NewReplicationManagerFromClient(c, BurstReplicas, 0) + manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) rc := newReplicationController(1) rcInformer.Informer().GetIndexer().Add(rc) @@ -694,7 +753,7 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) { func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) fakePodControl := controller.FakePodControl{} - manager, podInformer, rcInformer := NewReplicationManagerFromClient(c, burstReplicas, 0) + manager, podInformer, rcInformer := newReplicationManagerFromClient(c, burstReplicas) manager.podControl = &fakePodControl controllerSpec := newReplicationController(numReplicas) @@ -755,6 +814,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) // the rc is waiting for. 
expectedDels := manager.expectations.GetUIDs(getKey(controllerSpec, t)) podsToDelete := []*v1.Pod{} + isController := true for _, key := range expectedDels.List() { nsName := strings.Split(key, "/") podsToDelete = append(podsToDelete, &v1.Pod{ @@ -762,6 +822,9 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) Name: nsName[1], Namespace: nsName[0], Labels: controllerSpec.Spec.Selector, + OwnerReferences: []metav1.OwnerReference{ + {UID: controllerSpec.UID, APIVersion: "v1", Kind: "ReplicationController", Name: controllerSpec.Name, Controller: &isController}, + }, }, }) } @@ -798,11 +861,15 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) t.Fatalf("Waiting on unexpected number of deletes.") } nsName := strings.Split(expectedDel.List()[0], "/") + isController := true lastPod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: nsName[1], Namespace: nsName[0], Labels: controllerSpec.Spec.Selector, + OwnerReferences: []metav1.OwnerReference{ + {UID: controllerSpec.UID, APIVersion: "v1", Kind: "ReplicationController", Name: controllerSpec.Name, Controller: &isController}, + }, }, } podInformer.Informer().GetIndexer().Delete(lastPod) @@ -843,7 +910,7 @@ func (fe FakeRCExpectations) SatisfiedExpectations(controllerKey string) bool { func TestRCSyncExpectations(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) fakePodControl := controller.FakePodControl{} - manager, podInformer, rcInformer := NewReplicationManagerFromClient(c, 2, 0) + manager, podInformer, rcInformer := newReplicationManagerFromClient(c, 2) manager.podControl = &fakePodControl controllerSpec := newReplicationController(2) @@ -866,7 +933,7 @@ func TestRCSyncExpectations(t *testing.T) { func TestDeleteControllerAndExpectations(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - manager, podInformer, rcInformer := NewReplicationManagerFromClient(c, 10, 0) + manager, podInformer, rcInformer := newReplicationManagerFromClient(c, 10) rc := newReplicationController(1) rcInformer.Informer().GetIndexer().Add(rc) @@ -919,36 +986,46 @@ func shuffle(controllers []*v1.ReplicationController) []*v1.ReplicationControlle func TestOverlappingRCs(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - for i := 0; i < 5; i++ { - manager, _, rcInformer := NewReplicationManagerFromClient(c, 10, 0) + manager, _, rcInformer := newReplicationManagerFromClient(c, 10) - // Create 10 rcs, shuffled them randomly and insert them into the rc manager's store - var controllers []*v1.ReplicationController - for j := 1; j < 10; j++ { - controllerSpec := newReplicationController(1) - controllerSpec.CreationTimestamp = metav1.Date(2014, time.December, j, 0, 0, 0, 0, time.Local) - controllerSpec.Name = string(uuid.NewUUID()) - controllers = append(controllers, controllerSpec) - } - shuffledControllers := shuffle(controllers) - for j := range shuffledControllers { - rcInformer.Informer().GetIndexer().Add(shuffledControllers[j]) - } - // Add a pod and make sure only the oldest rc is synced - pods := newPodList(nil, 1, v1.PodPending, controllers[0], "pod") - rcKey := getKey(controllers[0], t) + // Create 10 
rcs, shuffle them randomly and insert them into the + // rc manager's store. + // All use the same CreationTimestamp since ControllerRef should be able + // to handle that. + var controllers []*v1.ReplicationController + timestamp := metav1.Date(2014, time.December, 0, 0, 0, 0, 0, time.Local) + for j := 1; j < 10; j++ { + controllerSpec := newReplicationController(1) + controllerSpec.CreationTimestamp = timestamp + controllerSpec.Name = fmt.Sprintf("rc%d", j) + controllers = append(controllers, controllerSpec) + } + shuffledControllers := shuffle(controllers) + for j := range shuffledControllers { + rcInformer.Informer().GetIndexer().Add(shuffledControllers[j]) + } + // Add a pod with a ControllerRef and make sure only the corresponding + // ReplicationController is synced. Pick an RC in the middle since the old code + // used to sort by name if all timestamps were equal. + rc := controllers[3] + pods := newPodList(nil, 1, v1.PodPending, rc, "pod") + pod := &pods.Items[0] + isController := true + pod.OwnerReferences = []metav1.OwnerReference{ + {UID: rc.UID, APIVersion: "v1", Kind: "ReplicationController", Name: rc.Name, Controller: &isController}, + } + rcKey := getKey(rc, t) - manager.addPod(&pods.Items[0]) - queueRC, _ := manager.queue.Get() - if queueRC != rcKey { - t.Fatalf("Expected to find key %v in queue, found %v", rcKey, queueRC) - } + manager.addPod(pod) + queueRC, _ := manager.queue.Get() + if queueRC != rcKey { + t.Fatalf("Expected to find key %v in queue, found %v", rcKey, queueRC) } } func TestDeletionTimestamp(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - manager, _, rcInformer := NewReplicationManagerFromClient(c, 10, 0) + manager, _, rcInformer := newReplicationManagerFromClient(c, 10) controllerSpec := newReplicationController(1) rcInformer.Informer().GetIndexer().Add(controllerSpec) @@ -995,11 +1072,15 @@ func TestDeletionTimestamp(t *testing.T) { // An update to the pod (including an update to the deletion timestamp) // should not be counted as a second delete.
+ isController := true secondPod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Namespace: pod.Namespace, Name: "secondPod", Labels: pod.Labels, + OwnerReferences: []metav1.OwnerReference{ + {UID: controllerSpec.UID, APIVersion: "v1", Kind: "ReplicationController", Name: controllerSpec.Name, Controller: &isController}, + }, }, } manager.expectations.ExpectDeletions(rcKey, []string{controller.PodKey(secondPod)}) @@ -1037,7 +1118,7 @@ func TestDeletionTimestamp(t *testing.T) { func BenchmarkGetPodControllerMultiNS(b *testing.B) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - manager, _, rcInformer := NewReplicationManagerFromClient(client, BurstReplicas, 0) + manager, _, rcInformer := newReplicationManagerFromClient(client, BurstReplicas) const nsNum = 1000 @@ -1076,14 +1157,14 @@ func BenchmarkGetPodControllerMultiNS(b *testing.B) { for i := 0; i < b.N; i++ { for _, pod := range pods { - manager.getPodController(&pod) + manager.getPodControllers(&pod) } } } func BenchmarkGetPodControllerSingleNS(b *testing.B) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - manager, _, rcInformer := NewReplicationManagerFromClient(client, BurstReplicas, 0) + manager, _, rcInformer := newReplicationManagerFromClient(client, BurstReplicas) const rcNum = 1000 const replicaNum = 3 @@ -1116,7 +1197,7 @@ func BenchmarkGetPodControllerSingleNS(b *testing.B) { for i := 0; i < b.N; i++ { for _, pod := range pods { - manager.getPodController(&pod) + manager.getPodControllers(&pod) } } } @@ -1125,7 +1206,7 @@ func BenchmarkGetPodControllerSingleNS(b *testing.B) { func setupManagerWithGCEnabled(objs ...runtime.Object) (manager *ReplicationManager, fakePodControl *controller.FakePodControl, podInformer coreinformers.PodInformer, rcInformer coreinformers.ReplicationControllerInformer) { c := fakeclientset.NewSimpleClientset(objs...) fakePodControl = &controller.FakePodControl{} - manager, podInformer, rcInformer = NewReplicationManagerFromClient(c, BurstReplicas, 0) + manager, podInformer, rcInformer = newReplicationManagerFromClient(c, BurstReplicas) manager.podControl = fakePodControl return manager, fakePodControl, podInformer, rcInformer } @@ -1327,7 +1408,7 @@ func TestReadyReplicas(t *testing.T) { defer testServer.Close() c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - manager, podInformer, rcInformer := NewReplicationManagerFromClient(c, BurstReplicas, 0) + manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) // Status.Replica should update to match number of pods in system, 1 new pod should be created. rc := newReplicationController(2) @@ -1365,7 +1446,7 @@ func TestAvailableReplicas(t *testing.T) { defer testServer.Close() c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - manager, podInformer, rcInformer := NewReplicationManagerFromClient(c, BurstReplicas, 0) + manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) // Status.Replica should update to match number of pods in system, 1 new pod should be created. 
rc := newReplicationController(2) diff --git a/test/integration/framework/master_utils.go b/test/integration/framework/master_utils.go index 6729fa85b0a..c0ca8cb9fd6 100644 --- a/test/integration/framework/master_utils.go +++ b/test/integration/framework/master_utils.go @@ -124,7 +124,7 @@ func NewMasterComponents(c *Config) *MasterComponents { clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}, QPS: c.QPS, Burst: c.Burst}) rcStopCh := make(chan struct{}) informerFactory := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc()) - controllerManager := replicationcontroller.NewReplicationManager(informerFactory.Core().V1().Pods(), informerFactory.Core().V1().ReplicationControllers(), clientset, c.Burst, 4096) + controllerManager := replicationcontroller.NewReplicationManager(informerFactory.Core().V1().Pods(), informerFactory.Core().V1().ReplicationControllers(), clientset, c.Burst) // TODO: Support events once we can cleanly shutdown an event recorder. controllerManager.SetEventRecorder(&record.FakeRecorder{}) diff --git a/test/integration/quota/quota_test.go b/test/integration/quota/quota_test.go index d1f1343d42a..f8ce95b3fe0 100644 --- a/test/integration/quota/quota_test.go +++ b/test/integration/quota/quota_test.go @@ -96,7 +96,6 @@ func TestQuota(t *testing.T) { informers.Core().V1().ReplicationControllers(), clientset, replicationcontroller.BurstReplicas, - 4096, ) rm.SetEventRecorder(&record.FakeRecorder{}) go rm.Run(3, controllerCh) @@ -280,7 +279,6 @@ func TestQuotaLimitedResourceDenial(t *testing.T) { informers.Core().V1().ReplicationControllers(), clientset, replicationcontroller.BurstReplicas, - 4096, ) rm.SetEventRecorder(&record.FakeRecorder{}) go rm.Run(3, controllerCh) diff --git a/test/integration/replicaset/replicaset_test.go b/test/integration/replicaset/replicaset_test.go index b9feaff9449..bd0c0aebf04 100644 --- a/test/integration/replicaset/replicaset_test.go +++ b/test/integration/replicaset/replicaset_test.go @@ -143,7 +143,6 @@ func rmSetup(t *testing.T) (*httptest.Server, *replicaset.ReplicaSetController, informers.Core().V1().Pods(), clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "replicaset-controller")), replicaset.BurstReplicas, - 4096, ) if err != nil { diff --git a/test/integration/replicationcontroller/replicationcontroller_test.go b/test/integration/replicationcontroller/replicationcontroller_test.go index 4ee05ce72df..afcfed13da4 100644 --- a/test/integration/replicationcontroller/replicationcontroller_test.go +++ b/test/integration/replicationcontroller/replicationcontroller_test.go @@ -135,7 +135,7 @@ func rmSetup(t *testing.T, stopCh chan struct{}) (*httptest.Server, *replication resyncPeriod := 12 * time.Hour informers := informers.NewSharedInformerFactory(clientSet, resyncPeriod) - rm := replication.NewReplicationManager(informers.Core().V1().Pods(), informers.Core().V1().ReplicationControllers(), clientSet, replication.BurstReplicas, 4096) + rm := replication.NewReplicationManager(informers.Core().V1().Pods(), informers.Core().V1().ReplicationControllers(), clientSet, replication.BurstReplicas) informers.Start(stopCh) return s, rm, informers, clientSet
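
Illustrative sketch (not part of the patch): the routing rule this change implements in the event handlers — an owned pod wakes only the ReplicationController named by its ControllerRef, while an orphan wakes every selector-matching controller so one of them may adopt it on the next sync — can be shown outside the Kubernetes tree. The Go program below is a minimal, self-contained approximation; OwnerReference, Pod, ReplicationController, and routePodEvent here are simplified stand-ins, not the real client-go or controller APIs.

package main

import "fmt"

// Hypothetical, trimmed-down stand-ins for the real API types; they only
// carry the fields this sketch needs.
type OwnerReference struct {
	Kind       string
	Name       string
	Controller bool
}

type Pod struct {
	Name            string
	Labels          map[string]string
	OwnerReferences []OwnerReference
}

type ReplicationController struct {
	Name     string
	Selector map[string]string
}

// getControllerOf returns the owner reference marked Controller=true, if any,
// mirroring the role controller.GetControllerOf plays in the patch.
func getControllerOf(pod *Pod) *OwnerReference {
	for i := range pod.OwnerReferences {
		if pod.OwnerReferences[i].Controller {
			return &pod.OwnerReferences[i]
		}
	}
	return nil
}

// selectorMatches reports whether every key/value pair in selector is present
// in labels; an empty selector matches nothing in this sketch.
func selectorMatches(selector, labels map[string]string) bool {
	if len(selector) == 0 {
		return false
	}
	for k, v := range selector {
		if labels[k] != v {
			return false
		}
	}
	return true
}

// routePodEvent returns the controllers to enqueue for a pod event: an owned
// pod wakes only its owner (and only when the owner Kind is
// ReplicationController); an orphan wakes every selector-matching controller
// so one of them may adopt it.
func routePodEvent(pod *Pod, rcsByName map[string]*ReplicationController) []*ReplicationController {
	if ref := getControllerOf(pod); ref != nil {
		if ref.Kind != "ReplicationController" {
			return nil // controlled by a different kind of controller
		}
		if rc, ok := rcsByName[ref.Name]; ok {
			return []*ReplicationController{rc}
		}
		return nil // owner not found in the local cache
	}
	var matches []*ReplicationController
	for _, rc := range rcsByName {
		if selectorMatches(rc.Selector, pod.Labels) {
			matches = append(matches, rc)
		}
	}
	return matches
}

func main() {
	rcs := map[string]*ReplicationController{
		"rc-a": {Name: "rc-a", Selector: map[string]string{"app": "demo"}},
		"rc-b": {Name: "rc-b", Selector: map[string]string{"app": "demo"}},
	}
	owned := &Pod{
		Name:   "pod-1",
		Labels: map[string]string{"app": "demo"},
		OwnerReferences: []OwnerReference{
			{Kind: "ReplicationController", Name: "rc-a", Controller: true},
		},
	}
	orphan := &Pod{Name: "pod-2", Labels: map[string]string{"app": "demo"}}

	for _, rc := range routePodEvent(owned, rcs) {
		fmt.Println("owned pod enqueues:", rc.Name) // only rc-a
	}
	for _, rc := range routePodEvent(orphan, rcs) {
		fmt.Println("orphan pod enqueues:", rc.Name) // rc-a and rc-b, as adoption candidates
	}
}

In the actual controller the lookup goes through the shared informer's lister rather than a plain map, and the orphan branch is what lets ClaimPods adopt the pod on the next sync; the sketch only captures the routing decision itself.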