From f269e78ebc62db00ca69b2e27e52e1c33a55a627 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Thu, 23 Feb 2017 11:44:09 -0800 Subject: [PATCH 01/11] StatefulSet: Don't touch Pods if DeletionTimestamp is set. --- .../statefulset/stateful_set_control.go | 6 ++ .../statefulset/stateful_set_control_test.go | 63 +++++++++++++++++++ 2 files changed, 69 insertions(+) diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go index d3ce799b2ef..86645d07b22 100644 --- a/pkg/controller/statefulset/stateful_set_control.go +++ b/pkg/controller/statefulset/stateful_set_control.go @@ -111,6 +111,12 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p } } + // If the StatefulSet is being deleted, don't do anything other than updating + // status. + if set.DeletionTimestamp != nil { + return nil + } + // Examine each replica with respect to its ordinal for i := range replicas { // delete and recreate failed pods diff --git a/pkg/controller/statefulset/stateful_set_control_test.go b/pkg/controller/statefulset/stateful_set_control_test.go index 2a74eeea89f..8a8fa67c5ff 100644 --- a/pkg/controller/statefulset/stateful_set_control_test.go +++ b/pkg/controller/statefulset/stateful_set_control_test.go @@ -214,6 +214,69 @@ func TestStatefulSetControlReplacesPods(t *testing.T) { } } +func TestStatefulSetDeletionTimestamp(t *testing.T) { + set := newStatefulSet(5) + client := fake.NewSimpleClientset(set) + + informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) + spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets()) + ssc := NewDefaultStatefulSetControl(spc) + + stop := make(chan struct{}) + defer close(stop) + informerFactory.Start(stop) + cache.WaitForCacheSync( + stop, + informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced, + informerFactory.Core().V1().Pods().Informer().HasSynced, + ) + + // Bring up a StatefulSet. + if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { + t.Errorf("failed to turn up StatefulSet : %s", err) + } + var err error + set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatalf("error getting updated StatefulSet: %v", err) + } + if set.Status.Replicas != 5 { + t.Error("failed to scale statefulset to 5 replicas") + } + selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) + if err != nil { + t.Error(err) + } + pods, err := spc.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Error(err) + } + sort.Sort(ascendingOrdinal(pods)) + + // Mark the StatefulSet as being deleted. + set.DeletionTimestamp = new(metav1.Time) + + // Delete the first pod. + spc.podsIndexer.Delete(pods[0]) + pods, err = spc.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Error(err) + } + + // The StatefulSet should update its replica count, + // but not try to fix it. + if err := ssc.UpdateStatefulSet(set, pods); err != nil { + t.Errorf("failed to update StatefulSet : %s", err) + } + set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatalf("error getting updated StatefulSet: %v", err) + } + if e, a := int32(4), set.Status.Replicas; e != a { + t.Errorf("expected to scale to %d, got %d", e, a) + } +} + func TestDefaultStatefulSetControlRecreatesFailedPod(t *testing.T) { client := fake.NewSimpleClientset() informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) From b5dfc7b2aadb6c2fb8ba5783fecb116e00ebff8b Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Thu, 23 Feb 2017 12:01:02 -0800 Subject: [PATCH 02/11] StatefulSet: Add ControllerRef on all created Pods. --- .../statefulset/stateful_set_utils.go | 15 ++++++++++- .../statefulset/stateful_set_utils_test.go | 25 +++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/pkg/controller/statefulset/stateful_set_utils.go b/pkg/controller/statefulset/stateful_set_utils.go index 59811c1e6d4..231ef945675 100644 --- a/pkg/controller/statefulset/stateful_set_utils.go +++ b/pkg/controller/statefulset/stateful_set_utils.go @@ -21,6 +21,7 @@ import ( "regexp" "strconv" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/api/v1" podapi "k8s.io/kubernetes/pkg/api/v1/pod" apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" @@ -230,9 +231,21 @@ func isHealthy(pod *v1.Pod) bool { return isRunningAndReady(pod) && !isTerminated(pod) } +// newControllerRef returns an ControllerRef pointing to a given StatefulSet. +func newControllerRef(set *apps.StatefulSet) *metav1.OwnerReference { + isController := true + return &metav1.OwnerReference{ + APIVersion: apps.SchemeGroupVersion.String(), + Kind: "StatefulSet", + Name: set.Name, + UID: set.UID, + Controller: &isController, + } +} + // newStatefulSetPod returns a new Pod conforming to the set's Spec with an identity generated from ordinal. func newStatefulSetPod(set *apps.StatefulSet, ordinal int) *v1.Pod { - pod, _ := controller.GetPodFromTemplate(&set.Spec.Template, set, nil) + pod, _ := controller.GetPodFromTemplate(&set.Spec.Template, set, newControllerRef(set)) pod.Name = getPodName(set, ordinal) updateIdentity(set, pod) updateStorage(set, pod) diff --git a/pkg/controller/statefulset/stateful_set_utils_test.go b/pkg/controller/statefulset/stateful_set_utils_test.go index b6e80473985..c53cbb10a6d 100644 --- a/pkg/controller/statefulset/stateful_set_utils_test.go +++ b/pkg/controller/statefulset/stateful_set_utils_test.go @@ -31,6 +31,7 @@ import ( "k8s.io/kubernetes/pkg/api/v1" podapi "k8s.io/kubernetes/pkg/api/v1/pod" apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" + "k8s.io/kubernetes/pkg/controller" ) func TestGetParentNameAndOrdinal(t *testing.T) { @@ -271,6 +272,30 @@ func TestOverlappingStatefulSets(t *testing.T) { } } +func TestNewPodControllerRef(t *testing.T) { + set := newStatefulSet(1) + pod := newStatefulSetPod(set, 0) + controllerRef := controller.GetControllerOf(pod) + if controllerRef == nil { + t.Fatalf("No ControllerRef found on new pod") + } + if got, want := controllerRef.APIVersion, apps.SchemeGroupVersion.String(); got != want { + t.Errorf("controllerRef.APIVersion = %q, want %q", got, want) + } + if got, want := controllerRef.Kind, "StatefulSet"; got != want { + t.Errorf("controllerRef.Kind = %q, want %q", got, want) + } + if got, want := controllerRef.Name, set.Name; got != want { + t.Errorf("controllerRef.Name = %q, want %q", got, want) + } + if got, want := controllerRef.UID, set.UID; got != want { + t.Errorf("controllerRef.UID = %q, want %q", got, want) + } + if got, want := *controllerRef.Controller, true; got != want { + t.Errorf("controllerRef.Controller = %v, want %v", got, want) + } +} + func newPVC(name string) v1.PersistentVolumeClaim { return v1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{ From cfd8a389b7f6e05add70ce0c9ed287b4acf621fc Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Thu, 23 Feb 2017 12:23:14 -0800 Subject: [PATCH 03/11] StatefulSet: Set DefaultGarbageCollectionPolicy to OrphanDependents. Now that StatefulSet adds ControllerRef to Pods it creates, we need to set this default so legacy behavior is maintained. --- pkg/registry/apps/petset/BUILD | 2 ++ pkg/registry/apps/petset/strategy.go | 7 +++++++ pkg/registry/apps/petset/strategy_test.go | 16 ++++++++++++---- 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/pkg/registry/apps/petset/BUILD b/pkg/registry/apps/petset/BUILD index d6b9ba448ac..edf6162bcd0 100644 --- a/pkg/registry/apps/petset/BUILD +++ b/pkg/registry/apps/petset/BUILD @@ -25,6 +25,7 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/util/validation/field", "//vendor:k8s.io/apiserver/pkg/endpoints/request", "//vendor:k8s.io/apiserver/pkg/registry/generic", + "//vendor:k8s.io/apiserver/pkg/registry/rest", "//vendor:k8s.io/apiserver/pkg/storage", "//vendor:k8s.io/apiserver/pkg/storage/names", ], @@ -40,6 +41,7 @@ go_test( "//pkg/apis/apps:go_default_library", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apiserver/pkg/endpoints/request", + "//vendor:k8s.io/apiserver/pkg/registry/rest", ], ) diff --git a/pkg/registry/apps/petset/strategy.go b/pkg/registry/apps/petset/strategy.go index a60f8f7e284..4a61860baab 100644 --- a/pkg/registry/apps/petset/strategy.go +++ b/pkg/registry/apps/petset/strategy.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/util/validation/field" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/generic" + "k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/names" "k8s.io/kubernetes/pkg/api" @@ -42,6 +43,12 @@ type statefulSetStrategy struct { // Strategy is the default logic that applies when creating and updating Replication StatefulSet objects. var Strategy = statefulSetStrategy{api.Scheme, names.SimpleNameGenerator} +// DefaultGarbageCollectionPolicy returns Orphan because that was the default +// behavior before the server-side garbage collection was implemented. +func (statefulSetStrategy) DefaultGarbageCollectionPolicy() rest.GarbageCollectionPolicy { + return rest.OrphanDependents +} + // NamespaceScoped returns true because all StatefulSet' need to be within a namespace. func (statefulSetStrategy) NamespaceScoped() bool { return true diff --git a/pkg/registry/apps/petset/strategy_test.go b/pkg/registry/apps/petset/strategy_test.go index 1690aa8ce96..99065690b09 100644 --- a/pkg/registry/apps/petset/strategy_test.go +++ b/pkg/registry/apps/petset/strategy_test.go @@ -21,6 +21,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/registry/rest" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apis/apps" ) @@ -62,7 +63,7 @@ func TestStatefulSetStrategy(t *testing.T) { } errs := Strategy.Validate(ctx, ps) if len(errs) != 0 { - t.Errorf("Unexpected error validating %v", errs) + t.Errorf("unexpected error validating %v", errs) } // Just Spec.Replicas is allowed to change @@ -77,14 +78,21 @@ func TestStatefulSetStrategy(t *testing.T) { Strategy.PrepareForUpdate(ctx, validPs, ps) errs = Strategy.ValidateUpdate(ctx, validPs, ps) if len(errs) != 0 { - t.Errorf("Updating spec.Replicas is allowed on a statefulset: %v", errs) + t.Errorf("updating spec.Replicas is allowed on a statefulset: %v", errs) } validPs.Spec.Selector = &metav1.LabelSelector{MatchLabels: map[string]string{"a": "bar"}} Strategy.PrepareForUpdate(ctx, validPs, ps) errs = Strategy.ValidateUpdate(ctx, validPs, ps) if len(errs) == 0 { - t.Errorf("Expected a validation error since updates are disallowed on statefulsets.") + t.Errorf("expected a validation error since updates are disallowed on statefulsets.") + } + + // Make sure we correctly implement the interface. + // Otherwise a typo could silently change the default. + var gcds rest.GarbageCollectionDeleteStrategy = Strategy + if got, want := gcds.DefaultGarbageCollectionPolicy(), rest.OrphanDependents; got != want { + t.Errorf("DefaultGarbageCollectionPolicy() = %#v, want %#v", got, want) } } @@ -140,6 +148,6 @@ func TestStatefulSetStatusStrategy(t *testing.T) { } errs := StatusStrategy.ValidateUpdate(ctx, newPS, oldPS) if len(errs) != 0 { - t.Errorf("Unexpected error %v", errs) + t.Errorf("unexpected error %v", errs) } } From b7163bdb7546762bc8c84502d0830d24b85e8ea2 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Thu, 23 Feb 2017 16:23:59 -0800 Subject: [PATCH 04/11] ControllerRefManager: Allow matching by more than just selector. --- pkg/controller/controller_ref_manager.go | 36 ++++++++++++++++++------ 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/pkg/controller/controller_ref_manager.go b/pkg/controller/controller_ref_manager.go index d3bc26473a0..0e16704ba05 100644 --- a/pkg/controller/controller_ref_manager.go +++ b/pkg/controller/controller_ref_manager.go @@ -51,8 +51,8 @@ type baseControllerRefManager struct { // claimObject tries to take ownership of an object for this controller. // // It will reconcile the following: -// * Adopt orphans if the selector matches. -// * Release owned objects if the selector no longer matches. +// * Adopt orphans if the match function returns true. +// * Release owned objects if the match function returns false. // // A non-nil error is returned if some form of reconciliation was attemped and // failed. Usually, controllers should try again later in case reconciliation @@ -63,14 +63,14 @@ type baseControllerRefManager struct { // own the object. // // No reconciliation will be attempted if the controller is being deleted. -func (m *baseControllerRefManager) claimObject(obj metav1.Object, adopt, release func(metav1.Object) error) (bool, error) { +func (m *baseControllerRefManager) claimObject(obj metav1.Object, match func(metav1.Object) bool, adopt, release func(metav1.Object) error) (bool, error) { controllerRef := GetControllerOf(obj) if controllerRef != nil { if controllerRef.UID != m.controller.GetUID() { // Owned by someone else. Ignore. return false, nil } - if m.selector.Matches(labels.Set(obj.GetLabels())) { + if match(obj) { // We already own it and the selector matches. // Return true (successfully claimed) before checking deletion timestamp. // We're still allowed to claim things we already own while being deleted @@ -96,8 +96,7 @@ func (m *baseControllerRefManager) claimObject(obj metav1.Object, adopt, release } // It's an orphan. - if m.controller.GetDeletionTimestamp() != nil || - !m.selector.Matches(labels.Set(obj.GetLabels())) { + if m.controller.GetDeletionTimestamp() != nil || !match(obj) { // Ignore if we're being deleted or selector doesn't match. return false, nil } @@ -145,16 +144,32 @@ func NewPodControllerRefManager( // * Adopt orphans if the selector matches. // * Release owned objects if the selector no longer matches. // +// Optional: If one or more filters are specified, a Pod will only be claimed if +// all filters return true. +// // A non-nil error is returned if some form of reconciliation was attemped and // failed. Usually, controllers should try again later in case reconciliation // is still needed. // // If the error is nil, either the reconciliation succeeded, or no // reconciliation was necessary. The list of Pods that you now own is returned. -func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod) ([]*v1.Pod, error) { +func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod, filters ...func(*v1.Pod) bool) ([]*v1.Pod, error) { var claimed []*v1.Pod var errlist []error + match := func(obj metav1.Object) bool { + pod := obj.(*v1.Pod) + // Check selector first so filters only run on potentially matching Pods. + if !m.selector.Matches(labels.Set(pod.Labels)) { + return false + } + for _, filter := range filters { + if !filter(pod) { + return false + } + } + return true + } adopt := func(obj metav1.Object) error { return m.AdoptPod(obj.(*v1.Pod)) } @@ -163,7 +178,7 @@ func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod) ([]*v1.Pod, error) { } for _, pod := range pods { - ok, err := m.claimObject(pod, adopt, release) + ok, err := m.claimObject(pod, match, adopt, release) if err != nil { errlist = append(errlist, err) continue @@ -265,6 +280,9 @@ func (m *ReplicaSetControllerRefManager) ClaimReplicaSets(sets []*extensions.Rep var claimed []*extensions.ReplicaSet var errlist []error + match := func(obj metav1.Object) bool { + return m.selector.Matches(labels.Set(obj.GetLabels())) + } adopt := func(obj metav1.Object) error { return m.AdoptReplicaSet(obj.(*extensions.ReplicaSet)) } @@ -273,7 +291,7 @@ func (m *ReplicaSetControllerRefManager) ClaimReplicaSets(sets []*extensions.Rep } for _, rs := range sets { - ok, err := m.claimObject(rs, adopt, release) + ok, err := m.claimObject(rs, match, adopt, release) if err != nil { errlist = append(errlist, err) continue From e4f67c81707851fe82aa0277f19620963613a926 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Thu, 23 Feb 2017 16:27:20 -0800 Subject: [PATCH 05/11] StatefulSet: Use ControllerRefManager to adopt/orphan. --- pkg/controller/statefulset/BUILD | 2 + pkg/controller/statefulset/stateful_set.go | 37 +++++++-- .../statefulset/stateful_set_control.go | 1 + .../statefulset/stateful_set_test.go | 83 +++++++++++++++++++ .../statefulset/stateful_set_utils.go | 5 ++ 5 files changed, 121 insertions(+), 7 deletions(-) diff --git a/pkg/controller/statefulset/BUILD b/pkg/controller/statefulset/BUILD index 655ef260403..29aa56d65d0 100644 --- a/pkg/controller/statefulset/BUILD +++ b/pkg/controller/statefulset/BUILD @@ -32,6 +32,8 @@ go_library( "//vendor:github.com/golang/glog", "//vendor:k8s.io/apimachinery/pkg/api/errors", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", + "//vendor:k8s.io/apimachinery/pkg/labels", + "//vendor:k8s.io/apimachinery/pkg/runtime/schema", "//vendor:k8s.io/apimachinery/pkg/util/errors", "//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/wait", diff --git a/pkg/controller/statefulset/stateful_set.go b/pkg/controller/statefulset/stateful_set.go index 896731037ee..8e7d64d6014 100644 --- a/pkg/controller/statefulset/stateful_set.go +++ b/pkg/controller/statefulset/stateful_set.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" v1core "k8s.io/client-go/kubernetes/typed/core/v1" @@ -56,6 +57,8 @@ type StatefulSetController struct { // control returns an interface capable of syncing a stateful set. // Abstracted out for testing. control StatefulSetControlInterface + // podControl is used for patching pods. + podControl controller.PodControlInterface // podLister is able to list/get pods from a shared informer's store podLister corelisters.PodLister // podListerSynced returns true if the pod shared informer has synced at least once @@ -95,6 +98,7 @@ func NewStatefulSetController( ), pvcListerSynced: pvcInformer.Informer().HasSynced, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"), + podControl: controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder}, } podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -210,13 +214,26 @@ func (ssc *StatefulSetController) deletePod(obj interface{}) { } } -// getPodsForStatefulSets returns the pods that match the selectors of the given statefulset. -func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet) ([]*v1.Pod, error) { - sel, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) +// getPodsForStatefulSet returns the Pods that a given StatefulSet should manage. +// It also reconciles ControllerRef by adopting/orphaning. +// +// NOTE: Returned Pods are pointers to objects from the cache. +// If you need to modify one, you need to copy it first. +func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet, selector labels.Selector) ([]*v1.Pod, error) { + // List all pods to include the pods that don't match the selector anymore but + // has a ControllerRef pointing to this StatefulSet. + pods, err := ssc.podLister.Pods(set.Namespace).List(labels.Everything()) if err != nil { - return []*v1.Pod{}, err + return nil, err } - return ssc.podLister.Pods(set.Namespace).List(sel) + + filter := func(pod *v1.Pod) bool { + // Only claim if it matches our StatefulSet name. Otherwise release/ignore. + return isMemberOf(set, pod) + } + + cm := controller.NewPodControllerRefManager(ssc.podControl, set, selector, getSSKind()) + return cm.ClaimPods(pods, filter) } // getStatefulSetForPod returns the StatefulSet managing the given pod. @@ -298,11 +315,17 @@ func (ssc *StatefulSetController) sync(key string) error { return nil } if err != nil { - utilruntime.HandleError(fmt.Errorf("Unable to retrieve StatefulSet %v from store: %v", key, err)) + utilruntime.HandleError(fmt.Errorf("unable to retrieve StatefulSet %v from store: %v", key, err)) return err } + selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) + if err != nil { + utilruntime.HandleError(fmt.Errorf("error converting StatefulSet %v selector: %v", key, err)) + // This is a non-transient error, so don't retry. + return nil + } - pods, err := ssc.getPodsForStatefulSet(set) + pods, err := ssc.getPodsForStatefulSet(set, selector) if err != nil { return err } diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go index 86645d07b22..6389d79117a 100644 --- a/pkg/controller/statefulset/stateful_set_control.go +++ b/pkg/controller/statefulset/stateful_set_control.go @@ -74,6 +74,7 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p // if the ordinal is greater than the number of replicas add it to the condemned list condemned = append(condemned, pods[i]) } + // If the ordinal could not be parsed (ord < 0), ignore the Pod. } // for any empty indices in the sequence [0,set.Spec.Replicas) create a new Pod diff --git a/pkg/controller/statefulset/stateful_set_test.go b/pkg/controller/statefulset/stateful_set_test.go index 3d927e2609e..4069a80ac76 100644 --- a/pkg/controller/statefulset/stateful_set_test.go +++ b/pkg/controller/statefulset/stateful_set_test.go @@ -17,6 +17,7 @@ limitations under the License. package statefulset import ( + "reflect" "sort" "testing" "time" @@ -345,6 +346,88 @@ func TestStatefulSetControllerGetStatefulSetForPodOverlapping(t *testing.T) { } } +func TestGetPodsForStatefulSetAdopt(t *testing.T) { + ssc, spc := newFakeStatefulSetController() + set := newStatefulSet(5) + pod1 := newStatefulSetPod(set, 1) + // pod2 is an orphan with matching labels and name. + pod2 := newStatefulSetPod(set, 2) + pod2.OwnerReferences = nil + // pod3 has wrong labels. + pod3 := newStatefulSetPod(set, 3) + pod3.OwnerReferences = nil + pod3.Labels = nil + // pod4 has wrong name. + pod4 := newStatefulSetPod(set, 4) + pod4.OwnerReferences = nil + pod4.Name = "x" + pod4.Name + + spc.podsIndexer.Add(pod1) + spc.podsIndexer.Add(pod2) + spc.podsIndexer.Add(pod3) + spc.podsIndexer.Add(pod4) + selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) + if err != nil { + t.Fatal(err) + } + pods, err := ssc.getPodsForStatefulSet(set, selector) + if err != nil { + t.Fatalf("getPodsForStatefulSet() error: %v", err) + } + var got []string + for _, pod := range pods { + got = append(got, pod.Name) + } + + // pod2 should be claimed, pod3 and pod4 ignored + want := []string{pod1.Name, pod2.Name} + sort.Strings(got) + sort.Strings(want) + if !reflect.DeepEqual(got, want) { + t.Errorf("getPodsForStatefulSet() = %v, want %v", got, want) + } +} + +func TestGetPodsForStatefulSetRelease(t *testing.T) { + ssc, spc := newFakeStatefulSetController() + set := newStatefulSet(3) + pod1 := newStatefulSetPod(set, 1) + // pod2 is owned but has wrong name. + pod2 := newStatefulSetPod(set, 2) + pod2.Name = "x" + pod2.Name + // pod3 is owned but has wrong labels. + pod3 := newStatefulSetPod(set, 3) + pod3.Labels = nil + // pod4 is an orphan that doesn't match. + pod4 := newStatefulSetPod(set, 4) + pod4.OwnerReferences = nil + pod4.Labels = nil + + spc.podsIndexer.Add(pod1) + spc.podsIndexer.Add(pod2) + spc.podsIndexer.Add(pod3) + selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) + if err != nil { + t.Fatal(err) + } + pods, err := ssc.getPodsForStatefulSet(set, selector) + if err != nil { + t.Fatalf("getPodsForStatefulSet() error: %v", err) + } + var got []string + for _, pod := range pods { + got = append(got, pod.Name) + } + + // Expect only pod1 (pod2 and pod3 should be released, pod4 ignored). + want := []string{pod1.Name} + sort.Strings(got) + sort.Strings(want) + if !reflect.DeepEqual(got, want) { + t.Errorf("getPodsForStatefulSet() = %v, want %v", got, want) + } +} + func newFakeStatefulSetController() (*StatefulSetController, *fakeStatefulPodControl) { client := fake.NewSimpleClientset() informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) diff --git a/pkg/controller/statefulset/stateful_set_utils.go b/pkg/controller/statefulset/stateful_set_utils.go index 231ef945675..aefa51b32ff 100644 --- a/pkg/controller/statefulset/stateful_set_utils.go +++ b/pkg/controller/statefulset/stateful_set_utils.go @@ -22,6 +22,7 @@ import ( "strconv" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/kubernetes/pkg/api/v1" podapi "k8s.io/kubernetes/pkg/api/v1/pod" apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" @@ -231,6 +232,10 @@ func isHealthy(pod *v1.Pod) bool { return isRunningAndReady(pod) && !isTerminated(pod) } +func getSSKind() schema.GroupVersionKind { + return apps.SchemeGroupVersion.WithKind("StatefulSet") +} + // newControllerRef returns an ControllerRef pointing to a given StatefulSet. func newControllerRef(set *apps.StatefulSet) *metav1.OwnerReference { isController := true From ea85a201c710615aae43d65ec928a35ebd54d817 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Fri, 24 Feb 2017 12:56:10 -0800 Subject: [PATCH 06/11] StatefulSet: Use ControllerRef to route watch events. This is part of the completion of ControllerRef, as described here: https://github.com/kubernetes/community/blob/master/contributors/design-proposals/controller-ref.md#watches --- pkg/controller/statefulset/BUILD | 1 - pkg/controller/statefulset/stateful_set.go | 113 ++++++--- .../statefulset/stateful_set_test.go | 236 ++++++++++++------ .../statefulset/stateful_set_utils.go | 9 +- 4 files changed, 248 insertions(+), 111 deletions(-) diff --git a/pkg/controller/statefulset/BUILD b/pkg/controller/statefulset/BUILD index 29aa56d65d0..0eb20aca1aa 100644 --- a/pkg/controller/statefulset/BUILD +++ b/pkg/controller/statefulset/BUILD @@ -33,7 +33,6 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/api/errors", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/labels", - "//vendor:k8s.io/apimachinery/pkg/runtime/schema", "//vendor:k8s.io/apimachinery/pkg/util/errors", "//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/wait", diff --git a/pkg/controller/statefulset/stateful_set.go b/pkg/controller/statefulset/stateful_set.go index 8e7d64d6014..2c430ae9228 100644 --- a/pkg/controller/statefulset/stateful_set.go +++ b/pkg/controller/statefulset/stateful_set.go @@ -19,7 +19,6 @@ package statefulset import ( "fmt" "reflect" - "sort" "time" "k8s.io/apimachinery/pkg/api/errors" @@ -50,6 +49,9 @@ const ( statefulSetResyncPeriod = 30 * time.Second ) +// controllerKind contains the schema.GroupVersionKind for this controller type. +var controllerKind = apps.SchemeGroupVersion.WithKind("StatefulSet") + // StatefulSetController controls statefulsets. type StatefulSetController struct { // client interface @@ -158,15 +160,36 @@ func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) { func (ssc *StatefulSetController) addPod(obj interface{}) { pod := obj.(*v1.Pod) glog.V(4).Infof("Pod %s created, labels: %+v", pod.Name, pod.Labels) - set := ssc.getStatefulSetForPod(pod) - if set == nil { + + if pod.DeletionTimestamp != nil { + // on a restart of the controller manager, it's possible a new pod shows up in a state that + // is already pending deletion. Prevent the pod from being a creation observation. + ssc.deletePod(pod) return } - ssc.enqueueStatefulSet(set) + + // If it has a ControllerRef, that's all that matters. + if controllerRef := controller.GetControllerOf(pod); controllerRef != nil { + if controllerRef.Kind != controllerKind.Kind { + // It's controlled by a different type of controller. + return + } + set, err := ssc.setLister.StatefulSets(pod.Namespace).Get(controllerRef.Name) + if err != nil { + return + } + ssc.enqueueStatefulSet(set) + return + } + + // Otherwise, it's an orphan. Get a list of all matching controllers and sync + // them to see if anyone wants to adopt it. + for _, set := range ssc.getStatefulSetsForPod(pod) { + ssc.enqueueStatefulSet(set) + } } // updatePod adds the statefulset for the current and old pods to the sync queue. -// If the labels of the pod didn't change, this method enqueues a single statefulset. func (ssc *StatefulSetController) updatePod(old, cur interface{}) { curPod := cur.(*v1.Pod) oldPod := old.(*v1.Pod) @@ -175,15 +198,41 @@ func (ssc *StatefulSetController) updatePod(old, cur interface{}) { // Two different versions of the same pod will always have different RVs. return } - set := ssc.getStatefulSetForPod(curPod) - if set == nil { + glog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) + + labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels) + + curControllerRef := controller.GetControllerOf(curPod) + oldControllerRef := controller.GetControllerOf(oldPod) + controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef) + if controllerRefChanged && + oldControllerRef != nil && oldControllerRef.Kind == controllerKind.Kind { + // The ControllerRef was changed. Sync the old controller, if any. + set, err := ssc.setLister.StatefulSets(oldPod.Namespace).Get(oldControllerRef.Name) + if err == nil { + ssc.enqueueStatefulSet(set) + } + } + + // If it has a ControllerRef, that's all that matters. + if curControllerRef != nil { + if curControllerRef.Kind != controllerKind.Kind { + // It's controlled by a different type of controller. + return + } + set, err := ssc.setLister.StatefulSets(curPod.Namespace).Get(curControllerRef.Name) + if err != nil { + return + } + ssc.enqueueStatefulSet(set) return } - ssc.enqueueStatefulSet(set) - // TODO will we need this going forward with controller ref impl? - if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) { - if oldSet := ssc.getStatefulSetForPod(oldPod); oldSet != nil { - ssc.enqueueStatefulSet(oldSet) + + // Otherwise, it's an orphan. If anything changed, sync matching controllers + // to see if anyone wants to adopt it now. + if labelChanged || controllerRefChanged { + for _, set := range ssc.getStatefulSetsForPod(curPod) { + ssc.enqueueStatefulSet(set) } } } @@ -209,9 +258,22 @@ func (ssc *StatefulSetController) deletePod(obj interface{}) { } } glog.V(4).Infof("Pod %s/%s deleted through %v.", pod.Namespace, pod.Name, utilruntime.GetCaller()) - if set := ssc.getStatefulSetForPod(pod); set != nil { - ssc.enqueueStatefulSet(set) + + controllerRef := controller.GetControllerOf(pod) + if controllerRef == nil { + // No controller should care about orphans being deleted. + return } + if controllerRef.Kind != controllerKind.Kind { + // It's controlled by a different type of controller. + return + } + + set, err := ssc.setLister.StatefulSets(pod.Namespace).Get(controllerRef.Name) + if err != nil { + return + } + ssc.enqueueStatefulSet(set) } // getPodsForStatefulSet returns the Pods that a given StatefulSet should manage. @@ -232,12 +294,13 @@ func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet, s return isMemberOf(set, pod) } - cm := controller.NewPodControllerRefManager(ssc.podControl, set, selector, getSSKind()) + cm := controller.NewPodControllerRefManager(ssc.podControl, set, selector, controllerKind) return cm.ClaimPods(pods, filter) } -// getStatefulSetForPod returns the StatefulSet managing the given pod. -func (ssc *StatefulSetController) getStatefulSetForPod(pod *v1.Pod) *apps.StatefulSet { +// getStatefulSetsForPod returns a list of StatefulSets that potentially match +// a given pod. +func (ssc *StatefulSetController) getStatefulSetsForPod(pod *v1.Pod) []*apps.StatefulSet { sets, err := ssc.setLister.GetPodStatefulSets(pod) if err != nil { glog.V(4).Infof("No StatefulSets found for pod %v, StatefulSet controller will avoid syncing", pod.Name) @@ -245,24 +308,14 @@ func (ssc *StatefulSetController) getStatefulSetForPod(pod *v1.Pod) *apps.Statef } // More than one set is selecting the same Pod if len(sets) > 1 { + // ControllerRef will ensure we don't do anything crazy, but more than one + // item in this list nevertheless constitutes user error. utilruntime.HandleError( fmt.Errorf( "user error: more than one StatefulSet is selecting pods with labels: %+v", pod.Labels)) - // The timestamp sort should not be necessary because we will enforce the CreatedBy requirement by - // name - sort.Sort(overlappingStatefulSets(sets)) - // return the first created set for which pod is a member - for i := range sets { - if isMemberOf(sets[i], pod) { - return sets[i] - } - } - glog.V(4).Infof("No StatefulSets found for pod %v, StatefulSet controller will avoid syncing", pod.Name) - return nil } - return sets[0] - + return sets } // enqueueStatefulSet enqueues the given statefulset in the work queue. diff --git a/pkg/controller/statefulset/stateful_set_test.go b/pkg/controller/statefulset/stateful_set_test.go index 4069a80ac76..b996b6ca492 100644 --- a/pkg/controller/statefulset/stateful_set_test.go +++ b/pkg/controller/statefulset/stateful_set_test.go @@ -20,7 +20,6 @@ import ( "reflect" "sort" "testing" - "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -177,23 +176,60 @@ func TestStatefulSetControllerBlocksScaling(t *testing.T) { } } -func TestStateSetControllerAddPod(t *testing.T) { +func TestStatefulSetControllerAddPod(t *testing.T) { ssc, spc := newFakeStatefulSetController() - set := newStatefulSet(3) - pod := newStatefulSetPod(set, 0) - spc.setsIndexer.Add(set) - ssc.addPod(pod) + set1 := newStatefulSet(3) + set2 := newStatefulSet(3) + pod1 := newStatefulSetPod(set1, 0) + pod2 := newStatefulSetPod(set2, 0) + spc.setsIndexer.Add(set1) + spc.setsIndexer.Add(set2) + + ssc.addPod(pod1) key, done := ssc.queue.Get() if key == nil || done { - t.Error("Failed to enqueue StatefulSet") + t.Error("failed to enqueue StatefulSet") } else if key, ok := key.(string); !ok { - t.Error("Key is not a string") - } else if expectedKey, _ := controller.KeyFunc(set); expectedKey != key { - t.Errorf("Expected StatefulSet key %s found %s", expectedKey, key) + t.Error("key is not a string") + } else if expectedKey, _ := controller.KeyFunc(set1); expectedKey != key { + t.Errorf("expected StatefulSet key %s found %s", expectedKey, key) + } + ssc.queue.Done(key) + + ssc.addPod(pod2) + key, done = ssc.queue.Get() + if key == nil || done { + t.Error("failed to enqueue StatefulSet") + } else if key, ok := key.(string); !ok { + t.Error("key is not a string") + } else if expectedKey, _ := controller.KeyFunc(set2); expectedKey != key { + t.Errorf("expected StatefulSet key %s found %s", expectedKey, key) + } + ssc.queue.Done(key) +} + +func TestStatefulSetControllerAddPodOrphan(t *testing.T) { + ssc, spc := newFakeStatefulSetController() + set1 := newStatefulSet(3) + set2 := newStatefulSet(3) + set2.Name = "foo2" + set3 := newStatefulSet(3) + set3.Name = "foo3" + set3.Spec.Selector.MatchLabels = map[string]string{"foo3": "bar"} + pod := newStatefulSetPod(set1, 0) + spc.setsIndexer.Add(set1) + spc.setsIndexer.Add(set2) + spc.setsIndexer.Add(set3) + + // Make pod an orphan. Expect matching sets to be queued. + pod.OwnerReferences = nil + ssc.addPod(pod) + if got, want := ssc.queue.Len(), 2; got != want { + t.Errorf("queue.Len() = %v, want %v", got, want) } } -func TestStateSetControllerAddPodNoSet(t *testing.T) { +func TestStatefulSetControllerAddPodNoSet(t *testing.T) { ssc, _ := newFakeStatefulSetController() set := newStatefulSet(3) pod := newStatefulSetPod(set, 0) @@ -207,19 +243,36 @@ func TestStateSetControllerAddPodNoSet(t *testing.T) { func TestStatefulSetControllerUpdatePod(t *testing.T) { ssc, spc := newFakeStatefulSetController() - set := newStatefulSet(3) - pod := newStatefulSetPod(set, 0) - spc.setsIndexer.Add(set) - prev := *pod - fakeResourceVersion(pod) - ssc.updatePod(&prev, pod) + set1 := newStatefulSet(3) + set2 := newStatefulSet(3) + set2.Name = "foo2" + pod1 := newStatefulSetPod(set1, 0) + pod2 := newStatefulSetPod(set2, 0) + spc.setsIndexer.Add(set1) + spc.setsIndexer.Add(set2) + + prev := *pod1 + fakeResourceVersion(pod1) + ssc.updatePod(&prev, pod1) key, done := ssc.queue.Get() if key == nil || done { - t.Error("Failed to enqueue StatefulSet") + t.Error("failed to enqueue StatefulSet") } else if key, ok := key.(string); !ok { - t.Error("Key is not a string") - } else if expectedKey, _ := controller.KeyFunc(set); expectedKey != key { - t.Errorf("Expected StatefulSet key %s found %s", expectedKey, key) + t.Error("key is not a string") + } else if expectedKey, _ := controller.KeyFunc(set1); expectedKey != key { + t.Errorf("expected StatefulSet key %s found %s", expectedKey, key) + } + + prev = *pod2 + fakeResourceVersion(pod2) + ssc.updatePod(&prev, pod2) + key, done = ssc.queue.Get() + if key == nil || done { + t.Error("failed to enqueue StatefulSet") + } else if key, ok := key.(string); !ok { + t.Error("key is not a string") + } else if expectedKey, _ := controller.KeyFunc(set2); expectedKey != key { + t.Errorf("expected StatefulSet key %s found %s", expectedKey, key) } } @@ -250,53 +303,106 @@ func TestStatefulSetControllerUpdatePodWithSameVersion(t *testing.T) { } } -func TestStatefulSetControllerUpdatePodWithNewLabels(t *testing.T) { +func TestStatefulSetControllerUpdatePodOrphanWithNewLabels(t *testing.T) { ssc, spc := newFakeStatefulSetController() set := newStatefulSet(3) pod := newStatefulSetPod(set, 0) + pod.OwnerReferences = nil set2 := newStatefulSet(3) set2.Name = "foo2" - set2.Spec.Selector.MatchLabels = map[string]string{"foo2": "bar2"} - set2.Spec.Template.Labels = map[string]string{"foo2": "bar2"} spc.setsIndexer.Add(set) spc.setsIndexer.Add(set2) clone := *pod clone.Labels = map[string]string{"foo2": "bar2"} fakeResourceVersion(&clone) - ssc.updatePod(pod, &clone) - key, done := ssc.queue.Get() - if key == nil || done { - t.Error("Failed to enqueue StatefulSet") - } else if key, ok := key.(string); !ok { - t.Error("Key is not a string") - } else if expectedKey, _ := controller.KeyFunc(set2); expectedKey != key { - t.Errorf("Expected StatefulSet key %s found %s", expectedKey, key) + ssc.updatePod(&clone, pod) + if got, want := ssc.queue.Len(), 2; got != want { + t.Errorf("queue.Len() = %v, want %v", got, want) } - key, done = ssc.queue.Get() - if key == nil || done { - t.Error("Failed to enqueue StatefulSet") - } else if key, ok := key.(string); !ok { - t.Error("Key is not a string") - } else if expectedKey, _ := controller.KeyFunc(set); expectedKey != key { - t.Errorf("Expected StatefulSet key %s found %s", expectedKey, key) +} + +func TestStatefulSetControllerUpdatePodChangeControllerRef(t *testing.T) { + ssc, spc := newFakeStatefulSetController() + set := newStatefulSet(3) + set2 := newStatefulSet(3) + set2.Name = "foo2" + pod := newStatefulSetPod(set, 0) + pod2 := newStatefulSetPod(set2, 0) + spc.setsIndexer.Add(set) + spc.setsIndexer.Add(set2) + clone := *pod + clone.OwnerReferences = pod2.OwnerReferences + fakeResourceVersion(&clone) + ssc.updatePod(&clone, pod) + if got, want := ssc.queue.Len(), 2; got != want { + t.Errorf("queue.Len() = %v, want %v", got, want) + } +} + +func TestStatefulSetControllerUpdatePodRelease(t *testing.T) { + ssc, spc := newFakeStatefulSetController() + set := newStatefulSet(3) + set2 := newStatefulSet(3) + set2.Name = "foo2" + pod := newStatefulSetPod(set, 0) + spc.setsIndexer.Add(set) + spc.setsIndexer.Add(set2) + clone := *pod + clone.OwnerReferences = nil + fakeResourceVersion(&clone) + ssc.updatePod(pod, &clone) + if got, want := ssc.queue.Len(), 2; got != want { + t.Errorf("queue.Len() = %v, want %v", got, want) } } func TestStatefulSetControllerDeletePod(t *testing.T) { ssc, spc := newFakeStatefulSetController() - set := newStatefulSet(3) - pod := newStatefulSetPod(set, 0) - spc.setsIndexer.Add(set) - ssc.deletePod(pod) + set1 := newStatefulSet(3) + set2 := newStatefulSet(3) + set2.Name = "foo2" + pod1 := newStatefulSetPod(set1, 0) + pod2 := newStatefulSetPod(set2, 0) + spc.setsIndexer.Add(set1) + spc.setsIndexer.Add(set2) + + ssc.deletePod(pod1) key, done := ssc.queue.Get() if key == nil || done { - t.Error("Failed to enqueue StatefulSet") + t.Error("failed to enqueue StatefulSet") } else if key, ok := key.(string); !ok { - t.Error("Key is not a string") - } else if expectedKey, _ := controller.KeyFunc(set); expectedKey != key { - t.Errorf("Expected StatefulSet key %s found %s", expectedKey, key) + t.Error("key is not a string") + } else if expectedKey, _ := controller.KeyFunc(set1); expectedKey != key { + t.Errorf("expected StatefulSet key %s found %s", expectedKey, key) + } + + ssc.deletePod(pod2) + key, done = ssc.queue.Get() + if key == nil || done { + t.Error("failed to enqueue StatefulSet") + } else if key, ok := key.(string); !ok { + t.Error("key is not a string") + } else if expectedKey, _ := controller.KeyFunc(set2); expectedKey != key { + t.Errorf("expected StatefulSet key %s found %s", expectedKey, key) } } + +func TestStatefulSetControllerDeletePodOrphan(t *testing.T) { + ssc, spc := newFakeStatefulSetController() + set1 := newStatefulSet(3) + set2 := newStatefulSet(3) + set2.Name = "foo2" + pod1 := newStatefulSetPod(set1, 0) + spc.setsIndexer.Add(set1) + spc.setsIndexer.Add(set2) + + pod1.OwnerReferences = nil + ssc.deletePod(pod1) + if got, want := ssc.queue.Len(), 0; got != want { + t.Errorf("queue.Len() = %v, want %v", got, want) + } +} + func TestStatefulSetControllerDeletePodTombstone(t *testing.T) { ssc, spc := newFakeStatefulSetController() set := newStatefulSet(3) @@ -307,42 +413,26 @@ func TestStatefulSetControllerDeletePodTombstone(t *testing.T) { ssc.deletePod(tombstone) key, done := ssc.queue.Get() if key == nil || done { - t.Error("Failed to enqueue StatefulSet") + t.Error("failed to enqueue StatefulSet") } else if key, ok := key.(string); !ok { - t.Error("Key is not a string") + t.Error("key is not a string") } else if expectedKey, _ := controller.KeyFunc(set); expectedKey != key { - t.Errorf("Expected StatefulSet key %s found %s", expectedKey, key) + t.Errorf("expected StatefulSet key %s found %s", expectedKey, key) } } -func TestStatefulSetControllerGetStatefulSetForPod(t *testing.T) { +func TestStatefulSetControllerGetStatefulSetsForPod(t *testing.T) { ssc, spc := newFakeStatefulSetController() - set := newStatefulSet(3) - pod := newStatefulSetPod(set, 0) - spc.setsIndexer.Add(set) - spc.podsIndexer.Add(pod) - if set := ssc.getStatefulSetForPod(pod); set == nil { - t.Error("Failed to get StatefulSet for Pod ") - } -} - -func TestStatefulSetControllerGetStatefulSetForPodOverlapping(t *testing.T) { - ssc, spc := newFakeStatefulSetController() - set := newStatefulSet(3) - pod := newStatefulSetPod(set, 0) + set1 := newStatefulSet(3) set2 := newStatefulSet(3) set2.Name = "foo2" - set3 := newStatefulSet(3) - set3.Name = "foo3" - set3.CreationTimestamp.Add(1 * time.Second) - spc.setsIndexer.Add(set3) + pod := newStatefulSetPod(set1, 0) + spc.setsIndexer.Add(set1) spc.setsIndexer.Add(set2) - spc.setsIndexer.Add(set) spc.podsIndexer.Add(pod) - if found := ssc.getStatefulSetForPod(pod); found == nil { - t.Error("Failed to get StatefulSet for Pod") - } else if found.Name != set.Name { - t.Errorf("Returned wrong StatefulSet %s for Pod", set.Name) + sets := ssc.getStatefulSetsForPod(pod) + if got, want := len(sets), 2; got != want { + t.Errorf("len(sets) = %v, want %v", got, want) } } diff --git a/pkg/controller/statefulset/stateful_set_utils.go b/pkg/controller/statefulset/stateful_set_utils.go index aefa51b32ff..89bca6a60fa 100644 --- a/pkg/controller/statefulset/stateful_set_utils.go +++ b/pkg/controller/statefulset/stateful_set_utils.go @@ -22,7 +22,6 @@ import ( "strconv" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/kubernetes/pkg/api/v1" podapi "k8s.io/kubernetes/pkg/api/v1/pod" apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" @@ -232,16 +231,12 @@ func isHealthy(pod *v1.Pod) bool { return isRunningAndReady(pod) && !isTerminated(pod) } -func getSSKind() schema.GroupVersionKind { - return apps.SchemeGroupVersion.WithKind("StatefulSet") -} - // newControllerRef returns an ControllerRef pointing to a given StatefulSet. func newControllerRef(set *apps.StatefulSet) *metav1.OwnerReference { isController := true return &metav1.OwnerReference{ - APIVersion: apps.SchemeGroupVersion.String(), - Kind: "StatefulSet", + APIVersion: controllerKind.GroupVersion().String(), + Kind: controllerKind.Kind, Name: set.Name, UID: set.UID, Controller: &isController, From 25d90cdaec71e38165742a0ebe37411bdd2e66e2 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Fri, 24 Feb 2017 13:10:50 -0800 Subject: [PATCH 07/11] StatefulSet: Update Lister documentation for ControllerRef. The StatefulSet Listers still use selectors, because this is the behavior expected by callers. This clarifies the meaning of the returned list. Some callers may need to switch to using GetControllerOf() instead, but that is a separate, case-by-case issue. --- .../listers/apps/internalversion/statefulset_expansion.go | 4 +++- pkg/client/listers/apps/v1beta1/statefulset_expansion.go | 4 +++- .../client-go/listers/apps/v1beta1/statefulset_expansion.go | 4 +++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/pkg/client/listers/apps/internalversion/statefulset_expansion.go b/pkg/client/listers/apps/internalversion/statefulset_expansion.go index 0b58cf6a817..0c54dbb6494 100644 --- a/pkg/client/listers/apps/internalversion/statefulset_expansion.go +++ b/pkg/client/listers/apps/internalversion/statefulset_expansion.go @@ -35,7 +35,9 @@ type StatefulSetListerExpansion interface { // StatefulSetNamespaeLister. type StatefulSetNamespaceListerExpansion interface{} -// GetPodStatefulSets returns a list of StatefulSets managing a pod. Returns an error only if no matching StatefulSets are found. +// GetPodStatefulSets returns a list of StatefulSets that potentially match a pod. +// Only the one specified in the Pod's ControllerRef will actually manage it. +// Returns an error only if no matching StatefulSets are found. func (s *statefulSetLister) GetPodStatefulSets(pod *api.Pod) ([]*apps.StatefulSet, error) { var selector labels.Selector var ps *apps.StatefulSet diff --git a/pkg/client/listers/apps/v1beta1/statefulset_expansion.go b/pkg/client/listers/apps/v1beta1/statefulset_expansion.go index e032ae227f3..2eea9b1015c 100644 --- a/pkg/client/listers/apps/v1beta1/statefulset_expansion.go +++ b/pkg/client/listers/apps/v1beta1/statefulset_expansion.go @@ -35,7 +35,9 @@ type StatefulSetListerExpansion interface { // StatefulSetNamespaeLister. type StatefulSetNamespaceListerExpansion interface{} -// GetPodStatefulSets returns a list of StatefulSets managing a pod. Returns an error only if no matching StatefulSets are found. +// GetPodStatefulSets returns a list of StatefulSets that potentially match a pod. +// Only the one specified in the Pod's ControllerRef will actually manage it. +// Returns an error only if no matching StatefulSets are found. func (s *statefulSetLister) GetPodStatefulSets(pod *v1.Pod) ([]*apps.StatefulSet, error) { var selector labels.Selector var ps *apps.StatefulSet diff --git a/staging/src/k8s.io/client-go/listers/apps/v1beta1/statefulset_expansion.go b/staging/src/k8s.io/client-go/listers/apps/v1beta1/statefulset_expansion.go index d34e894ac35..4473477e219 100644 --- a/staging/src/k8s.io/client-go/listers/apps/v1beta1/statefulset_expansion.go +++ b/staging/src/k8s.io/client-go/listers/apps/v1beta1/statefulset_expansion.go @@ -35,7 +35,9 @@ type StatefulSetListerExpansion interface { // StatefulSetNamespaeLister. type StatefulSetNamespaceListerExpansion interface{} -// GetPodStatefulSets returns a list of StatefulSets managing a pod. Returns an error only if no matching StatefulSets are found. +// GetPodStatefulSets returns a list of StatefulSets that potentially match a pod. +// Only the one specified in the Pod's ControllerRef will actually manage it. +// Returns an error only if no matching StatefulSets are found. func (s *statefulSetLister) GetPodStatefulSets(pod *v1.Pod) ([]*apps.StatefulSet, error) { var selector labels.Selector var ps *apps.StatefulSet From 2248187536cf7af2fb3518466ed2f2174880194e Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Thu, 2 Mar 2017 10:14:54 -0800 Subject: [PATCH 08/11] StatefulSet: Don't log Pod events unless some StatefulSet cares. --- pkg/controller/statefulset/stateful_set.go | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/pkg/controller/statefulset/stateful_set.go b/pkg/controller/statefulset/stateful_set.go index 2c430ae9228..463c243463d 100644 --- a/pkg/controller/statefulset/stateful_set.go +++ b/pkg/controller/statefulset/stateful_set.go @@ -159,7 +159,6 @@ func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) { // addPod adds the statefulset for the pod to the sync queue func (ssc *StatefulSetController) addPod(obj interface{}) { pod := obj.(*v1.Pod) - glog.V(4).Infof("Pod %s created, labels: %+v", pod.Name, pod.Labels) if pod.DeletionTimestamp != nil { // on a restart of the controller manager, it's possible a new pod shows up in a state that @@ -174,6 +173,7 @@ func (ssc *StatefulSetController) addPod(obj interface{}) { // It's controlled by a different type of controller. return } + glog.V(4).Infof("Pod %s created, labels: %+v", pod.Name, pod.Labels) set, err := ssc.setLister.StatefulSets(pod.Namespace).Get(controllerRef.Name) if err != nil { return @@ -184,7 +184,12 @@ func (ssc *StatefulSetController) addPod(obj interface{}) { // Otherwise, it's an orphan. Get a list of all matching controllers and sync // them to see if anyone wants to adopt it. - for _, set := range ssc.getStatefulSetsForPod(pod) { + sets := ssc.getStatefulSetsForPod(pod) + if len(sets) == 0 { + return + } + glog.V(4).Infof("Orphan Pod %s created, labels: %+v", pod.Name, pod.Labels) + for _, set := range sets { ssc.enqueueStatefulSet(set) } } @@ -198,7 +203,6 @@ func (ssc *StatefulSetController) updatePod(old, cur interface{}) { // Two different versions of the same pod will always have different RVs. return } - glog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels) @@ -220,6 +224,7 @@ func (ssc *StatefulSetController) updatePod(old, cur interface{}) { // It's controlled by a different type of controller. return } + glog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) set, err := ssc.setLister.StatefulSets(curPod.Namespace).Get(curControllerRef.Name) if err != nil { return @@ -231,7 +236,12 @@ func (ssc *StatefulSetController) updatePod(old, cur interface{}) { // Otherwise, it's an orphan. If anything changed, sync matching controllers // to see if anyone wants to adopt it now. if labelChanged || controllerRefChanged { - for _, set := range ssc.getStatefulSetsForPod(curPod) { + sets := ssc.getStatefulSetsForPod(curPod) + if len(sets) == 0 { + return + } + glog.V(4).Infof("Orphan Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) + for _, set := range sets { ssc.enqueueStatefulSet(set) } } @@ -257,7 +267,6 @@ func (ssc *StatefulSetController) deletePod(obj interface{}) { return } } - glog.V(4).Infof("Pod %s/%s deleted through %v.", pod.Namespace, pod.Name, utilruntime.GetCaller()) controllerRef := controller.GetControllerOf(pod) if controllerRef == nil { @@ -268,6 +277,7 @@ func (ssc *StatefulSetController) deletePod(obj interface{}) { // It's controlled by a different type of controller. return } + glog.V(4).Infof("Pod %s/%s deleted through %v.", pod.Namespace, pod.Name, utilruntime.GetCaller()) set, err := ssc.setLister.StatefulSets(pod.Namespace).Get(controllerRef.Name) if err != nil { @@ -303,7 +313,6 @@ func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet, s func (ssc *StatefulSetController) getStatefulSetsForPod(pod *v1.Pod) []*apps.StatefulSet { sets, err := ssc.setLister.GetPodStatefulSets(pod) if err != nil { - glog.V(4).Infof("No StatefulSets found for pod %v, StatefulSet controller will avoid syncing", pod.Name) return nil } // More than one set is selecting the same Pod From 6679a5a31f9b4b5a7247e92c1ec8e69e9ad1a35b Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Thu, 2 Mar 2017 10:26:30 -0800 Subject: [PATCH 09/11] StatefulSet: Always set BlockOwnerDeletion in ControllerRef. --- pkg/controller/statefulset/stateful_set_utils.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pkg/controller/statefulset/stateful_set_utils.go b/pkg/controller/statefulset/stateful_set_utils.go index 89bca6a60fa..672fd5b165a 100644 --- a/pkg/controller/statefulset/stateful_set_utils.go +++ b/pkg/controller/statefulset/stateful_set_utils.go @@ -233,13 +233,15 @@ func isHealthy(pod *v1.Pod) bool { // newControllerRef returns an ControllerRef pointing to a given StatefulSet. func newControllerRef(set *apps.StatefulSet) *metav1.OwnerReference { + blockOwnerDeletion := true isController := true return &metav1.OwnerReference{ - APIVersion: controllerKind.GroupVersion().String(), - Kind: controllerKind.Kind, - Name: set.Name, - UID: set.UID, - Controller: &isController, + APIVersion: controllerKind.GroupVersion().String(), + Kind: controllerKind.Kind, + Name: set.Name, + UID: set.UID, + BlockOwnerDeletion: &blockOwnerDeletion, + Controller: &isController, } } From d8a001755915e0b0c939d22011fd16d25f45aa7c Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Fri, 3 Mar 2017 14:39:25 -0800 Subject: [PATCH 10/11] StatefulSet: Use synchronous Delete of SS in e2e. This is needed because we changed the default for SS to OrphanDependents. --- test/e2e/framework/statefulset_utils.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/e2e/framework/statefulset_utils.go b/test/e2e/framework/statefulset_utils.go index 1f08ba57471..6b4129692f4 100644 --- a/test/e2e/framework/statefulset_utils.go +++ b/test/e2e/framework/statefulset_utils.go @@ -408,7 +408,9 @@ func DeleteAllStatefulSets(c clientset.Interface, ns string) { } sst.WaitForStatus(&ss, 0) Logf("Deleting statefulset %v", ss.Name) - if err := c.Apps().StatefulSets(ss.Namespace).Delete(ss.Name, nil); err != nil { + // Use OrphanDependents=false so it's deleted synchronously. + // We already made sure the Pods are gone inside Scale(). + if err := c.Apps().StatefulSets(ss.Namespace).Delete(ss.Name, &metav1.DeleteOptions{OrphanDependents: new(bool)}); err != nil { errList = append(errList, fmt.Sprintf("%v", err)) } } From 399c19a2ad551df4440e725266637d332bbbcd26 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Mon, 6 Mar 2017 10:36:41 -0800 Subject: [PATCH 11/11] StatefulSet: Check that ControllerRef UID matches. --- pkg/controller/statefulset/stateful_set.go | 52 ++++++++++++---------- 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/pkg/controller/statefulset/stateful_set.go b/pkg/controller/statefulset/stateful_set.go index 463c243463d..04fce3b72db 100644 --- a/pkg/controller/statefulset/stateful_set.go +++ b/pkg/controller/statefulset/stateful_set.go @@ -169,15 +169,11 @@ func (ssc *StatefulSetController) addPod(obj interface{}) { // If it has a ControllerRef, that's all that matters. if controllerRef := controller.GetControllerOf(pod); controllerRef != nil { - if controllerRef.Kind != controllerKind.Kind { - // It's controlled by a different type of controller. + set := ssc.resolveControllerRef(pod.Namespace, controllerRef) + if set == nil { return } glog.V(4).Infof("Pod %s created, labels: %+v", pod.Name, pod.Labels) - set, err := ssc.setLister.StatefulSets(pod.Namespace).Get(controllerRef.Name) - if err != nil { - return - } ssc.enqueueStatefulSet(set) return } @@ -209,26 +205,20 @@ func (ssc *StatefulSetController) updatePod(old, cur interface{}) { curControllerRef := controller.GetControllerOf(curPod) oldControllerRef := controller.GetControllerOf(oldPod) controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef) - if controllerRefChanged && - oldControllerRef != nil && oldControllerRef.Kind == controllerKind.Kind { + if controllerRefChanged && oldControllerRef != nil { // The ControllerRef was changed. Sync the old controller, if any. - set, err := ssc.setLister.StatefulSets(oldPod.Namespace).Get(oldControllerRef.Name) - if err == nil { + if set := ssc.resolveControllerRef(oldPod.Namespace, oldControllerRef); set != nil { ssc.enqueueStatefulSet(set) } } // If it has a ControllerRef, that's all that matters. if curControllerRef != nil { - if curControllerRef.Kind != controllerKind.Kind { - // It's controlled by a different type of controller. + set := ssc.resolveControllerRef(curPod.Namespace, curControllerRef) + if set == nil { return } glog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) - set, err := ssc.setLister.StatefulSets(curPod.Namespace).Get(curControllerRef.Name) - if err != nil { - return - } ssc.enqueueStatefulSet(set) return } @@ -273,16 +263,11 @@ func (ssc *StatefulSetController) deletePod(obj interface{}) { // No controller should care about orphans being deleted. return } - if controllerRef.Kind != controllerKind.Kind { - // It's controlled by a different type of controller. + set := ssc.resolveControllerRef(pod.Namespace, controllerRef) + if set == nil { return } glog.V(4).Infof("Pod %s/%s deleted through %v.", pod.Namespace, pod.Name, utilruntime.GetCaller()) - - set, err := ssc.setLister.StatefulSets(pod.Namespace).Get(controllerRef.Name) - if err != nil { - return - } ssc.enqueueStatefulSet(set) } @@ -327,6 +312,27 @@ func (ssc *StatefulSetController) getStatefulSetsForPod(pod *v1.Pod) []*apps.Sta return sets } +// resolveControllerRef returns the controller referenced by a ControllerRef, +// or nil if the ControllerRef could not be resolved to a matching controller +// of the corrrect Kind. +func (ssc *StatefulSetController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.StatefulSet { + // We can't look up by UID, so look up by Name and then verify UID. + // Don't even try to look up by Name if it's the wrong Kind. + if controllerRef.Kind != controllerKind.Kind { + return nil + } + set, err := ssc.setLister.StatefulSets(namespace).Get(controllerRef.Name) + if err != nil { + return nil + } + if set.UID != controllerRef.UID { + // The controller we found with this Name is not the same one that the + // ControllerRef points to. + return nil + } + return set +} + // enqueueStatefulSet enqueues the given statefulset in the work queue. func (ssc *StatefulSetController) enqueueStatefulSet(obj interface{}) { key, err := controller.KeyFunc(obj)