Merge pull request #41361 from enisoc/controller-ref-manager

Automatic merge from submit-queue (batch tested with PRs 41667, 41820, 40910, 41645, 41361)

Refactor ControllerRefManager

**What this PR does / why we need it**:

To prepare for implementing ControllerRef across all controllers (https://github.com/kubernetes/community/pull/298), this pushes the common adopt/orphan logic into ControllerRefManager so each controller doesn't have to duplicate it.

This also shares the adopt/orphan logic between Pods and ReplicaSets, so it lives in only one place.

**Which issue this PR fixes**:

**Special notes for your reviewer**:

**Release note**:
```release-note
```

cc @kubernetes/sig-apps-pr-reviews
This commit is contained in:
Kubernetes Submit Queue 2017-02-23 20:57:32 -08:00 committed by GitHub
commit a8a8120ecd
9 changed files with 243 additions and 224 deletions

View File

@ -40,6 +40,7 @@ go_library(
"//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/runtime/schema", "//vendor:k8s.io/apimachinery/pkg/runtime/schema",
"//vendor:k8s.io/apimachinery/pkg/types", "//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/util/errors",
"//vendor:k8s.io/apimachinery/pkg/util/sets", "//vendor:k8s.io/apimachinery/pkg/util/sets",
"//vendor:k8s.io/apimachinery/pkg/util/strategicpatch", "//vendor:k8s.io/apimachinery/pkg/util/strategicpatch",
"//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/apimachinery/pkg/util/wait",

View File

@ -25,78 +25,17 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
) )
type PodControllerRefManager struct {
podControl PodControlInterface
controllerObject metav1.ObjectMeta
controllerSelector labels.Selector
controllerKind schema.GroupVersionKind
}
// NewPodControllerRefManager returns a PodControllerRefManager that exposes
// methods to manage the controllerRef of pods.
func NewPodControllerRefManager(
podControl PodControlInterface,
controllerObject metav1.ObjectMeta,
controllerSelector labels.Selector,
controllerKind schema.GroupVersionKind,
) *PodControllerRefManager {
return &PodControllerRefManager{podControl, controllerObject, controllerSelector, controllerKind}
}
// Classify first filters out inactive pods, then it classify the remaining pods
// into three categories: 1. matchesAndControlled are the pods whose labels
// match the selector of the RC, and have a controllerRef pointing to the
// controller 2. matchesNeedsController are the pods whose labels match the RC,
// but don't have a controllerRef. (Pods with matching labels but with a
// controllerRef pointing to other object are ignored) 3. controlledDoesNotMatch
// are the pods that have a controllerRef pointing to the controller, but their
// labels no longer match the selector.
func (m *PodControllerRefManager) Classify(pods []*v1.Pod) (
matchesAndControlled []*v1.Pod,
matchesNeedsController []*v1.Pod,
controlledDoesNotMatch []*v1.Pod) {
for i := range pods {
pod := pods[i]
if !IsPodActive(pod) {
glog.V(4).Infof("Ignoring inactive pod %v/%v in state %v, deletion time %v",
pod.Namespace, pod.Name, pod.Status.Phase, pod.DeletionTimestamp)
continue
}
controllerRef := GetControllerOf(&pod.ObjectMeta)
if controllerRef != nil {
if controllerRef.UID == m.controllerObject.UID {
// already controlled
if m.controllerSelector.Matches(labels.Set(pod.Labels)) {
matchesAndControlled = append(matchesAndControlled, pod)
} else {
controlledDoesNotMatch = append(controlledDoesNotMatch, pod)
}
} else {
// ignoring the pod controlled by other controller
glog.V(4).Infof("Ignoring pod %v/%v, it's owned by [%s/%s, name: %s, uid: %s]",
pod.Namespace, pod.Name, controllerRef.APIVersion, controllerRef.Kind, controllerRef.Name, controllerRef.UID)
continue
}
} else {
if !m.controllerSelector.Matches(labels.Set(pod.Labels)) {
continue
}
matchesNeedsController = append(matchesNeedsController, pod)
}
}
return matchesAndControlled, matchesNeedsController, controlledDoesNotMatch
}
// GetControllerOf returns the controllerRef if controllee has a controller, // GetControllerOf returns the controllerRef if controllee has a controller,
// otherwise returns nil. // otherwise returns nil.
func GetControllerOf(controllee *metav1.ObjectMeta) *metav1.OwnerReference { func GetControllerOf(controllee metav1.Object) *metav1.OwnerReference {
for i := range controllee.OwnerReferences { ownerRefs := controllee.GetOwnerReferences()
owner := &controllee.OwnerReferences[i] for i := range ownerRefs {
// controlled by other controller owner := &ownerRefs[i]
if owner.Controller != nil && *owner.Controller == true { if owner.Controller != nil && *owner.Controller == true {
return owner return owner
} }
@ -104,18 +43,157 @@ func GetControllerOf(controllee *metav1.ObjectMeta) *metav1.OwnerReference {
return nil return nil
} }
type baseControllerRefManager struct {
controller metav1.Object
selector labels.Selector
}
// claimObject tries to take ownership of an object for this controller.
//
// It will reconcile the following:
// * Adopt orphans if the selector matches.
// * Release owned objects if the selector no longer matches.
//
// A non-nil error is returned if some form of reconciliation was attemped and
// failed. Usually, controllers should try again later in case reconciliation
// is still needed.
//
// If the error is nil, either the reconciliation succeeded, or no
// reconciliation was necessary. The returned boolean indicates whether you now
// own the object.
//
// No reconciliation will be attempted if the controller is being deleted.
func (m *baseControllerRefManager) claimObject(obj metav1.Object, adopt, release func(metav1.Object) error) (bool, error) {
controllerRef := GetControllerOf(obj)
if controllerRef != nil {
if controllerRef.UID != m.controller.GetUID() {
// Owned by someone else. Ignore.
return false, nil
}
if m.selector.Matches(labels.Set(obj.GetLabels())) {
// We already own it and the selector matches.
// Return true (successfully claimed) before checking deletion timestamp.
// We're still allowed to claim things we already own while being deleted
// because doing so requires taking no actions.
return true, nil
}
// Owned by us but selector doesn't match.
// Try to release, unless we're being deleted.
if m.controller.GetDeletionTimestamp() != nil {
return false, nil
}
if err := release(obj); err != nil {
// If the pod no longer exists, ignore the error.
if errors.IsNotFound(err) {
return false, nil
}
// Either someone else released it, or there was a transient error.
// The controller should requeue and try again if it's still stale.
return false, err
}
// Successfully released.
return false, nil
}
// It's an orphan.
if m.controller.GetDeletionTimestamp() != nil ||
!m.selector.Matches(labels.Set(obj.GetLabels())) {
// Ignore if we're being deleted or selector doesn't match.
return false, nil
}
// Selector matches. Try to adopt.
if err := adopt(obj); err != nil {
// If the pod no longer exists, ignore the error.
if errors.IsNotFound(err) {
return false, nil
}
// Either someone else claimed it first, or there was a transient error.
// The controller should requeue and try again if it's still orphaned.
return false, err
}
// Successfully adopted.
return true, nil
}
type PodControllerRefManager struct {
baseControllerRefManager
controllerKind schema.GroupVersionKind
podControl PodControlInterface
}
// NewPodControllerRefManager returns a PodControllerRefManager that exposes
// methods to manage the controllerRef of pods.
func NewPodControllerRefManager(
podControl PodControlInterface,
controller metav1.Object,
selector labels.Selector,
controllerKind schema.GroupVersionKind,
) *PodControllerRefManager {
return &PodControllerRefManager{
baseControllerRefManager: baseControllerRefManager{
controller: controller,
selector: selector,
},
controllerKind: controllerKind,
podControl: podControl,
}
}
// ClaimPods tries to take ownership of a list of Pods.
//
// It will reconcile the following:
// * Adopt orphans if the selector matches.
// * Release owned objects if the selector no longer matches.
//
// A non-nil error is returned if some form of reconciliation was attemped and
// failed. Usually, controllers should try again later in case reconciliation
// is still needed.
//
// If the error is nil, either the reconciliation succeeded, or no
// reconciliation was necessary. The list of Pods that you now own is returned.
func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod) ([]*v1.Pod, error) {
var claimed []*v1.Pod
var errlist []error
adopt := func(obj metav1.Object) error {
return m.AdoptPod(obj.(*v1.Pod))
}
release := func(obj metav1.Object) error {
return m.ReleasePod(obj.(*v1.Pod))
}
for _, pod := range pods {
if !IsPodActive(pod) {
glog.V(4).Infof("Ignoring inactive pod %v/%v in state %v, deletion time %v",
pod.Namespace, pod.Name, pod.Status.Phase, pod.DeletionTimestamp)
continue
}
ok, err := m.claimObject(pod, adopt, release)
if err != nil {
errlist = append(errlist, err)
continue
}
if ok {
claimed = append(claimed, pod)
}
}
return claimed, utilerrors.NewAggregate(errlist)
}
// AdoptPod sends a patch to take control of the pod. It returns the error if // AdoptPod sends a patch to take control of the pod. It returns the error if
// the patching fails. // the patching fails.
func (m *PodControllerRefManager) AdoptPod(pod *v1.Pod) error { func (m *PodControllerRefManager) AdoptPod(pod *v1.Pod) error {
// we should not adopt any pods if the controller is about to be deleted // we should not adopt any pods if the controller is about to be deleted
if m.controllerObject.DeletionTimestamp != nil { if m.controller.GetDeletionTimestamp() != nil {
return fmt.Errorf("cancel the adopt attempt for pod %s because the controller is being deleted", return fmt.Errorf("cancel the adopt attempt for pod %s because the controller is being deleted",
strings.Join([]string{pod.Namespace, pod.Name, string(pod.UID)}, "_")) strings.Join([]string{pod.Namespace, pod.Name, string(pod.UID)}, "_"))
} }
// Note that ValidateOwnerReferences() will reject this patch if another
// OwnerReference exists with controller=true.
addControllerPatch := fmt.Sprintf( addControllerPatch := fmt.Sprintf(
`{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true}],"uid":"%s"}}`, `{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true}],"uid":"%s"}}`,
m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.controllerKind.GroupVersion(), m.controllerKind.Kind,
m.controllerObject.Name, m.controllerObject.UID, pod.UID) m.controller.GetName(), m.controller.GetUID(), pod.UID)
return m.podControl.PatchPod(pod.Namespace, pod.Name, []byte(addControllerPatch)) return m.podControl.PatchPod(pod.Namespace, pod.Name, []byte(addControllerPatch))
} }
@ -123,8 +201,8 @@ func (m *PodControllerRefManager) AdoptPod(pod *v1.Pod) error {
// It returns the error if the patching fails. 404 and 422 errors are ignored. // It returns the error if the patching fails. 404 and 422 errors are ignored.
func (m *PodControllerRefManager) ReleasePod(pod *v1.Pod) error { func (m *PodControllerRefManager) ReleasePod(pod *v1.Pod) error {
glog.V(2).Infof("patching pod %s_%s to remove its controllerRef to %s/%s:%s", glog.V(2).Infof("patching pod %s_%s to remove its controllerRef to %s/%s:%s",
pod.Namespace, pod.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.controllerObject.Name) pod.Namespace, pod.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.controller.GetName())
deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, m.controllerObject.UID, pod.UID) deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, m.controller.GetUID(), pod.UID)
err := m.podControl.PatchPod(pod.Namespace, pod.Name, []byte(deleteOwnerRefPatch)) err := m.podControl.PatchPod(pod.Namespace, pod.Name, []byte(deleteOwnerRefPatch))
if err != nil { if err != nil {
if errors.IsNotFound(err) { if errors.IsNotFound(err) {
@ -152,74 +230,80 @@ func (m *PodControllerRefManager) ReleasePod(pod *v1.Pod) error {
// categories and accordingly adopt or release them. See comments on these functions // categories and accordingly adopt or release them. See comments on these functions
// for more details. // for more details.
type ReplicaSetControllerRefManager struct { type ReplicaSetControllerRefManager struct {
rsControl RSControlInterface baseControllerRefManager
controllerObject metav1.ObjectMeta controllerKind schema.GroupVersionKind
controllerSelector labels.Selector rsControl RSControlInterface
controllerKind schema.GroupVersionKind
} }
// NewReplicaSetControllerRefManager returns a ReplicaSetControllerRefManager that exposes // NewReplicaSetControllerRefManager returns a ReplicaSetControllerRefManager that exposes
// methods to manage the controllerRef of ReplicaSets. // methods to manage the controllerRef of ReplicaSets.
func NewReplicaSetControllerRefManager( func NewReplicaSetControllerRefManager(
rsControl RSControlInterface, rsControl RSControlInterface,
controllerObject metav1.ObjectMeta, controller metav1.Object,
controllerSelector labels.Selector, selector labels.Selector,
controllerKind schema.GroupVersionKind, controllerKind schema.GroupVersionKind,
) *ReplicaSetControllerRefManager { ) *ReplicaSetControllerRefManager {
return &ReplicaSetControllerRefManager{rsControl, controllerObject, controllerSelector, controllerKind} return &ReplicaSetControllerRefManager{
baseControllerRefManager: baseControllerRefManager{
controller: controller,
selector: selector,
},
controllerKind: controllerKind,
rsControl: rsControl,
}
} }
// Classify, classifies the ReplicaSets into three categories: // ClaimReplicaSets tries to take ownership of a list of ReplicaSets.
// 1. matchesAndControlled are the ReplicaSets whose labels //
// match the selector of the Deployment, and have a controllerRef pointing to the // It will reconcile the following:
// Deployment. // * Adopt orphans if the selector matches.
// 2. matchesNeedsController are ReplicaSets ,whose labels match the Deployment, // * Release owned objects if the selector no longer matches.
// but don't have a controllerRef. (ReplicaSets with matching labels but with a //
// controllerRef pointing to other object are ignored) // A non-nil error is returned if some form of reconciliation was attemped and
// 3. controlledDoesNotMatch are the ReplicaSets that have a controllerRef pointing // failed. Usually, controllers should try again later in case reconciliation
// to the Deployment, but their labels no longer match the selector. // is still needed.
func (m *ReplicaSetControllerRefManager) Classify(replicaSets []*extensions.ReplicaSet) ( //
matchesAndControlled []*extensions.ReplicaSet, // If the error is nil, either the reconciliation succeeded, or no
matchesNeedsController []*extensions.ReplicaSet, // reconciliation was necessary. The list of ReplicaSets that you now own is
controlledDoesNotMatch []*extensions.ReplicaSet) { // returned.
for i := range replicaSets { func (m *ReplicaSetControllerRefManager) ClaimReplicaSets(sets []*extensions.ReplicaSet) ([]*extensions.ReplicaSet, error) {
replicaSet := replicaSets[i] var claimed []*extensions.ReplicaSet
controllerRef := GetControllerOf(&replicaSet.ObjectMeta) var errlist []error
if controllerRef != nil {
if controllerRef.UID != m.controllerObject.UID { adopt := func(obj metav1.Object) error {
// ignoring the ReplicaSet controlled by other Deployment return m.AdoptReplicaSet(obj.(*extensions.ReplicaSet))
glog.V(4).Infof("Ignoring ReplicaSet %v/%v, it's owned by [%s/%s, name: %s, uid: %s]", }
replicaSet.Namespace, replicaSet.Name, controllerRef.APIVersion, controllerRef.Kind, controllerRef.Name, controllerRef.UID) release := func(obj metav1.Object) error {
continue return m.ReleaseReplicaSet(obj.(*extensions.ReplicaSet))
} }
// already controlled by this Deployment
if m.controllerSelector.Matches(labels.Set(replicaSet.Labels)) { for _, rs := range sets {
matchesAndControlled = append(matchesAndControlled, replicaSet) ok, err := m.claimObject(rs, adopt, release)
} else { if err != nil {
controlledDoesNotMatch = append(controlledDoesNotMatch, replicaSet) errlist = append(errlist, err)
} continue
} else { }
if !m.controllerSelector.Matches(labels.Set(replicaSet.Labels)) { if ok {
continue claimed = append(claimed, rs)
}
matchesNeedsController = append(matchesNeedsController, replicaSet)
} }
} }
return matchesAndControlled, matchesNeedsController, controlledDoesNotMatch return claimed, utilerrors.NewAggregate(errlist)
} }
// AdoptReplicaSet sends a patch to take control of the ReplicaSet. It returns the error if // AdoptReplicaSet sends a patch to take control of the ReplicaSet. It returns the error if
// the patching fails. // the patching fails.
func (m *ReplicaSetControllerRefManager) AdoptReplicaSet(replicaSet *extensions.ReplicaSet) error { func (m *ReplicaSetControllerRefManager) AdoptReplicaSet(replicaSet *extensions.ReplicaSet) error {
// we should not adopt any ReplicaSets if the Deployment is about to be deleted // we should not adopt any ReplicaSets if the Deployment is about to be deleted
if m.controllerObject.DeletionTimestamp != nil { if m.controller.GetDeletionTimestamp() != nil {
return fmt.Errorf("cancel the adopt attempt for RS %s because the controller %v is being deleted", return fmt.Errorf("cancel the adopt attempt for RS %s because the controller %v is being deleted",
strings.Join([]string{replicaSet.Namespace, replicaSet.Name, string(replicaSet.UID)}, "_"), m.controllerObject.Name) strings.Join([]string{replicaSet.Namespace, replicaSet.Name, string(replicaSet.UID)}, "_"), m.controller.GetName())
} }
// Note that ValidateOwnerReferences() will reject this patch if another
// OwnerReference exists with controller=true.
addControllerPatch := fmt.Sprintf( addControllerPatch := fmt.Sprintf(
`{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true}],"uid":"%s"}}`, `{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true}],"uid":"%s"}}`,
m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.controllerKind.GroupVersion(), m.controllerKind.Kind,
m.controllerObject.Name, m.controllerObject.UID, replicaSet.UID) m.controller.GetName(), m.controller.GetUID(), replicaSet.UID)
return m.rsControl.PatchReplicaSet(replicaSet.Namespace, replicaSet.Name, []byte(addControllerPatch)) return m.rsControl.PatchReplicaSet(replicaSet.Namespace, replicaSet.Name, []byte(addControllerPatch))
} }
@ -227,8 +311,8 @@ func (m *ReplicaSetControllerRefManager) AdoptReplicaSet(replicaSet *extensions.
// It returns the error if the patching fails. 404 and 422 errors are ignored. // It returns the error if the patching fails. 404 and 422 errors are ignored.
func (m *ReplicaSetControllerRefManager) ReleaseReplicaSet(replicaSet *extensions.ReplicaSet) error { func (m *ReplicaSetControllerRefManager) ReleaseReplicaSet(replicaSet *extensions.ReplicaSet) error {
glog.V(2).Infof("patching ReplicaSet %s_%s to remove its controllerRef to %s/%s:%s", glog.V(2).Infof("patching ReplicaSet %s_%s to remove its controllerRef to %s/%s:%s",
replicaSet.Namespace, replicaSet.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.controllerObject.Name) replicaSet.Namespace, replicaSet.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.controller.GetName())
deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, m.controllerObject.UID, replicaSet.UID) deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, m.controller.GetUID(), replicaSet.UID)
err := m.rsControl.PatchReplicaSet(replicaSet.Namespace, replicaSet.Name, []byte(deleteOwnerRefPatch)) err := m.rsControl.PatchReplicaSet(replicaSet.Namespace, replicaSet.Name, []byte(deleteOwnerRefPatch))
if err != nil { if err != nil {
if errors.IsNotFound(err) { if errors.IsNotFound(err) {

View File

@ -443,11 +443,11 @@ func (dc *DeploymentController) handleErr(err error, key interface{}) {
dc.queue.Forget(key) dc.queue.Forget(key)
} }
// classifyReplicaSets uses NewReplicaSetControllerRefManager to classify ReplicaSets // claimReplicaSets uses NewReplicaSetControllerRefManager to classify ReplicaSets
// and adopts them if their labels match the Deployment but are missing the reference. // and adopts them if their labels match the Deployment but are missing the reference.
// It also removes the controllerRef for ReplicaSets, whose labels no longer matches // It also removes the controllerRef for ReplicaSets, whose labels no longer matches
// the deployment. // the deployment.
func (dc *DeploymentController) classifyReplicaSets(deployment *extensions.Deployment) error { func (dc *DeploymentController) claimReplicaSets(deployment *extensions.Deployment) error {
rsList, err := dc.rsLister.ReplicaSets(deployment.Namespace).List(labels.Everything()) rsList, err := dc.rsLister.ReplicaSets(deployment.Namespace).List(labels.Everything())
if err != nil { if err != nil {
return err return err
@ -457,32 +457,9 @@ func (dc *DeploymentController) classifyReplicaSets(deployment *extensions.Deplo
if err != nil { if err != nil {
return fmt.Errorf("deployment %s/%s has invalid label selector: %v", deployment.Namespace, deployment.Name, err) return fmt.Errorf("deployment %s/%s has invalid label selector: %v", deployment.Namespace, deployment.Name, err)
} }
cm := controller.NewReplicaSetControllerRefManager(dc.rsControl, deployment.ObjectMeta, deploymentSelector, getDeploymentKind()) cm := controller.NewReplicaSetControllerRefManager(dc.rsControl, deployment, deploymentSelector, getDeploymentKind())
matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(rsList) _, err = cm.ClaimReplicaSets(rsList)
// Adopt replica sets only if this deployment is not going to be deleted. return err
if deployment.DeletionTimestamp == nil {
for _, replicaSet := range matchesNeedsController {
err := cm.AdoptReplicaSet(replicaSet)
// continue to next RS if adoption fails.
if err != nil {
// If the RS no longer exists, don't even log the error.
if !errors.IsNotFound(err) {
utilruntime.HandleError(err)
}
} else {
matchesAndControlled = append(matchesAndControlled, replicaSet)
}
}
}
// remove the controllerRef for the RS that no longer have matching labels
var errlist []error
for _, replicaSet := range controlledDoesNotMatch {
err := cm.ReleaseReplicaSet(replicaSet)
if err != nil {
errlist = append(errlist, err)
}
}
return utilerrors.NewAggregate(errlist)
} }
// syncDeployment will sync the deployment with the given key. // syncDeployment will sync the deployment with the given key.
@ -565,7 +542,7 @@ func (dc *DeploymentController) syncDeployment(key string) error {
dc.cleanupDeployment(oldRSs, d) dc.cleanupDeployment(oldRSs, d)
} }
err = dc.classifyReplicaSets(d) err = dc.claimReplicaSets(d)
if err != nil { if err != nil {
return err return err
} }

View File

@ -33,7 +33,6 @@ go_library(
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/labels", "//vendor:k8s.io/apimachinery/pkg/labels",
"//vendor:k8s.io/apimachinery/pkg/runtime/schema", "//vendor:k8s.io/apimachinery/pkg/runtime/schema",
"//vendor:k8s.io/apimachinery/pkg/util/errors",
"//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/client-go/kubernetes/typed/core/v1", "//vendor:k8s.io/client-go/kubernetes/typed/core/v1",

View File

@ -30,7 +30,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
v1core "k8s.io/client-go/kubernetes/typed/core/v1" v1core "k8s.io/client-go/kubernetes/typed/core/v1"
@ -603,38 +602,13 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
if err != nil { if err != nil {
return err return err
} }
cm := controller.NewPodControllerRefManager(rsc.podControl, rs.ObjectMeta, selector, getRSKind()) cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, getRSKind())
matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(pods) filteredPods, err = cm.ClaimPods(pods)
// Adopt pods only if this replica set is not going to be deleted. if err != nil {
if rs.DeletionTimestamp == nil { // Something went wrong with adoption or release.
for _, pod := range matchesNeedsController { // Requeue and try again so we don't leave orphans sitting around.
err := cm.AdoptPod(pod) rsc.queue.Add(key)
// continue to next pod if adoption fails. return err
if err != nil {
// If the pod no longer exists, don't even log the error.
if !errors.IsNotFound(err) {
utilruntime.HandleError(err)
}
} else {
matchesAndControlled = append(matchesAndControlled, pod)
}
}
}
filteredPods = matchesAndControlled
// remove the controllerRef for the pods that no longer have matching labels
var errlist []error
for _, pod := range controlledDoesNotMatch {
err := cm.ReleasePod(pod)
if err != nil {
errlist = append(errlist, err)
}
}
if len(errlist) != 0 {
aggregate := utilerrors.NewAggregate(errlist)
// push the RS into work queue again. We need to try to free the
// pods again otherwise they will stuck with the stale
// controllerRef.
return aggregate
} }
} else { } else {
pods, err := rsc.podLister.Pods(rs.Namespace).List(selector) pods, err := rsc.podLister.Pods(rs.Namespace).List(selector)

View File

@ -1224,14 +1224,20 @@ func TestPatchPodFails(t *testing.T) {
informers.Core().V1().Pods().Informer().GetIndexer().Add(newPod("pod1", rs, v1.PodRunning, nil)) informers.Core().V1().Pods().Informer().GetIndexer().Add(newPod("pod1", rs, v1.PodRunning, nil))
informers.Core().V1().Pods().Informer().GetIndexer().Add(newPod("pod2", rs, v1.PodRunning, nil)) informers.Core().V1().Pods().Informer().GetIndexer().Add(newPod("pod2", rs, v1.PodRunning, nil))
// let both patches fail. The rs controller will assume it fails to take // let both patches fail. The rs controller will assume it fails to take
// control of the pods and create new ones. // control of the pods and requeue to try again.
fakePodControl.Err = fmt.Errorf("Fake Error") fakePodControl.Err = fmt.Errorf("Fake Error")
err := manager.syncReplicaSet(getKey(rs, t)) rsKey := getKey(rs, t)
if err == nil || err.Error() != "Fake Error" { err := manager.syncReplicaSet(rsKey)
if err == nil || !strings.Contains(err.Error(), "Fake Error") {
t.Errorf("expected Fake Error, got %+v", err) t.Errorf("expected Fake Error, got %+v", err)
} }
// 2 patches to take control of pod1 and pod2 (both fail), 2 creates. // 2 patches to take control of pod1 and pod2 (both fail).
validateSyncReplicaSet(t, fakePodControl, 2, 0, 2) validateSyncReplicaSet(t, fakePodControl, 0, 0, 2)
// RS should requeue itself.
queueRS, _ := manager.queue.Get()
if queueRS != rsKey {
t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS)
}
} }
func TestPatchExtraPodsThenDelete(t *testing.T) { func TestPatchExtraPodsThenDelete(t *testing.T) {

View File

@ -30,7 +30,6 @@ go_library(
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/labels", "//vendor:k8s.io/apimachinery/pkg/labels",
"//vendor:k8s.io/apimachinery/pkg/runtime/schema", "//vendor:k8s.io/apimachinery/pkg/runtime/schema",
"//vendor:k8s.io/apimachinery/pkg/util/errors",
"//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apiserver/pkg/util/trace", "//vendor:k8s.io/apiserver/pkg/util/trace",

View File

@ -30,7 +30,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
utiltrace "k8s.io/apiserver/pkg/util/trace" utiltrace "k8s.io/apiserver/pkg/util/trace"
@ -620,39 +619,13 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
rm.queue.Add(key) rm.queue.Add(key)
return err return err
} }
cm := controller.NewPodControllerRefManager(rm.podControl, rc.ObjectMeta, labels.Set(rc.Spec.Selector).AsSelectorPreValidated(), getRCKind()) cm := controller.NewPodControllerRefManager(rm.podControl, rc, labels.Set(rc.Spec.Selector).AsSelectorPreValidated(), getRCKind())
matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(pods) filteredPods, err = cm.ClaimPods(pods)
// Adopt pods only if this replication controller is not going to be deleted. if err != nil {
if rc.DeletionTimestamp == nil { // Something went wrong with adoption or release.
for _, pod := range matchesNeedsController { // Requeue and try again so we don't leave orphans sitting around.
err := cm.AdoptPod(pod)
// continue to next pod if adoption fails.
if err != nil {
// If the pod no longer exists, don't even log the error.
if !errors.IsNotFound(err) {
utilruntime.HandleError(err)
}
} else {
matchesAndControlled = append(matchesAndControlled, pod)
}
}
}
filteredPods = matchesAndControlled
// remove the controllerRef for the pods that no longer have matching labels
var errlist []error
for _, pod := range controlledDoesNotMatch {
err := cm.ReleasePod(pod)
if err != nil {
errlist = append(errlist, err)
}
}
if len(errlist) != 0 {
aggregate := utilerrors.NewAggregate(errlist)
// push the RC into work queue again. We need to try to free the
// pods again otherwise they will stuck with the stale
// controllerRef.
rm.queue.Add(key) rm.queue.Add(key)
return aggregate return err
} }
} else { } else {
pods, err := rm.podLister.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelectorPreValidated()) pods, err := rm.podLister.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelectorPreValidated())

View File

@ -1190,14 +1190,20 @@ func TestPatchPodFails(t *testing.T) {
podInformer.Informer().GetIndexer().Add(newPod("pod1", rc, v1.PodRunning, nil)) podInformer.Informer().GetIndexer().Add(newPod("pod1", rc, v1.PodRunning, nil))
podInformer.Informer().GetIndexer().Add(newPod("pod2", rc, v1.PodRunning, nil)) podInformer.Informer().GetIndexer().Add(newPod("pod2", rc, v1.PodRunning, nil))
// let both patches fail. The rc manager will assume it fails to take // let both patches fail. The rc manager will assume it fails to take
// control of the pods and create new ones. // control of the pods and requeue to try again.
fakePodControl.Err = fmt.Errorf("Fake Error") fakePodControl.Err = fmt.Errorf("Fake Error")
err := manager.syncReplicationController(getKey(rc, t)) rcKey := getKey(rc, t)
if err == nil || err.Error() != "Fake Error" { err := manager.syncReplicationController(rcKey)
if err == nil || !strings.Contains(err.Error(), "Fake Error") {
t.Fatalf("expected Fake Error, got %v", err) t.Fatalf("expected Fake Error, got %v", err)
} }
// 2 patches to take control of pod1 and pod2 (both fail), 2 creates. // 2 patches to take control of pod1 and pod2 (both fail).
validateSyncReplication(t, fakePodControl, 2, 0, 2) validateSyncReplication(t, fakePodControl, 0, 0, 2)
// RC should requeue itself.
queueRC, _ := manager.queue.Get()
if queueRC != rcKey {
t.Fatalf("Expected to find key %v in queue, found %v", rcKey, queueRC)
}
} }
func TestPatchExtraPodsThenDelete(t *testing.T) { func TestPatchExtraPodsThenDelete(t *testing.T) {