From 0035d862488c381e12c025aff060b6ec6cb593cb Mon Sep 17 00:00:00 2001 From: Janet Kuo Date: Fri, 2 Jun 2017 18:02:01 -0700 Subject: [PATCH] Update adoption/release of DaemonSet controller history --- pkg/controller/BUILD | 1 + pkg/controller/controller_ref_manager.go | 119 ++++++++++++++++++ pkg/controller/controller_utils.go | 20 +++ pkg/controller/daemon/daemoncontroller.go | 4 + .../daemon/daemoncontroller_test.go | 3 + pkg/controller/daemon/update.go | 31 +++-- 6 files changed, 168 insertions(+), 10 deletions(-) diff --git a/pkg/controller/BUILD b/pkg/controller/BUILD index beafe906298..3f77ff750ed 100644 --- a/pkg/controller/BUILD +++ b/pkg/controller/BUILD @@ -25,6 +25,7 @@ go_library( "//pkg/api/v1/pod:go_default_library", "//pkg/api/v1/ref:go_default_library", "//pkg/api/validation:go_default_library", + "//pkg/apis/apps/v1beta1:go_default_library", "//pkg/apis/authentication/v1:go_default_library", "//pkg/apis/extensions/v1beta1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", diff --git a/pkg/controller/controller_ref_manager.go b/pkg/controller/controller_ref_manager.go index 507995c004d..96b546f6f27 100644 --- a/pkg/controller/controller_ref_manager.go +++ b/pkg/controller/controller_ref_manager.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/kubernetes/pkg/api/v1" + appsv1beta1 "k8s.io/kubernetes/pkg/apis/apps/v1beta1" extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" ) @@ -394,3 +395,121 @@ func RecheckDeletionTimestamp(getObject func() (metav1.Object, error)) func() er return nil } } + +// ControllerRevisionControllerRefManager is used to manage controllerRef of ControllerRevisions. +// Three methods are defined on this object 1: Classify 2: AdoptControllerRevision and +// 3: ReleaseControllerRevision which are used to classify the ControllerRevisions into appropriate +// categories and accordingly adopt or release them. See comments on these functions +// for more details. +type ControllerRevisionControllerRefManager struct { + baseControllerRefManager + controllerKind schema.GroupVersionKind + crControl ControllerRevisionControlInterface +} + +// NewControllerRevisionControllerRefManager returns a ControllerRevisionControllerRefManager that exposes +// methods to manage the controllerRef of ControllerRevisions. +// +// The canAdopt() function can be used to perform a potentially expensive check +// (such as a live GET from the API server) prior to the first adoption. +// It will only be called (at most once) if an adoption is actually attempted. +// If canAdopt() returns a non-nil error, all adoptions will fail. +// +// NOTE: Once canAdopt() is called, it will not be called again by the same +// ControllerRevisionControllerRefManager instance. Create a new instance if it +// makes sense to check canAdopt() again (e.g. in a different sync pass). +func NewControllerRevisionControllerRefManager( + crControl ControllerRevisionControlInterface, + controller metav1.Object, + selector labels.Selector, + controllerKind schema.GroupVersionKind, + canAdopt func() error, +) *ControllerRevisionControllerRefManager { + return &ControllerRevisionControllerRefManager{ + baseControllerRefManager: baseControllerRefManager{ + controller: controller, + selector: selector, + canAdoptFunc: canAdopt, + }, + controllerKind: controllerKind, + crControl: crControl, + } +} + +// ClaimControllerRevisions tries to take ownership of a list of ControllerRevisions. +// +// It will reconcile the following: +// * Adopt orphans if the selector matches. +// * Release owned objects if the selector no longer matches. +// +// A non-nil error is returned if some form of reconciliation was attemped and +// failed. Usually, controllers should try again later in case reconciliation +// is still needed. +// +// If the error is nil, either the reconciliation succeeded, or no +// reconciliation was necessary. The list of ControllerRevisions that you now own is +// returned. +func (m *ControllerRevisionControllerRefManager) ClaimControllerRevisions(histories []*appsv1beta1.ControllerRevision) ([]*appsv1beta1.ControllerRevision, error) { + var claimed []*appsv1beta1.ControllerRevision + var errlist []error + + match := func(obj metav1.Object) bool { + return m.selector.Matches(labels.Set(obj.GetLabels())) + } + adopt := func(obj metav1.Object) error { + return m.AdoptControllerRevision(obj.(*appsv1beta1.ControllerRevision)) + } + release := func(obj metav1.Object) error { + return m.ReleaseControllerRevision(obj.(*appsv1beta1.ControllerRevision)) + } + + for _, h := range histories { + ok, err := m.claimObject(h, match, adopt, release) + if err != nil { + errlist = append(errlist, err) + continue + } + if ok { + claimed = append(claimed, h) + } + } + return claimed, utilerrors.NewAggregate(errlist) +} + +// AdoptControllerRevision sends a patch to take control of the ControllerRevision. It returns the error if +// the patching fails. +func (m *ControllerRevisionControllerRefManager) AdoptControllerRevision(history *appsv1beta1.ControllerRevision) error { + if err := m.canAdopt(); err != nil { + return fmt.Errorf("can't adopt ControllerRevision %v/%v (%v): %v", history.Namespace, history.Name, history.UID, err) + } + // Note that ValidateOwnerReferences() will reject this patch if another + // OwnerReference exists with controller=true. + addControllerPatch := fmt.Sprintf( + `{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":true}],"uid":"%s"}}`, + m.controllerKind.GroupVersion(), m.controllerKind.Kind, + m.controller.GetName(), m.controller.GetUID(), history.UID) + return m.crControl.PatchControllerRevision(history.Namespace, history.Name, []byte(addControllerPatch)) +} + +// ReleaseControllerRevision sends a patch to free the ControllerRevision from the control of its controller. +// It returns the error if the patching fails. 404 and 422 errors are ignored. +func (m *ControllerRevisionControllerRefManager) ReleaseControllerRevision(history *appsv1beta1.ControllerRevision) error { + glog.V(2).Infof("patching ControllerRevision %s_%s to remove its controllerRef to %s/%s:%s", + history.Namespace, history.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.controller.GetName()) + deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, m.controller.GetUID(), history.UID) + err := m.crControl.PatchControllerRevision(history.Namespace, history.Name, []byte(deleteOwnerRefPatch)) + if err != nil { + if errors.IsNotFound(err) { + // If the ControllerRevision no longer exists, ignore it. + return nil + } + if errors.IsInvalid(err) { + // Invalid error will be returned in two cases: 1. the ControllerRevision + // has no owner reference, 2. the uid of the ControllerRevision doesn't + // match, which means the ControllerRevision is deleted and then recreated. + // In both cases, the error can be ignored. + return nil + } + } + return err +} diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index f65e697d615..9f1a17767f4 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -401,6 +401,26 @@ func (r RealRSControl) PatchReplicaSet(namespace, name string, data []byte) erro return err } +// TODO: merge the controller revision interface in controller_history.go with this one +// ControllerRevisionControlInterface is an interface that knows how to patch +// ControllerRevisions, as well as increment or decrement them. It is used +// by the daemonset controller to ease testing of actions that it takes. +type ControllerRevisionControlInterface interface { + PatchControllerRevision(namespace, name string, data []byte) error +} + +// RealControllerRevisionControl is the default implementation of ControllerRevisionControlInterface. +type RealControllerRevisionControl struct { + KubeClient clientset.Interface +} + +var _ ControllerRevisionControlInterface = &RealControllerRevisionControl{} + +func (r RealControllerRevisionControl) PatchControllerRevision(namespace, name string, data []byte) error { + _, err := r.KubeClient.AppsV1beta1().ControllerRevisions(namespace).Patch(name, types.StrategicMergePatchType, data) + return err +} + // PodControlInterface is an interface that knows how to add or delete pods // created as an interface to allow testing. type PodControlInterface interface { diff --git a/pkg/controller/daemon/daemoncontroller.go b/pkg/controller/daemon/daemoncontroller.go index 0e28cdfdaf7..299ca5e6bb0 100644 --- a/pkg/controller/daemon/daemoncontroller.go +++ b/pkg/controller/daemon/daemoncontroller.go @@ -86,6 +86,7 @@ type DaemonSetsController struct { kubeClient clientset.Interface eventRecorder record.EventRecorder podControl controller.PodControlInterface + crControl controller.ControllerRevisionControlInterface // An dsc is temporarily suspended after creating/deleting these many replicas. // It resumes normal action after observing the watch events for them. @@ -138,6 +139,9 @@ func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInfo KubeClient: kubeClient, Recorder: eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "daemon-set"}), }, + crControl: controller.RealControllerRevisionControl{ + KubeClient: kubeClient, + }, burstReplicas: BurstReplicas, expectations: controller.NewControllerExpectations(), queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"), diff --git a/pkg/controller/daemon/daemoncontroller_test.go b/pkg/controller/daemon/daemoncontroller_test.go index 0e7fabc07bc..afe816f7b5a 100644 --- a/pkg/controller/daemon/daemoncontroller_test.go +++ b/pkg/controller/daemon/daemoncontroller_test.go @@ -274,6 +274,7 @@ type daemonSetsController struct { *DaemonSetsController dsStore cache.Store + historyStore cache.Store podStore cache.Store nodeStore cache.Store fakeRecorder *record.FakeRecorder @@ -297,6 +298,7 @@ func newTestController(initialObjects ...runtime.Object) (*daemonSetsController, manager.podStoreSynced = alwaysReady manager.nodeStoreSynced = alwaysReady manager.dsStoreSynced = alwaysReady + manager.historyStoreSynced = alwaysReady podControl := newFakePodControl() manager.podControl = podControl podControl.podStore = informerFactory.Core().V1().Pods().Informer().GetStore() @@ -304,6 +306,7 @@ func newTestController(initialObjects ...runtime.Object) (*daemonSetsController, return &daemonSetsController{ manager, informerFactory.Extensions().V1beta1().DaemonSets().Informer().GetStore(), + informerFactory.Apps().V1beta1().ControllerRevisions().Informer().GetStore(), informerFactory.Core().V1().Pods().Informer().GetStore(), informerFactory.Core().V1().Nodes().Informer().GetStore(), fakeRecorder, diff --git a/pkg/controller/daemon/update.go b/pkg/controller/daemon/update.go index dfb5839b68f..a71733c6fd9 100644 --- a/pkg/controller/daemon/update.go +++ b/pkg/controller/daemon/update.go @@ -79,6 +79,7 @@ func (dsc *DaemonSetsController) rollingUpdate(ds *extensions.DaemonSet, hash st return dsc.syncNodes(ds, oldPodsToDelete, []string{}, hash) } +// constructHistory returns current history and a list of old histories of a given DaemonSet. func (dsc *DaemonSetsController) constructHistory(ds *extensions.DaemonSet) (cur *apps.ControllerRevision, old []*apps.ControllerRevision, err error) { var histories []*apps.ControllerRevision var currentHistories []*apps.ControllerRevision @@ -272,27 +273,37 @@ func (dsc *DaemonSetsController) dedupCurHistories(ds *extensions.DaemonSet, cur return keepCur, nil } -// controlledHistories returns all ControllerRevisions controlled by the given DaemonSet +// controlledHistories returns all ControllerRevisions controlled by the given DaemonSet. +// This also reconciles ControllerRef by adopting/orphaning. // Note that returned histories are pointers to objects in the cache. // If you want to modify one, you need to deep-copy it first. func (dsc *DaemonSetsController) controlledHistories(ds *extensions.DaemonSet) ([]*apps.ControllerRevision, error) { - var result []*apps.ControllerRevision selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector) if err != nil { return nil, err } - histories, err := dsc.historyLister.List(selector) + + // List all histories to include those that don't match the selector anymore + // but have a ControllerRef pointing to the controller. + histories, err := dsc.historyLister.List(labels.Everything()) if err != nil { return nil, err } - for _, history := range histories { - // Skip history that doesn't belong to the DaemonSet - if controllerRef := controller.GetControllerOf(history); controllerRef == nil || controllerRef.UID != ds.UID { - continue + // If any adoptions are attempted, we should first recheck for deletion with + // an uncached quorum read sometime after listing Pods (see #42639). + canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) { + fresh, err := dsc.kubeClient.ExtensionsV1beta1().DaemonSets(ds.Namespace).Get(ds.Name, metav1.GetOptions{}) + if err != nil { + return nil, err } - result = append(result, history) - } - return result, nil + if fresh.UID != ds.UID { + return nil, fmt.Errorf("original DaemonSet %v/%v is gone: got uid %v, wanted %v", ds.Namespace, ds.Name, fresh.UID, ds.UID) + } + return fresh, nil + }) + // Use ControllerRefManager to adopt/orphan as needed. + cm := controller.NewControllerRevisionControllerRefManager(dsc.crControl, ds, selector, controllerKind, canAdoptFunc) + return cm.ClaimControllerRevisions(histories) } // Match check if the given DaemonSet's template matches the template stored in the given history.