From 70bd5fdfe55a999776d78cf65e38f11ab5637b57 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Thu, 16 Feb 2017 08:53:34 -0800 Subject: [PATCH] Refactor ControllerRefManager To prepare for implementing ControllerRef across all controllers, this pushes the common adopt/orphan logic into ControllerRefManager so each controller doesn't have to duplicate it. This also shares the adopt/orphan logic between Pods and ReplicaSets, so it lives in only one place. --- pkg/controller/BUILD | 1 + pkg/controller/controller_ref_manager.go | 318 +++++++++++------- .../deployment/deployment_controller.go | 35 +- pkg/controller/replicaset/BUILD | 1 - pkg/controller/replicaset/replica_set.go | 40 +-- pkg/controller/replicaset/replica_set_test.go | 16 +- pkg/controller/replication/BUILD | 1 - .../replication/replication_controller.go | 39 +-- .../replication_controller_test.go | 16 +- 9 files changed, 243 insertions(+), 224 deletions(-) diff --git a/pkg/controller/BUILD b/pkg/controller/BUILD index 15f54086d26..53530818e36 100644 --- a/pkg/controller/BUILD +++ b/pkg/controller/BUILD @@ -38,6 +38,7 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/runtime/schema", "//vendor:k8s.io/apimachinery/pkg/types", + "//vendor:k8s.io/apimachinery/pkg/util/errors", "//vendor:k8s.io/apimachinery/pkg/util/sets", "//vendor:k8s.io/apimachinery/pkg/util/strategicpatch", "//vendor:k8s.io/apimachinery/pkg/util/wait", diff --git a/pkg/controller/controller_ref_manager.go b/pkg/controller/controller_ref_manager.go index 7523a4586c0..7fcc78bf508 100644 --- a/pkg/controller/controller_ref_manager.go +++ b/pkg/controller/controller_ref_manager.go @@ -25,78 +25,17 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/kubernetes/pkg/api/v1" extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" ) -type PodControllerRefManager struct { - podControl PodControlInterface - controllerObject metav1.ObjectMeta - controllerSelector labels.Selector - controllerKind schema.GroupVersionKind -} - -// NewPodControllerRefManager returns a PodControllerRefManager that exposes -// methods to manage the controllerRef of pods. -func NewPodControllerRefManager( - podControl PodControlInterface, - controllerObject metav1.ObjectMeta, - controllerSelector labels.Selector, - controllerKind schema.GroupVersionKind, -) *PodControllerRefManager { - return &PodControllerRefManager{podControl, controllerObject, controllerSelector, controllerKind} -} - -// Classify first filters out inactive pods, then it classify the remaining pods -// into three categories: 1. matchesAndControlled are the pods whose labels -// match the selector of the RC, and have a controllerRef pointing to the -// controller 2. matchesNeedsController are the pods whose labels match the RC, -// but don't have a controllerRef. (Pods with matching labels but with a -// controllerRef pointing to other object are ignored) 3. controlledDoesNotMatch -// are the pods that have a controllerRef pointing to the controller, but their -// labels no longer match the selector. 
-func (m *PodControllerRefManager) Classify(pods []*v1.Pod) (
-	matchesAndControlled []*v1.Pod,
-	matchesNeedsController []*v1.Pod,
-	controlledDoesNotMatch []*v1.Pod) {
-	for i := range pods {
-		pod := pods[i]
-		if !IsPodActive(pod) {
-			glog.V(4).Infof("Ignoring inactive pod %v/%v in state %v, deletion time %v",
-				pod.Namespace, pod.Name, pod.Status.Phase, pod.DeletionTimestamp)
-			continue
-		}
-		controllerRef := GetControllerOf(&pod.ObjectMeta)
-		if controllerRef != nil {
-			if controllerRef.UID == m.controllerObject.UID {
-				// already controlled
-				if m.controllerSelector.Matches(labels.Set(pod.Labels)) {
-					matchesAndControlled = append(matchesAndControlled, pod)
-				} else {
-					controlledDoesNotMatch = append(controlledDoesNotMatch, pod)
-				}
-			} else {
-				// ignoring the pod controlled by other controller
-				glog.V(4).Infof("Ignoring pod %v/%v, it's owned by [%s/%s, name: %s, uid: %s]",
-					pod.Namespace, pod.Name, controllerRef.APIVersion, controllerRef.Kind, controllerRef.Name, controllerRef.UID)
-				continue
-			}
-		} else {
-			if !m.controllerSelector.Matches(labels.Set(pod.Labels)) {
-				continue
-			}
-			matchesNeedsController = append(matchesNeedsController, pod)
-		}
-	}
-	return matchesAndControlled, matchesNeedsController, controlledDoesNotMatch
-}
-
 // GetControllerOf returns the controllerRef if controllee has a controller,
 // otherwise returns nil.
-func GetControllerOf(controllee *metav1.ObjectMeta) *metav1.OwnerReference {
-	for i := range controllee.OwnerReferences {
-		owner := &controllee.OwnerReferences[i]
-		// controlled by other controller
+func GetControllerOf(controllee metav1.Object) *metav1.OwnerReference {
+	ownerRefs := controllee.GetOwnerReferences()
+	for i := range ownerRefs {
+		owner := &ownerRefs[i]
 		if owner.Controller != nil && *owner.Controller == true {
 			return owner
 		}
@@ -104,18 +43,157 @@ func GetControllerOf(controllee *metav1.ObjectMeta) *metav1.OwnerReference {
 	return nil
 }
 
+type baseControllerRefManager struct {
+	controller metav1.Object
+	selector   labels.Selector
+}
+
+// claimObject tries to take ownership of an object for this controller.
+//
+// It will reconcile the following:
+// * Adopt orphans if the selector matches.
+// * Release owned objects if the selector no longer matches.
+//
+// A non-nil error is returned if some form of reconciliation was attempted and
+// failed. Usually, controllers should try again later in case reconciliation
+// is still needed.
+//
+// If the error is nil, either the reconciliation succeeded, or no
+// reconciliation was necessary. The returned boolean indicates whether you now
+// own the object.
+//
+// No reconciliation will be attempted if the controller is being deleted.
+func (m *baseControllerRefManager) claimObject(obj metav1.Object, adopt, release func(metav1.Object) error) (bool, error) {
+	controllerRef := GetControllerOf(obj)
+	if controllerRef != nil {
+		if controllerRef.UID != m.controller.GetUID() {
+			// Owned by someone else. Ignore.
+			return false, nil
+		}
+		if m.selector.Matches(labels.Set(obj.GetLabels())) {
+			// We already own it and the selector matches.
+			// Return true (successfully claimed) before checking deletion timestamp.
+			// We're still allowed to claim things we already own while being deleted
+			// because doing so requires taking no actions.
+			return true, nil
+		}
+		// Owned by us but selector doesn't match.
+		// Try to release, unless we're being deleted.
+		if m.controller.GetDeletionTimestamp() != nil {
+			return false, nil
+		}
+		if err := release(obj); err != nil {
+			// If the object no longer exists, ignore the error.
+			if errors.IsNotFound(err) {
+				return false, nil
+			}
+			// Either someone else released it, or there was a transient error.
+			// The controller should requeue and try again if it's still stale.
+			return false, err
+		}
+		// Successfully released.
+		return false, nil
+	}
+
+	// It's an orphan.
+	if m.controller.GetDeletionTimestamp() != nil ||
+		!m.selector.Matches(labels.Set(obj.GetLabels())) {
+		// Ignore if we're being deleted or selector doesn't match.
+		return false, nil
+	}
+	// Selector matches. Try to adopt.
+	if err := adopt(obj); err != nil {
+		// If the object no longer exists, ignore the error.
+		if errors.IsNotFound(err) {
+			return false, nil
+		}
+		// Either someone else claimed it first, or there was a transient error.
+		// The controller should requeue and try again if it's still orphaned.
+		return false, err
+	}
+	// Successfully adopted.
+	return true, nil
+}
+
+type PodControllerRefManager struct {
+	baseControllerRefManager
+	controllerKind schema.GroupVersionKind
+	podControl     PodControlInterface
+}
+
+// NewPodControllerRefManager returns a PodControllerRefManager that exposes
+// methods to manage the controllerRef of pods.
+func NewPodControllerRefManager(
+	podControl PodControlInterface,
+	controller metav1.Object,
+	selector labels.Selector,
+	controllerKind schema.GroupVersionKind,
+) *PodControllerRefManager {
+	return &PodControllerRefManager{
+		baseControllerRefManager: baseControllerRefManager{
+			controller: controller,
+			selector:   selector,
+		},
+		controllerKind: controllerKind,
+		podControl:     podControl,
+	}
+}
+
+// ClaimPods tries to take ownership of a list of Pods.
+//
+// It will reconcile the following:
+// * Adopt orphans if the selector matches.
+// * Release owned objects if the selector no longer matches.
+//
+// A non-nil error is returned if some form of reconciliation was attempted and
+// failed. Usually, controllers should try again later in case reconciliation
+// is still needed.
+//
+// If the error is nil, either the reconciliation succeeded, or no
+// reconciliation was necessary. The list of Pods that you now own is returned.
+func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod) ([]*v1.Pod, error) {
+	var claimed []*v1.Pod
+	var errlist []error
+
+	adopt := func(obj metav1.Object) error {
+		return m.AdoptPod(obj.(*v1.Pod))
+	}
+	release := func(obj metav1.Object) error {
+		return m.ReleasePod(obj.(*v1.Pod))
+	}
+
+	for _, pod := range pods {
+		if !IsPodActive(pod) {
+			glog.V(4).Infof("Ignoring inactive pod %v/%v in state %v, deletion time %v",
+				pod.Namespace, pod.Name, pod.Status.Phase, pod.DeletionTimestamp)
+			continue
+		}
+		ok, err := m.claimObject(pod, adopt, release)
+		if err != nil {
+			errlist = append(errlist, err)
+			continue
+		}
+		if ok {
+			claimed = append(claimed, pod)
+		}
+	}
+	return claimed, utilerrors.NewAggregate(errlist)
+}
+
 // AdoptPod sends a patch to take control of the pod. It returns the error if
 // the patching fails.
func (m *PodControllerRefManager) AdoptPod(pod *v1.Pod) error { // we should not adopt any pods if the controller is about to be deleted - if m.controllerObject.DeletionTimestamp != nil { + if m.controller.GetDeletionTimestamp() != nil { return fmt.Errorf("cancel the adopt attempt for pod %s because the controller is being deleted", strings.Join([]string{pod.Namespace, pod.Name, string(pod.UID)}, "_")) } + // Note that ValidateOwnerReferences() will reject this patch if another + // OwnerReference exists with controller=true. addControllerPatch := fmt.Sprintf( `{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true}],"uid":"%s"}}`, m.controllerKind.GroupVersion(), m.controllerKind.Kind, - m.controllerObject.Name, m.controllerObject.UID, pod.UID) + m.controller.GetName(), m.controller.GetUID(), pod.UID) return m.podControl.PatchPod(pod.Namespace, pod.Name, []byte(addControllerPatch)) } @@ -123,8 +201,8 @@ func (m *PodControllerRefManager) AdoptPod(pod *v1.Pod) error { // It returns the error if the patching fails. 404 and 422 errors are ignored. func (m *PodControllerRefManager) ReleasePod(pod *v1.Pod) error { glog.V(2).Infof("patching pod %s_%s to remove its controllerRef to %s/%s:%s", - pod.Namespace, pod.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.controllerObject.Name) - deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, m.controllerObject.UID, pod.UID) + pod.Namespace, pod.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.controller.GetName()) + deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, m.controller.GetUID(), pod.UID) err := m.podControl.PatchPod(pod.Namespace, pod.Name, []byte(deleteOwnerRefPatch)) if err != nil { if errors.IsNotFound(err) { @@ -152,74 +230,80 @@ func (m *PodControllerRefManager) ReleasePod(pod *v1.Pod) error { // categories and accordingly adopt or release them. See comments on these functions // for more details. type ReplicaSetControllerRefManager struct { - rsControl RSControlInterface - controllerObject metav1.ObjectMeta - controllerSelector labels.Selector - controllerKind schema.GroupVersionKind + baseControllerRefManager + controllerKind schema.GroupVersionKind + rsControl RSControlInterface } // NewReplicaSetControllerRefManager returns a ReplicaSetControllerRefManager that exposes // methods to manage the controllerRef of ReplicaSets. func NewReplicaSetControllerRefManager( rsControl RSControlInterface, - controllerObject metav1.ObjectMeta, - controllerSelector labels.Selector, + controller metav1.Object, + selector labels.Selector, controllerKind schema.GroupVersionKind, ) *ReplicaSetControllerRefManager { - return &ReplicaSetControllerRefManager{rsControl, controllerObject, controllerSelector, controllerKind} + return &ReplicaSetControllerRefManager{ + baseControllerRefManager: baseControllerRefManager{ + controller: controller, + selector: selector, + }, + controllerKind: controllerKind, + rsControl: rsControl, + } } -// Classify, classifies the ReplicaSets into three categories: -// 1. matchesAndControlled are the ReplicaSets whose labels -// match the selector of the Deployment, and have a controllerRef pointing to the -// Deployment. -// 2. matchesNeedsController are ReplicaSets ,whose labels match the Deployment, -// but don't have a controllerRef. 
(ReplicaSets with matching labels but with a
-// controllerRef pointing to other object are ignored)
-// 3. controlledDoesNotMatch are the ReplicaSets that have a controllerRef pointing
-// to the Deployment, but their labels no longer match the selector.
-func (m *ReplicaSetControllerRefManager) Classify(replicaSets []*extensions.ReplicaSet) (
-	matchesAndControlled []*extensions.ReplicaSet,
-	matchesNeedsController []*extensions.ReplicaSet,
-	controlledDoesNotMatch []*extensions.ReplicaSet) {
-	for i := range replicaSets {
-		replicaSet := replicaSets[i]
-		controllerRef := GetControllerOf(&replicaSet.ObjectMeta)
-		if controllerRef != nil {
-			if controllerRef.UID != m.controllerObject.UID {
-				// ignoring the ReplicaSet controlled by other Deployment
-				glog.V(4).Infof("Ignoring ReplicaSet %v/%v, it's owned by [%s/%s, name: %s, uid: %s]",
-					replicaSet.Namespace, replicaSet.Name, controllerRef.APIVersion, controllerRef.Kind, controllerRef.Name, controllerRef.UID)
-				continue
-			}
-			// already controlled by this Deployment
-			if m.controllerSelector.Matches(labels.Set(replicaSet.Labels)) {
-				matchesAndControlled = append(matchesAndControlled, replicaSet)
-			} else {
-				controlledDoesNotMatch = append(controlledDoesNotMatch, replicaSet)
-			}
-		} else {
-			if !m.controllerSelector.Matches(labels.Set(replicaSet.Labels)) {
-				continue
-			}
-			matchesNeedsController = append(matchesNeedsController, replicaSet)
+// ClaimReplicaSets tries to take ownership of a list of ReplicaSets.
+//
+// It will reconcile the following:
+// * Adopt orphans if the selector matches.
+// * Release owned objects if the selector no longer matches.
+//
+// A non-nil error is returned if some form of reconciliation was attempted and
+// failed. Usually, controllers should try again later in case reconciliation
+// is still needed.
+//
+// If the error is nil, either the reconciliation succeeded, or no
+// reconciliation was necessary. The list of ReplicaSets that you now own is
+// returned.
+func (m *ReplicaSetControllerRefManager) ClaimReplicaSets(sets []*extensions.ReplicaSet) ([]*extensions.ReplicaSet, error) {
+	var claimed []*extensions.ReplicaSet
+	var errlist []error
+
+	adopt := func(obj metav1.Object) error {
+		return m.AdoptReplicaSet(obj.(*extensions.ReplicaSet))
+	}
+	release := func(obj metav1.Object) error {
+		return m.ReleaseReplicaSet(obj.(*extensions.ReplicaSet))
+	}
+
+	for _, rs := range sets {
+		ok, err := m.claimObject(rs, adopt, release)
+		if err != nil {
+			errlist = append(errlist, err)
+			continue
+		}
+		if ok {
+			claimed = append(claimed, rs)
 		}
 	}
-	return matchesAndControlled, matchesNeedsController, controlledDoesNotMatch
+	return claimed, utilerrors.NewAggregate(errlist)
 }
 
 // AdoptReplicaSet sends a patch to take control of the ReplicaSet. It returns the error if
 // the patching fails.
func (m *ReplicaSetControllerRefManager) AdoptReplicaSet(replicaSet *extensions.ReplicaSet) error { // we should not adopt any ReplicaSets if the Deployment is about to be deleted - if m.controllerObject.DeletionTimestamp != nil { + if m.controller.GetDeletionTimestamp() != nil { return fmt.Errorf("cancel the adopt attempt for RS %s because the controller %v is being deleted", - strings.Join([]string{replicaSet.Namespace, replicaSet.Name, string(replicaSet.UID)}, "_"), m.controllerObject.Name) + strings.Join([]string{replicaSet.Namespace, replicaSet.Name, string(replicaSet.UID)}, "_"), m.controller.GetName()) } + // Note that ValidateOwnerReferences() will reject this patch if another + // OwnerReference exists with controller=true. addControllerPatch := fmt.Sprintf( `{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true}],"uid":"%s"}}`, m.controllerKind.GroupVersion(), m.controllerKind.Kind, - m.controllerObject.Name, m.controllerObject.UID, replicaSet.UID) + m.controller.GetName(), m.controller.GetUID(), replicaSet.UID) return m.rsControl.PatchReplicaSet(replicaSet.Namespace, replicaSet.Name, []byte(addControllerPatch)) } @@ -227,8 +311,8 @@ func (m *ReplicaSetControllerRefManager) AdoptReplicaSet(replicaSet *extensions. // It returns the error if the patching fails. 404 and 422 errors are ignored. func (m *ReplicaSetControllerRefManager) ReleaseReplicaSet(replicaSet *extensions.ReplicaSet) error { glog.V(2).Infof("patching ReplicaSet %s_%s to remove its controllerRef to %s/%s:%s", - replicaSet.Namespace, replicaSet.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.controllerObject.Name) - deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, m.controllerObject.UID, replicaSet.UID) + replicaSet.Namespace, replicaSet.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.controller.GetName()) + deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, m.controller.GetUID(), replicaSet.UID) err := m.rsControl.PatchReplicaSet(replicaSet.Namespace, replicaSet.Name, []byte(deleteOwnerRefPatch)) if err != nil { if errors.IsNotFound(err) { diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index 7c65095d0d8..94bfc02e3e4 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -443,11 +443,11 @@ func (dc *DeploymentController) handleErr(err error, key interface{}) { dc.queue.Forget(key) } -// classifyReplicaSets uses NewReplicaSetControllerRefManager to classify ReplicaSets +// claimReplicaSets uses NewReplicaSetControllerRefManager to classify ReplicaSets // and adopts them if their labels match the Deployment but are missing the reference. // It also removes the controllerRef for ReplicaSets, whose labels no longer matches // the deployment. 
-func (dc *DeploymentController) classifyReplicaSets(deployment *extensions.Deployment) error { +func (dc *DeploymentController) claimReplicaSets(deployment *extensions.Deployment) error { rsList, err := dc.rsLister.ReplicaSets(deployment.Namespace).List(labels.Everything()) if err != nil { return err @@ -457,32 +457,9 @@ func (dc *DeploymentController) classifyReplicaSets(deployment *extensions.Deplo if err != nil { return fmt.Errorf("deployment %s/%s has invalid label selector: %v", deployment.Namespace, deployment.Name, err) } - cm := controller.NewReplicaSetControllerRefManager(dc.rsControl, deployment.ObjectMeta, deploymentSelector, getDeploymentKind()) - matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(rsList) - // Adopt replica sets only if this deployment is not going to be deleted. - if deployment.DeletionTimestamp == nil { - for _, replicaSet := range matchesNeedsController { - err := cm.AdoptReplicaSet(replicaSet) - // continue to next RS if adoption fails. - if err != nil { - // If the RS no longer exists, don't even log the error. - if !errors.IsNotFound(err) { - utilruntime.HandleError(err) - } - } else { - matchesAndControlled = append(matchesAndControlled, replicaSet) - } - } - } - // remove the controllerRef for the RS that no longer have matching labels - var errlist []error - for _, replicaSet := range controlledDoesNotMatch { - err := cm.ReleaseReplicaSet(replicaSet) - if err != nil { - errlist = append(errlist, err) - } - } - return utilerrors.NewAggregate(errlist) + cm := controller.NewReplicaSetControllerRefManager(dc.rsControl, deployment, deploymentSelector, getDeploymentKind()) + _, err = cm.ClaimReplicaSets(rsList) + return err } // syncDeployment will sync the deployment with the given key. @@ -565,7 +542,7 @@ func (dc *DeploymentController) syncDeployment(key string) error { dc.cleanupDeployment(oldRSs, d) } - err = dc.classifyReplicaSets(d) + err = dc.claimReplicaSets(d) if err != nil { return err } diff --git a/pkg/controller/replicaset/BUILD b/pkg/controller/replicaset/BUILD index 70deeceae0d..0a93111b2e5 100644 --- a/pkg/controller/replicaset/BUILD +++ b/pkg/controller/replicaset/BUILD @@ -33,7 +33,6 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/labels", "//vendor:k8s.io/apimachinery/pkg/runtime/schema", - "//vendor:k8s.io/apimachinery/pkg/util/errors", "//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/client-go/kubernetes/typed/core/v1", diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index 159045def4c..02073a2055c 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -30,7 +30,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" - utilerrors "k8s.io/apimachinery/pkg/util/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" v1core "k8s.io/client-go/kubernetes/typed/core/v1" @@ -603,38 +602,13 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error { if err != nil { return err } - cm := controller.NewPodControllerRefManager(rsc.podControl, rs.ObjectMeta, selector, getRSKind()) - matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(pods) - // Adopt pods only if this replica set is not going to be deleted. 
- if rs.DeletionTimestamp == nil { - for _, pod := range matchesNeedsController { - err := cm.AdoptPod(pod) - // continue to next pod if adoption fails. - if err != nil { - // If the pod no longer exists, don't even log the error. - if !errors.IsNotFound(err) { - utilruntime.HandleError(err) - } - } else { - matchesAndControlled = append(matchesAndControlled, pod) - } - } - } - filteredPods = matchesAndControlled - // remove the controllerRef for the pods that no longer have matching labels - var errlist []error - for _, pod := range controlledDoesNotMatch { - err := cm.ReleasePod(pod) - if err != nil { - errlist = append(errlist, err) - } - } - if len(errlist) != 0 { - aggregate := utilerrors.NewAggregate(errlist) - // push the RS into work queue again. We need to try to free the - // pods again otherwise they will stuck with the stale - // controllerRef. - return aggregate + cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, getRSKind()) + filteredPods, err = cm.ClaimPods(pods) + if err != nil { + // Something went wrong with adoption or release. + // Requeue and try again so we don't leave orphans sitting around. + rsc.queue.Add(key) + return err } } else { pods, err := rsc.podLister.Pods(rs.Namespace).List(selector) diff --git a/pkg/controller/replicaset/replica_set_test.go b/pkg/controller/replicaset/replica_set_test.go index 654a9fe0ad7..87249c0345e 100644 --- a/pkg/controller/replicaset/replica_set_test.go +++ b/pkg/controller/replicaset/replica_set_test.go @@ -1224,14 +1224,20 @@ func TestPatchPodFails(t *testing.T) { informers.Core().V1().Pods().Informer().GetIndexer().Add(newPod("pod1", rs, v1.PodRunning, nil)) informers.Core().V1().Pods().Informer().GetIndexer().Add(newPod("pod2", rs, v1.PodRunning, nil)) // let both patches fail. The rs controller will assume it fails to take - // control of the pods and create new ones. + // control of the pods and requeue to try again. fakePodControl.Err = fmt.Errorf("Fake Error") - err := manager.syncReplicaSet(getKey(rs, t)) - if err == nil || err.Error() != "Fake Error" { + rsKey := getKey(rs, t) + err := manager.syncReplicaSet(rsKey) + if err == nil || !strings.Contains(err.Error(), "Fake Error") { t.Errorf("expected Fake Error, got %+v", err) } - // 2 patches to take control of pod1 and pod2 (both fail), 2 creates. - validateSyncReplicaSet(t, fakePodControl, 2, 0, 2) + // 2 patches to take control of pod1 and pod2 (both fail). + validateSyncReplicaSet(t, fakePodControl, 0, 0, 2) + // RS should requeue itself. 
+ queueRS, _ := manager.queue.Get() + if queueRS != rsKey { + t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS) + } } func TestPatchExtraPodsThenDelete(t *testing.T) { diff --git a/pkg/controller/replication/BUILD b/pkg/controller/replication/BUILD index da0393f2d50..f0e4a0c8e02 100644 --- a/pkg/controller/replication/BUILD +++ b/pkg/controller/replication/BUILD @@ -30,7 +30,6 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/labels", "//vendor:k8s.io/apimachinery/pkg/runtime/schema", - "//vendor:k8s.io/apimachinery/pkg/util/errors", "//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/apiserver/pkg/util/trace", diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index f10f57e0a4d..3a85c6143f9 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -30,7 +30,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" - utilerrors "k8s.io/apimachinery/pkg/util/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" utiltrace "k8s.io/apiserver/pkg/util/trace" @@ -620,39 +619,13 @@ func (rm *ReplicationManager) syncReplicationController(key string) error { rm.queue.Add(key) return err } - cm := controller.NewPodControllerRefManager(rm.podControl, rc.ObjectMeta, labels.Set(rc.Spec.Selector).AsSelectorPreValidated(), getRCKind()) - matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(pods) - // Adopt pods only if this replication controller is not going to be deleted. - if rc.DeletionTimestamp == nil { - for _, pod := range matchesNeedsController { - err := cm.AdoptPod(pod) - // continue to next pod if adoption fails. - if err != nil { - // If the pod no longer exists, don't even log the error. - if !errors.IsNotFound(err) { - utilruntime.HandleError(err) - } - } else { - matchesAndControlled = append(matchesAndControlled, pod) - } - } - } - filteredPods = matchesAndControlled - // remove the controllerRef for the pods that no longer have matching labels - var errlist []error - for _, pod := range controlledDoesNotMatch { - err := cm.ReleasePod(pod) - if err != nil { - errlist = append(errlist, err) - } - } - if len(errlist) != 0 { - aggregate := utilerrors.NewAggregate(errlist) - // push the RC into work queue again. We need to try to free the - // pods again otherwise they will stuck with the stale - // controllerRef. + cm := controller.NewPodControllerRefManager(rm.podControl, rc, labels.Set(rc.Spec.Selector).AsSelectorPreValidated(), getRCKind()) + filteredPods, err = cm.ClaimPods(pods) + if err != nil { + // Something went wrong with adoption or release. + // Requeue and try again so we don't leave orphans sitting around. 
rm.queue.Add(key) - return aggregate + return err } } else { pods, err := rm.podLister.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelectorPreValidated()) diff --git a/pkg/controller/replication/replication_controller_test.go b/pkg/controller/replication/replication_controller_test.go index 0155637dc98..a3fb7406cb1 100644 --- a/pkg/controller/replication/replication_controller_test.go +++ b/pkg/controller/replication/replication_controller_test.go @@ -1190,14 +1190,20 @@ func TestPatchPodFails(t *testing.T) { podInformer.Informer().GetIndexer().Add(newPod("pod1", rc, v1.PodRunning, nil)) podInformer.Informer().GetIndexer().Add(newPod("pod2", rc, v1.PodRunning, nil)) // let both patches fail. The rc manager will assume it fails to take - // control of the pods and create new ones. + // control of the pods and requeue to try again. fakePodControl.Err = fmt.Errorf("Fake Error") - err := manager.syncReplicationController(getKey(rc, t)) - if err == nil || err.Error() != "Fake Error" { + rcKey := getKey(rc, t) + err := manager.syncReplicationController(rcKey) + if err == nil || !strings.Contains(err.Error(), "Fake Error") { t.Fatalf("expected Fake Error, got %v", err) } - // 2 patches to take control of pod1 and pod2 (both fail), 2 creates. - validateSyncReplication(t, fakePodControl, 2, 0, 2) + // 2 patches to take control of pod1 and pod2 (both fail). + validateSyncReplication(t, fakePodControl, 0, 0, 2) + // RC should requeue itself. + queueRC, _ := manager.queue.Get() + if queueRC != rcKey { + t.Fatalf("Expected to find key %v in queue, found %v", rcKey, queueRC) + } } func TestPatchExtraPodsThenDelete(t *testing.T) {
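
For illustration, here is a minimal sketch of how a sync loop consumes the new
API, mirroring the replica_set.go and replication_controller.go hunks above.
The syncClaim wrapper, its parameters, and the example package are hypothetical
scaffolding; NewPodControllerRefManager, ClaimPods, and PodControlInterface are
the identifiers introduced by this patch.

package example

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/kubernetes/pkg/api/v1"
	"k8s.io/kubernetes/pkg/controller"
)

// syncClaim (hypothetical) shows the call pattern the refactored controllers
// use: build a ref manager scoped to one controller object, hand it the
// candidate pods, and keep only the pods it reports as owned.
func syncClaim(
	podControl controller.PodControlInterface,
	owner metav1.Object, // e.g. a ReplicaSet or ReplicationController
	selector labels.Selector,
	kind schema.GroupVersionKind,
	pods []*v1.Pod,
) ([]*v1.Pod, error) {
	cm := controller.NewPodControllerRefManager(podControl, owner, selector, kind)
	// ClaimPods adopts matching orphans, releases owned pods whose labels no
	// longer match, and aggregates any per-pod patch errors.
	claimed, err := cm.ClaimPods(pods)
	// On a non-nil error the caller should requeue its key and retry, as the
	// ReplicaSet and ReplicationController sync loops above now do.
	return claimed, err
}

Note that ClaimPods still returns the pods claimed so far even when it also
returns an aggregated error, since it collects per-pod failures with
utilerrors.NewAggregate rather than aborting on the first one.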