diff --git a/pkg/client/listers/apps/internalversion/statefulset_expansion.go b/pkg/client/listers/apps/internalversion/statefulset_expansion.go index 0b58cf6a817..0c54dbb6494 100644 --- a/pkg/client/listers/apps/internalversion/statefulset_expansion.go +++ b/pkg/client/listers/apps/internalversion/statefulset_expansion.go @@ -35,7 +35,9 @@ type StatefulSetListerExpansion interface { // StatefulSetNamespaeLister. type StatefulSetNamespaceListerExpansion interface{} -// GetPodStatefulSets returns a list of StatefulSets managing a pod. Returns an error only if no matching StatefulSets are found. +// GetPodStatefulSets returns a list of StatefulSets that potentially match a pod. +// Only the one specified in the Pod's ControllerRef will actually manage it. +// Returns an error only if no matching StatefulSets are found. func (s *statefulSetLister) GetPodStatefulSets(pod *api.Pod) ([]*apps.StatefulSet, error) { var selector labels.Selector var ps *apps.StatefulSet diff --git a/pkg/client/listers/apps/v1beta1/statefulset_expansion.go b/pkg/client/listers/apps/v1beta1/statefulset_expansion.go index e032ae227f3..2eea9b1015c 100644 --- a/pkg/client/listers/apps/v1beta1/statefulset_expansion.go +++ b/pkg/client/listers/apps/v1beta1/statefulset_expansion.go @@ -35,7 +35,9 @@ type StatefulSetListerExpansion interface { // StatefulSetNamespaeLister. type StatefulSetNamespaceListerExpansion interface{} -// GetPodStatefulSets returns a list of StatefulSets managing a pod. Returns an error only if no matching StatefulSets are found. +// GetPodStatefulSets returns a list of StatefulSets that potentially match a pod. +// Only the one specified in the Pod's ControllerRef will actually manage it. +// Returns an error only if no matching StatefulSets are found. func (s *statefulSetLister) GetPodStatefulSets(pod *v1.Pod) ([]*apps.StatefulSet, error) { var selector labels.Selector var ps *apps.StatefulSet diff --git a/pkg/controller/controller_ref_manager.go b/pkg/controller/controller_ref_manager.go index d3bc26473a0..0e16704ba05 100644 --- a/pkg/controller/controller_ref_manager.go +++ b/pkg/controller/controller_ref_manager.go @@ -51,8 +51,8 @@ type baseControllerRefManager struct { // claimObject tries to take ownership of an object for this controller. // // It will reconcile the following: -// * Adopt orphans if the selector matches. -// * Release owned objects if the selector no longer matches. +// * Adopt orphans if the match function returns true. +// * Release owned objects if the match function returns false. // // A non-nil error is returned if some form of reconciliation was attemped and // failed. Usually, controllers should try again later in case reconciliation @@ -63,14 +63,14 @@ type baseControllerRefManager struct { // own the object. // // No reconciliation will be attempted if the controller is being deleted. -func (m *baseControllerRefManager) claimObject(obj metav1.Object, adopt, release func(metav1.Object) error) (bool, error) { +func (m *baseControllerRefManager) claimObject(obj metav1.Object, match func(metav1.Object) bool, adopt, release func(metav1.Object) error) (bool, error) { controllerRef := GetControllerOf(obj) if controllerRef != nil { if controllerRef.UID != m.controller.GetUID() { // Owned by someone else. Ignore. return false, nil } - if m.selector.Matches(labels.Set(obj.GetLabels())) { + if match(obj) { // We already own it and the selector matches. // Return true (successfully claimed) before checking deletion timestamp. // We're still allowed to claim things we already own while being deleted @@ -96,8 +96,7 @@ func (m *baseControllerRefManager) claimObject(obj metav1.Object, adopt, release } // It's an orphan. - if m.controller.GetDeletionTimestamp() != nil || - !m.selector.Matches(labels.Set(obj.GetLabels())) { + if m.controller.GetDeletionTimestamp() != nil || !match(obj) { // Ignore if we're being deleted or selector doesn't match. return false, nil } @@ -145,16 +144,32 @@ func NewPodControllerRefManager( // * Adopt orphans if the selector matches. // * Release owned objects if the selector no longer matches. // +// Optional: If one or more filters are specified, a Pod will only be claimed if +// all filters return true. +// // A non-nil error is returned if some form of reconciliation was attemped and // failed. Usually, controllers should try again later in case reconciliation // is still needed. // // If the error is nil, either the reconciliation succeeded, or no // reconciliation was necessary. The list of Pods that you now own is returned. -func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod) ([]*v1.Pod, error) { +func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod, filters ...func(*v1.Pod) bool) ([]*v1.Pod, error) { var claimed []*v1.Pod var errlist []error + match := func(obj metav1.Object) bool { + pod := obj.(*v1.Pod) + // Check selector first so filters only run on potentially matching Pods. + if !m.selector.Matches(labels.Set(pod.Labels)) { + return false + } + for _, filter := range filters { + if !filter(pod) { + return false + } + } + return true + } adopt := func(obj metav1.Object) error { return m.AdoptPod(obj.(*v1.Pod)) } @@ -163,7 +178,7 @@ func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod) ([]*v1.Pod, error) { } for _, pod := range pods { - ok, err := m.claimObject(pod, adopt, release) + ok, err := m.claimObject(pod, match, adopt, release) if err != nil { errlist = append(errlist, err) continue @@ -265,6 +280,9 @@ func (m *ReplicaSetControllerRefManager) ClaimReplicaSets(sets []*extensions.Rep var claimed []*extensions.ReplicaSet var errlist []error + match := func(obj metav1.Object) bool { + return m.selector.Matches(labels.Set(obj.GetLabels())) + } adopt := func(obj metav1.Object) error { return m.AdoptReplicaSet(obj.(*extensions.ReplicaSet)) } @@ -273,7 +291,7 @@ func (m *ReplicaSetControllerRefManager) ClaimReplicaSets(sets []*extensions.Rep } for _, rs := range sets { - ok, err := m.claimObject(rs, adopt, release) + ok, err := m.claimObject(rs, match, adopt, release) if err != nil { errlist = append(errlist, err) continue diff --git a/pkg/controller/statefulset/BUILD b/pkg/controller/statefulset/BUILD index 655ef260403..0eb20aca1aa 100644 --- a/pkg/controller/statefulset/BUILD +++ b/pkg/controller/statefulset/BUILD @@ -32,6 +32,7 @@ go_library( "//vendor:github.com/golang/glog", "//vendor:k8s.io/apimachinery/pkg/api/errors", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", + "//vendor:k8s.io/apimachinery/pkg/labels", "//vendor:k8s.io/apimachinery/pkg/util/errors", "//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/wait", diff --git a/pkg/controller/statefulset/stateful_set.go b/pkg/controller/statefulset/stateful_set.go index 896731037ee..04fce3b72db 100644 --- a/pkg/controller/statefulset/stateful_set.go +++ b/pkg/controller/statefulset/stateful_set.go @@ -19,11 +19,11 @@ package statefulset import ( "fmt" "reflect" - "sort" "time" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" v1core "k8s.io/client-go/kubernetes/typed/core/v1" @@ -49,6 +49,9 @@ const ( statefulSetResyncPeriod = 30 * time.Second ) +// controllerKind contains the schema.GroupVersionKind for this controller type. +var controllerKind = apps.SchemeGroupVersion.WithKind("StatefulSet") + // StatefulSetController controls statefulsets. type StatefulSetController struct { // client interface @@ -56,6 +59,8 @@ type StatefulSetController struct { // control returns an interface capable of syncing a stateful set. // Abstracted out for testing. control StatefulSetControlInterface + // podControl is used for patching pods. + podControl controller.PodControlInterface // podLister is able to list/get pods from a shared informer's store podLister corelisters.PodLister // podListerSynced returns true if the pod shared informer has synced at least once @@ -95,6 +100,7 @@ func NewStatefulSetController( ), pvcListerSynced: pvcInformer.Informer().HasSynced, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"), + podControl: controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder}, } podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -153,16 +159,38 @@ func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) { // addPod adds the statefulset for the pod to the sync queue func (ssc *StatefulSetController) addPod(obj interface{}) { pod := obj.(*v1.Pod) - glog.V(4).Infof("Pod %s created, labels: %+v", pod.Name, pod.Labels) - set := ssc.getStatefulSetForPod(pod) - if set == nil { + + if pod.DeletionTimestamp != nil { + // on a restart of the controller manager, it's possible a new pod shows up in a state that + // is already pending deletion. Prevent the pod from being a creation observation. + ssc.deletePod(pod) return } - ssc.enqueueStatefulSet(set) + + // If it has a ControllerRef, that's all that matters. + if controllerRef := controller.GetControllerOf(pod); controllerRef != nil { + set := ssc.resolveControllerRef(pod.Namespace, controllerRef) + if set == nil { + return + } + glog.V(4).Infof("Pod %s created, labels: %+v", pod.Name, pod.Labels) + ssc.enqueueStatefulSet(set) + return + } + + // Otherwise, it's an orphan. Get a list of all matching controllers and sync + // them to see if anyone wants to adopt it. + sets := ssc.getStatefulSetsForPod(pod) + if len(sets) == 0 { + return + } + glog.V(4).Infof("Orphan Pod %s created, labels: %+v", pod.Name, pod.Labels) + for _, set := range sets { + ssc.enqueueStatefulSet(set) + } } // updatePod adds the statefulset for the current and old pods to the sync queue. -// If the labels of the pod didn't change, this method enqueues a single statefulset. func (ssc *StatefulSetController) updatePod(old, cur interface{}) { curPod := cur.(*v1.Pod) oldPod := old.(*v1.Pod) @@ -171,15 +199,40 @@ func (ssc *StatefulSetController) updatePod(old, cur interface{}) { // Two different versions of the same pod will always have different RVs. return } - set := ssc.getStatefulSetForPod(curPod) - if set == nil { + + labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels) + + curControllerRef := controller.GetControllerOf(curPod) + oldControllerRef := controller.GetControllerOf(oldPod) + controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef) + if controllerRefChanged && oldControllerRef != nil { + // The ControllerRef was changed. Sync the old controller, if any. + if set := ssc.resolveControllerRef(oldPod.Namespace, oldControllerRef); set != nil { + ssc.enqueueStatefulSet(set) + } + } + + // If it has a ControllerRef, that's all that matters. + if curControllerRef != nil { + set := ssc.resolveControllerRef(curPod.Namespace, curControllerRef) + if set == nil { + return + } + glog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) + ssc.enqueueStatefulSet(set) return } - ssc.enqueueStatefulSet(set) - // TODO will we need this going forward with controller ref impl? - if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) { - if oldSet := ssc.getStatefulSetForPod(oldPod); oldSet != nil { - ssc.enqueueStatefulSet(oldSet) + + // Otherwise, it's an orphan. If anything changed, sync matching controllers + // to see if anyone wants to adopt it now. + if labelChanged || controllerRefChanged { + sets := ssc.getStatefulSetsForPod(curPod) + if len(sets) == 0 { + return + } + glog.V(4).Infof("Orphan Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) + for _, set := range sets { + ssc.enqueueStatefulSet(set) } } } @@ -204,48 +257,80 @@ func (ssc *StatefulSetController) deletePod(obj interface{}) { return } } + + controllerRef := controller.GetControllerOf(pod) + if controllerRef == nil { + // No controller should care about orphans being deleted. + return + } + set := ssc.resolveControllerRef(pod.Namespace, controllerRef) + if set == nil { + return + } glog.V(4).Infof("Pod %s/%s deleted through %v.", pod.Namespace, pod.Name, utilruntime.GetCaller()) - if set := ssc.getStatefulSetForPod(pod); set != nil { - ssc.enqueueStatefulSet(set) - } + ssc.enqueueStatefulSet(set) } -// getPodsForStatefulSets returns the pods that match the selectors of the given statefulset. -func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet) ([]*v1.Pod, error) { - sel, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) +// getPodsForStatefulSet returns the Pods that a given StatefulSet should manage. +// It also reconciles ControllerRef by adopting/orphaning. +// +// NOTE: Returned Pods are pointers to objects from the cache. +// If you need to modify one, you need to copy it first. +func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet, selector labels.Selector) ([]*v1.Pod, error) { + // List all pods to include the pods that don't match the selector anymore but + // has a ControllerRef pointing to this StatefulSet. + pods, err := ssc.podLister.Pods(set.Namespace).List(labels.Everything()) if err != nil { - return []*v1.Pod{}, err + return nil, err } - return ssc.podLister.Pods(set.Namespace).List(sel) + + filter := func(pod *v1.Pod) bool { + // Only claim if it matches our StatefulSet name. Otherwise release/ignore. + return isMemberOf(set, pod) + } + + cm := controller.NewPodControllerRefManager(ssc.podControl, set, selector, controllerKind) + return cm.ClaimPods(pods, filter) } -// getStatefulSetForPod returns the StatefulSet managing the given pod. -func (ssc *StatefulSetController) getStatefulSetForPod(pod *v1.Pod) *apps.StatefulSet { +// getStatefulSetsForPod returns a list of StatefulSets that potentially match +// a given pod. +func (ssc *StatefulSetController) getStatefulSetsForPod(pod *v1.Pod) []*apps.StatefulSet { sets, err := ssc.setLister.GetPodStatefulSets(pod) if err != nil { - glog.V(4).Infof("No StatefulSets found for pod %v, StatefulSet controller will avoid syncing", pod.Name) return nil } // More than one set is selecting the same Pod if len(sets) > 1 { + // ControllerRef will ensure we don't do anything crazy, but more than one + // item in this list nevertheless constitutes user error. utilruntime.HandleError( fmt.Errorf( "user error: more than one StatefulSet is selecting pods with labels: %+v", pod.Labels)) - // The timestamp sort should not be necessary because we will enforce the CreatedBy requirement by - // name - sort.Sort(overlappingStatefulSets(sets)) - // return the first created set for which pod is a member - for i := range sets { - if isMemberOf(sets[i], pod) { - return sets[i] - } - } - glog.V(4).Infof("No StatefulSets found for pod %v, StatefulSet controller will avoid syncing", pod.Name) + } + return sets +} + +// resolveControllerRef returns the controller referenced by a ControllerRef, +// or nil if the ControllerRef could not be resolved to a matching controller +// of the corrrect Kind. +func (ssc *StatefulSetController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.StatefulSet { + // We can't look up by UID, so look up by Name and then verify UID. + // Don't even try to look up by Name if it's the wrong Kind. + if controllerRef.Kind != controllerKind.Kind { return nil } - return sets[0] - + set, err := ssc.setLister.StatefulSets(namespace).Get(controllerRef.Name) + if err != nil { + return nil + } + if set.UID != controllerRef.UID { + // The controller we found with this Name is not the same one that the + // ControllerRef points to. + return nil + } + return set } // enqueueStatefulSet enqueues the given statefulset in the work queue. @@ -298,11 +383,17 @@ func (ssc *StatefulSetController) sync(key string) error { return nil } if err != nil { - utilruntime.HandleError(fmt.Errorf("Unable to retrieve StatefulSet %v from store: %v", key, err)) + utilruntime.HandleError(fmt.Errorf("unable to retrieve StatefulSet %v from store: %v", key, err)) return err } + selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) + if err != nil { + utilruntime.HandleError(fmt.Errorf("error converting StatefulSet %v selector: %v", key, err)) + // This is a non-transient error, so don't retry. + return nil + } - pods, err := ssc.getPodsForStatefulSet(set) + pods, err := ssc.getPodsForStatefulSet(set, selector) if err != nil { return err } diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go index d3ce799b2ef..6389d79117a 100644 --- a/pkg/controller/statefulset/stateful_set_control.go +++ b/pkg/controller/statefulset/stateful_set_control.go @@ -74,6 +74,7 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p // if the ordinal is greater than the number of replicas add it to the condemned list condemned = append(condemned, pods[i]) } + // If the ordinal could not be parsed (ord < 0), ignore the Pod. } // for any empty indices in the sequence [0,set.Spec.Replicas) create a new Pod @@ -111,6 +112,12 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p } } + // If the StatefulSet is being deleted, don't do anything other than updating + // status. + if set.DeletionTimestamp != nil { + return nil + } + // Examine each replica with respect to its ordinal for i := range replicas { // delete and recreate failed pods diff --git a/pkg/controller/statefulset/stateful_set_control_test.go b/pkg/controller/statefulset/stateful_set_control_test.go index 2a74eeea89f..8a8fa67c5ff 100644 --- a/pkg/controller/statefulset/stateful_set_control_test.go +++ b/pkg/controller/statefulset/stateful_set_control_test.go @@ -214,6 +214,69 @@ func TestStatefulSetControlReplacesPods(t *testing.T) { } } +func TestStatefulSetDeletionTimestamp(t *testing.T) { + set := newStatefulSet(5) + client := fake.NewSimpleClientset(set) + + informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) + spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets()) + ssc := NewDefaultStatefulSetControl(spc) + + stop := make(chan struct{}) + defer close(stop) + informerFactory.Start(stop) + cache.WaitForCacheSync( + stop, + informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced, + informerFactory.Core().V1().Pods().Informer().HasSynced, + ) + + // Bring up a StatefulSet. + if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { + t.Errorf("failed to turn up StatefulSet : %s", err) + } + var err error + set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatalf("error getting updated StatefulSet: %v", err) + } + if set.Status.Replicas != 5 { + t.Error("failed to scale statefulset to 5 replicas") + } + selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) + if err != nil { + t.Error(err) + } + pods, err := spc.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Error(err) + } + sort.Sort(ascendingOrdinal(pods)) + + // Mark the StatefulSet as being deleted. + set.DeletionTimestamp = new(metav1.Time) + + // Delete the first pod. + spc.podsIndexer.Delete(pods[0]) + pods, err = spc.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Error(err) + } + + // The StatefulSet should update its replica count, + // but not try to fix it. + if err := ssc.UpdateStatefulSet(set, pods); err != nil { + t.Errorf("failed to update StatefulSet : %s", err) + } + set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatalf("error getting updated StatefulSet: %v", err) + } + if e, a := int32(4), set.Status.Replicas; e != a { + t.Errorf("expected to scale to %d, got %d", e, a) + } +} + func TestDefaultStatefulSetControlRecreatesFailedPod(t *testing.T) { client := fake.NewSimpleClientset() informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) diff --git a/pkg/controller/statefulset/stateful_set_test.go b/pkg/controller/statefulset/stateful_set_test.go index 3d927e2609e..b996b6ca492 100644 --- a/pkg/controller/statefulset/stateful_set_test.go +++ b/pkg/controller/statefulset/stateful_set_test.go @@ -17,9 +17,9 @@ limitations under the License. package statefulset import ( + "reflect" "sort" "testing" - "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -176,23 +176,60 @@ func TestStatefulSetControllerBlocksScaling(t *testing.T) { } } -func TestStateSetControllerAddPod(t *testing.T) { +func TestStatefulSetControllerAddPod(t *testing.T) { ssc, spc := newFakeStatefulSetController() - set := newStatefulSet(3) - pod := newStatefulSetPod(set, 0) - spc.setsIndexer.Add(set) - ssc.addPod(pod) + set1 := newStatefulSet(3) + set2 := newStatefulSet(3) + pod1 := newStatefulSetPod(set1, 0) + pod2 := newStatefulSetPod(set2, 0) + spc.setsIndexer.Add(set1) + spc.setsIndexer.Add(set2) + + ssc.addPod(pod1) key, done := ssc.queue.Get() if key == nil || done { - t.Error("Failed to enqueue StatefulSet") + t.Error("failed to enqueue StatefulSet") } else if key, ok := key.(string); !ok { - t.Error("Key is not a string") - } else if expectedKey, _ := controller.KeyFunc(set); expectedKey != key { - t.Errorf("Expected StatefulSet key %s found %s", expectedKey, key) + t.Error("key is not a string") + } else if expectedKey, _ := controller.KeyFunc(set1); expectedKey != key { + t.Errorf("expected StatefulSet key %s found %s", expectedKey, key) + } + ssc.queue.Done(key) + + ssc.addPod(pod2) + key, done = ssc.queue.Get() + if key == nil || done { + t.Error("failed to enqueue StatefulSet") + } else if key, ok := key.(string); !ok { + t.Error("key is not a string") + } else if expectedKey, _ := controller.KeyFunc(set2); expectedKey != key { + t.Errorf("expected StatefulSet key %s found %s", expectedKey, key) + } + ssc.queue.Done(key) +} + +func TestStatefulSetControllerAddPodOrphan(t *testing.T) { + ssc, spc := newFakeStatefulSetController() + set1 := newStatefulSet(3) + set2 := newStatefulSet(3) + set2.Name = "foo2" + set3 := newStatefulSet(3) + set3.Name = "foo3" + set3.Spec.Selector.MatchLabels = map[string]string{"foo3": "bar"} + pod := newStatefulSetPod(set1, 0) + spc.setsIndexer.Add(set1) + spc.setsIndexer.Add(set2) + spc.setsIndexer.Add(set3) + + // Make pod an orphan. Expect matching sets to be queued. + pod.OwnerReferences = nil + ssc.addPod(pod) + if got, want := ssc.queue.Len(), 2; got != want { + t.Errorf("queue.Len() = %v, want %v", got, want) } } -func TestStateSetControllerAddPodNoSet(t *testing.T) { +func TestStatefulSetControllerAddPodNoSet(t *testing.T) { ssc, _ := newFakeStatefulSetController() set := newStatefulSet(3) pod := newStatefulSetPod(set, 0) @@ -206,19 +243,36 @@ func TestStateSetControllerAddPodNoSet(t *testing.T) { func TestStatefulSetControllerUpdatePod(t *testing.T) { ssc, spc := newFakeStatefulSetController() - set := newStatefulSet(3) - pod := newStatefulSetPod(set, 0) - spc.setsIndexer.Add(set) - prev := *pod - fakeResourceVersion(pod) - ssc.updatePod(&prev, pod) + set1 := newStatefulSet(3) + set2 := newStatefulSet(3) + set2.Name = "foo2" + pod1 := newStatefulSetPod(set1, 0) + pod2 := newStatefulSetPod(set2, 0) + spc.setsIndexer.Add(set1) + spc.setsIndexer.Add(set2) + + prev := *pod1 + fakeResourceVersion(pod1) + ssc.updatePod(&prev, pod1) key, done := ssc.queue.Get() if key == nil || done { - t.Error("Failed to enqueue StatefulSet") + t.Error("failed to enqueue StatefulSet") } else if key, ok := key.(string); !ok { - t.Error("Key is not a string") - } else if expectedKey, _ := controller.KeyFunc(set); expectedKey != key { - t.Errorf("Expected StatefulSet key %s found %s", expectedKey, key) + t.Error("key is not a string") + } else if expectedKey, _ := controller.KeyFunc(set1); expectedKey != key { + t.Errorf("expected StatefulSet key %s found %s", expectedKey, key) + } + + prev = *pod2 + fakeResourceVersion(pod2) + ssc.updatePod(&prev, pod2) + key, done = ssc.queue.Get() + if key == nil || done { + t.Error("failed to enqueue StatefulSet") + } else if key, ok := key.(string); !ok { + t.Error("key is not a string") + } else if expectedKey, _ := controller.KeyFunc(set2); expectedKey != key { + t.Errorf("expected StatefulSet key %s found %s", expectedKey, key) } } @@ -249,53 +303,106 @@ func TestStatefulSetControllerUpdatePodWithSameVersion(t *testing.T) { } } -func TestStatefulSetControllerUpdatePodWithNewLabels(t *testing.T) { +func TestStatefulSetControllerUpdatePodOrphanWithNewLabels(t *testing.T) { ssc, spc := newFakeStatefulSetController() set := newStatefulSet(3) pod := newStatefulSetPod(set, 0) + pod.OwnerReferences = nil set2 := newStatefulSet(3) set2.Name = "foo2" - set2.Spec.Selector.MatchLabels = map[string]string{"foo2": "bar2"} - set2.Spec.Template.Labels = map[string]string{"foo2": "bar2"} spc.setsIndexer.Add(set) spc.setsIndexer.Add(set2) clone := *pod clone.Labels = map[string]string{"foo2": "bar2"} fakeResourceVersion(&clone) - ssc.updatePod(pod, &clone) - key, done := ssc.queue.Get() - if key == nil || done { - t.Error("Failed to enqueue StatefulSet") - } else if key, ok := key.(string); !ok { - t.Error("Key is not a string") - } else if expectedKey, _ := controller.KeyFunc(set2); expectedKey != key { - t.Errorf("Expected StatefulSet key %s found %s", expectedKey, key) + ssc.updatePod(&clone, pod) + if got, want := ssc.queue.Len(), 2; got != want { + t.Errorf("queue.Len() = %v, want %v", got, want) } - key, done = ssc.queue.Get() - if key == nil || done { - t.Error("Failed to enqueue StatefulSet") - } else if key, ok := key.(string); !ok { - t.Error("Key is not a string") - } else if expectedKey, _ := controller.KeyFunc(set); expectedKey != key { - t.Errorf("Expected StatefulSet key %s found %s", expectedKey, key) +} + +func TestStatefulSetControllerUpdatePodChangeControllerRef(t *testing.T) { + ssc, spc := newFakeStatefulSetController() + set := newStatefulSet(3) + set2 := newStatefulSet(3) + set2.Name = "foo2" + pod := newStatefulSetPod(set, 0) + pod2 := newStatefulSetPod(set2, 0) + spc.setsIndexer.Add(set) + spc.setsIndexer.Add(set2) + clone := *pod + clone.OwnerReferences = pod2.OwnerReferences + fakeResourceVersion(&clone) + ssc.updatePod(&clone, pod) + if got, want := ssc.queue.Len(), 2; got != want { + t.Errorf("queue.Len() = %v, want %v", got, want) + } +} + +func TestStatefulSetControllerUpdatePodRelease(t *testing.T) { + ssc, spc := newFakeStatefulSetController() + set := newStatefulSet(3) + set2 := newStatefulSet(3) + set2.Name = "foo2" + pod := newStatefulSetPod(set, 0) + spc.setsIndexer.Add(set) + spc.setsIndexer.Add(set2) + clone := *pod + clone.OwnerReferences = nil + fakeResourceVersion(&clone) + ssc.updatePod(pod, &clone) + if got, want := ssc.queue.Len(), 2; got != want { + t.Errorf("queue.Len() = %v, want %v", got, want) } } func TestStatefulSetControllerDeletePod(t *testing.T) { ssc, spc := newFakeStatefulSetController() - set := newStatefulSet(3) - pod := newStatefulSetPod(set, 0) - spc.setsIndexer.Add(set) - ssc.deletePod(pod) + set1 := newStatefulSet(3) + set2 := newStatefulSet(3) + set2.Name = "foo2" + pod1 := newStatefulSetPod(set1, 0) + pod2 := newStatefulSetPod(set2, 0) + spc.setsIndexer.Add(set1) + spc.setsIndexer.Add(set2) + + ssc.deletePod(pod1) key, done := ssc.queue.Get() if key == nil || done { - t.Error("Failed to enqueue StatefulSet") + t.Error("failed to enqueue StatefulSet") } else if key, ok := key.(string); !ok { - t.Error("Key is not a string") - } else if expectedKey, _ := controller.KeyFunc(set); expectedKey != key { - t.Errorf("Expected StatefulSet key %s found %s", expectedKey, key) + t.Error("key is not a string") + } else if expectedKey, _ := controller.KeyFunc(set1); expectedKey != key { + t.Errorf("expected StatefulSet key %s found %s", expectedKey, key) + } + + ssc.deletePod(pod2) + key, done = ssc.queue.Get() + if key == nil || done { + t.Error("failed to enqueue StatefulSet") + } else if key, ok := key.(string); !ok { + t.Error("key is not a string") + } else if expectedKey, _ := controller.KeyFunc(set2); expectedKey != key { + t.Errorf("expected StatefulSet key %s found %s", expectedKey, key) } } + +func TestStatefulSetControllerDeletePodOrphan(t *testing.T) { + ssc, spc := newFakeStatefulSetController() + set1 := newStatefulSet(3) + set2 := newStatefulSet(3) + set2.Name = "foo2" + pod1 := newStatefulSetPod(set1, 0) + spc.setsIndexer.Add(set1) + spc.setsIndexer.Add(set2) + + pod1.OwnerReferences = nil + ssc.deletePod(pod1) + if got, want := ssc.queue.Len(), 0; got != want { + t.Errorf("queue.Len() = %v, want %v", got, want) + } +} + func TestStatefulSetControllerDeletePodTombstone(t *testing.T) { ssc, spc := newFakeStatefulSetController() set := newStatefulSet(3) @@ -306,42 +413,108 @@ func TestStatefulSetControllerDeletePodTombstone(t *testing.T) { ssc.deletePod(tombstone) key, done := ssc.queue.Get() if key == nil || done { - t.Error("Failed to enqueue StatefulSet") + t.Error("failed to enqueue StatefulSet") } else if key, ok := key.(string); !ok { - t.Error("Key is not a string") + t.Error("key is not a string") } else if expectedKey, _ := controller.KeyFunc(set); expectedKey != key { - t.Errorf("Expected StatefulSet key %s found %s", expectedKey, key) + t.Errorf("expected StatefulSet key %s found %s", expectedKey, key) } } -func TestStatefulSetControllerGetStatefulSetForPod(t *testing.T) { +func TestStatefulSetControllerGetStatefulSetsForPod(t *testing.T) { ssc, spc := newFakeStatefulSetController() - set := newStatefulSet(3) - pod := newStatefulSetPod(set, 0) - spc.setsIndexer.Add(set) - spc.podsIndexer.Add(pod) - if set := ssc.getStatefulSetForPod(pod); set == nil { - t.Error("Failed to get StatefulSet for Pod ") - } -} - -func TestStatefulSetControllerGetStatefulSetForPodOverlapping(t *testing.T) { - ssc, spc := newFakeStatefulSetController() - set := newStatefulSet(3) - pod := newStatefulSetPod(set, 0) + set1 := newStatefulSet(3) set2 := newStatefulSet(3) set2.Name = "foo2" - set3 := newStatefulSet(3) - set3.Name = "foo3" - set3.CreationTimestamp.Add(1 * time.Second) - spc.setsIndexer.Add(set3) + pod := newStatefulSetPod(set1, 0) + spc.setsIndexer.Add(set1) spc.setsIndexer.Add(set2) - spc.setsIndexer.Add(set) spc.podsIndexer.Add(pod) - if found := ssc.getStatefulSetForPod(pod); found == nil { - t.Error("Failed to get StatefulSet for Pod") - } else if found.Name != set.Name { - t.Errorf("Returned wrong StatefulSet %s for Pod", set.Name) + sets := ssc.getStatefulSetsForPod(pod) + if got, want := len(sets), 2; got != want { + t.Errorf("len(sets) = %v, want %v", got, want) + } +} + +func TestGetPodsForStatefulSetAdopt(t *testing.T) { + ssc, spc := newFakeStatefulSetController() + set := newStatefulSet(5) + pod1 := newStatefulSetPod(set, 1) + // pod2 is an orphan with matching labels and name. + pod2 := newStatefulSetPod(set, 2) + pod2.OwnerReferences = nil + // pod3 has wrong labels. + pod3 := newStatefulSetPod(set, 3) + pod3.OwnerReferences = nil + pod3.Labels = nil + // pod4 has wrong name. + pod4 := newStatefulSetPod(set, 4) + pod4.OwnerReferences = nil + pod4.Name = "x" + pod4.Name + + spc.podsIndexer.Add(pod1) + spc.podsIndexer.Add(pod2) + spc.podsIndexer.Add(pod3) + spc.podsIndexer.Add(pod4) + selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) + if err != nil { + t.Fatal(err) + } + pods, err := ssc.getPodsForStatefulSet(set, selector) + if err != nil { + t.Fatalf("getPodsForStatefulSet() error: %v", err) + } + var got []string + for _, pod := range pods { + got = append(got, pod.Name) + } + + // pod2 should be claimed, pod3 and pod4 ignored + want := []string{pod1.Name, pod2.Name} + sort.Strings(got) + sort.Strings(want) + if !reflect.DeepEqual(got, want) { + t.Errorf("getPodsForStatefulSet() = %v, want %v", got, want) + } +} + +func TestGetPodsForStatefulSetRelease(t *testing.T) { + ssc, spc := newFakeStatefulSetController() + set := newStatefulSet(3) + pod1 := newStatefulSetPod(set, 1) + // pod2 is owned but has wrong name. + pod2 := newStatefulSetPod(set, 2) + pod2.Name = "x" + pod2.Name + // pod3 is owned but has wrong labels. + pod3 := newStatefulSetPod(set, 3) + pod3.Labels = nil + // pod4 is an orphan that doesn't match. + pod4 := newStatefulSetPod(set, 4) + pod4.OwnerReferences = nil + pod4.Labels = nil + + spc.podsIndexer.Add(pod1) + spc.podsIndexer.Add(pod2) + spc.podsIndexer.Add(pod3) + selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) + if err != nil { + t.Fatal(err) + } + pods, err := ssc.getPodsForStatefulSet(set, selector) + if err != nil { + t.Fatalf("getPodsForStatefulSet() error: %v", err) + } + var got []string + for _, pod := range pods { + got = append(got, pod.Name) + } + + // Expect only pod1 (pod2 and pod3 should be released, pod4 ignored). + want := []string{pod1.Name} + sort.Strings(got) + sort.Strings(want) + if !reflect.DeepEqual(got, want) { + t.Errorf("getPodsForStatefulSet() = %v, want %v", got, want) } } diff --git a/pkg/controller/statefulset/stateful_set_utils.go b/pkg/controller/statefulset/stateful_set_utils.go index 59811c1e6d4..672fd5b165a 100644 --- a/pkg/controller/statefulset/stateful_set_utils.go +++ b/pkg/controller/statefulset/stateful_set_utils.go @@ -21,6 +21,7 @@ import ( "regexp" "strconv" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/api/v1" podapi "k8s.io/kubernetes/pkg/api/v1/pod" apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" @@ -230,9 +231,23 @@ func isHealthy(pod *v1.Pod) bool { return isRunningAndReady(pod) && !isTerminated(pod) } +// newControllerRef returns an ControllerRef pointing to a given StatefulSet. +func newControllerRef(set *apps.StatefulSet) *metav1.OwnerReference { + blockOwnerDeletion := true + isController := true + return &metav1.OwnerReference{ + APIVersion: controllerKind.GroupVersion().String(), + Kind: controllerKind.Kind, + Name: set.Name, + UID: set.UID, + BlockOwnerDeletion: &blockOwnerDeletion, + Controller: &isController, + } +} + // newStatefulSetPod returns a new Pod conforming to the set's Spec with an identity generated from ordinal. func newStatefulSetPod(set *apps.StatefulSet, ordinal int) *v1.Pod { - pod, _ := controller.GetPodFromTemplate(&set.Spec.Template, set, nil) + pod, _ := controller.GetPodFromTemplate(&set.Spec.Template, set, newControllerRef(set)) pod.Name = getPodName(set, ordinal) updateIdentity(set, pod) updateStorage(set, pod) diff --git a/pkg/controller/statefulset/stateful_set_utils_test.go b/pkg/controller/statefulset/stateful_set_utils_test.go index b6e80473985..c53cbb10a6d 100644 --- a/pkg/controller/statefulset/stateful_set_utils_test.go +++ b/pkg/controller/statefulset/stateful_set_utils_test.go @@ -31,6 +31,7 @@ import ( "k8s.io/kubernetes/pkg/api/v1" podapi "k8s.io/kubernetes/pkg/api/v1/pod" apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" + "k8s.io/kubernetes/pkg/controller" ) func TestGetParentNameAndOrdinal(t *testing.T) { @@ -271,6 +272,30 @@ func TestOverlappingStatefulSets(t *testing.T) { } } +func TestNewPodControllerRef(t *testing.T) { + set := newStatefulSet(1) + pod := newStatefulSetPod(set, 0) + controllerRef := controller.GetControllerOf(pod) + if controllerRef == nil { + t.Fatalf("No ControllerRef found on new pod") + } + if got, want := controllerRef.APIVersion, apps.SchemeGroupVersion.String(); got != want { + t.Errorf("controllerRef.APIVersion = %q, want %q", got, want) + } + if got, want := controllerRef.Kind, "StatefulSet"; got != want { + t.Errorf("controllerRef.Kind = %q, want %q", got, want) + } + if got, want := controllerRef.Name, set.Name; got != want { + t.Errorf("controllerRef.Name = %q, want %q", got, want) + } + if got, want := controllerRef.UID, set.UID; got != want { + t.Errorf("controllerRef.UID = %q, want %q", got, want) + } + if got, want := *controllerRef.Controller, true; got != want { + t.Errorf("controllerRef.Controller = %v, want %v", got, want) + } +} + func newPVC(name string) v1.PersistentVolumeClaim { return v1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/registry/apps/petset/BUILD b/pkg/registry/apps/petset/BUILD index d6b9ba448ac..edf6162bcd0 100644 --- a/pkg/registry/apps/petset/BUILD +++ b/pkg/registry/apps/petset/BUILD @@ -25,6 +25,7 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/util/validation/field", "//vendor:k8s.io/apiserver/pkg/endpoints/request", "//vendor:k8s.io/apiserver/pkg/registry/generic", + "//vendor:k8s.io/apiserver/pkg/registry/rest", "//vendor:k8s.io/apiserver/pkg/storage", "//vendor:k8s.io/apiserver/pkg/storage/names", ], @@ -40,6 +41,7 @@ go_test( "//pkg/apis/apps:go_default_library", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apiserver/pkg/endpoints/request", + "//vendor:k8s.io/apiserver/pkg/registry/rest", ], ) diff --git a/pkg/registry/apps/petset/strategy.go b/pkg/registry/apps/petset/strategy.go index a60f8f7e284..4a61860baab 100644 --- a/pkg/registry/apps/petset/strategy.go +++ b/pkg/registry/apps/petset/strategy.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/util/validation/field" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/generic" + "k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/names" "k8s.io/kubernetes/pkg/api" @@ -42,6 +43,12 @@ type statefulSetStrategy struct { // Strategy is the default logic that applies when creating and updating Replication StatefulSet objects. var Strategy = statefulSetStrategy{api.Scheme, names.SimpleNameGenerator} +// DefaultGarbageCollectionPolicy returns Orphan because that was the default +// behavior before the server-side garbage collection was implemented. +func (statefulSetStrategy) DefaultGarbageCollectionPolicy() rest.GarbageCollectionPolicy { + return rest.OrphanDependents +} + // NamespaceScoped returns true because all StatefulSet' need to be within a namespace. func (statefulSetStrategy) NamespaceScoped() bool { return true diff --git a/pkg/registry/apps/petset/strategy_test.go b/pkg/registry/apps/petset/strategy_test.go index 1690aa8ce96..99065690b09 100644 --- a/pkg/registry/apps/petset/strategy_test.go +++ b/pkg/registry/apps/petset/strategy_test.go @@ -21,6 +21,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/registry/rest" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apis/apps" ) @@ -62,7 +63,7 @@ func TestStatefulSetStrategy(t *testing.T) { } errs := Strategy.Validate(ctx, ps) if len(errs) != 0 { - t.Errorf("Unexpected error validating %v", errs) + t.Errorf("unexpected error validating %v", errs) } // Just Spec.Replicas is allowed to change @@ -77,14 +78,21 @@ func TestStatefulSetStrategy(t *testing.T) { Strategy.PrepareForUpdate(ctx, validPs, ps) errs = Strategy.ValidateUpdate(ctx, validPs, ps) if len(errs) != 0 { - t.Errorf("Updating spec.Replicas is allowed on a statefulset: %v", errs) + t.Errorf("updating spec.Replicas is allowed on a statefulset: %v", errs) } validPs.Spec.Selector = &metav1.LabelSelector{MatchLabels: map[string]string{"a": "bar"}} Strategy.PrepareForUpdate(ctx, validPs, ps) errs = Strategy.ValidateUpdate(ctx, validPs, ps) if len(errs) == 0 { - t.Errorf("Expected a validation error since updates are disallowed on statefulsets.") + t.Errorf("expected a validation error since updates are disallowed on statefulsets.") + } + + // Make sure we correctly implement the interface. + // Otherwise a typo could silently change the default. + var gcds rest.GarbageCollectionDeleteStrategy = Strategy + if got, want := gcds.DefaultGarbageCollectionPolicy(), rest.OrphanDependents; got != want { + t.Errorf("DefaultGarbageCollectionPolicy() = %#v, want %#v", got, want) } } @@ -140,6 +148,6 @@ func TestStatefulSetStatusStrategy(t *testing.T) { } errs := StatusStrategy.ValidateUpdate(ctx, newPS, oldPS) if len(errs) != 0 { - t.Errorf("Unexpected error %v", errs) + t.Errorf("unexpected error %v", errs) } } diff --git a/staging/src/k8s.io/client-go/listers/apps/v1beta1/statefulset_expansion.go b/staging/src/k8s.io/client-go/listers/apps/v1beta1/statefulset_expansion.go index d34e894ac35..4473477e219 100644 --- a/staging/src/k8s.io/client-go/listers/apps/v1beta1/statefulset_expansion.go +++ b/staging/src/k8s.io/client-go/listers/apps/v1beta1/statefulset_expansion.go @@ -35,7 +35,9 @@ type StatefulSetListerExpansion interface { // StatefulSetNamespaeLister. type StatefulSetNamespaceListerExpansion interface{} -// GetPodStatefulSets returns a list of StatefulSets managing a pod. Returns an error only if no matching StatefulSets are found. +// GetPodStatefulSets returns a list of StatefulSets that potentially match a pod. +// Only the one specified in the Pod's ControllerRef will actually manage it. +// Returns an error only if no matching StatefulSets are found. func (s *statefulSetLister) GetPodStatefulSets(pod *v1.Pod) ([]*apps.StatefulSet, error) { var selector labels.Selector var ps *apps.StatefulSet diff --git a/test/e2e/framework/statefulset_utils.go b/test/e2e/framework/statefulset_utils.go index 1f08ba57471..6b4129692f4 100644 --- a/test/e2e/framework/statefulset_utils.go +++ b/test/e2e/framework/statefulset_utils.go @@ -408,7 +408,9 @@ func DeleteAllStatefulSets(c clientset.Interface, ns string) { } sst.WaitForStatus(&ss, 0) Logf("Deleting statefulset %v", ss.Name) - if err := c.Apps().StatefulSets(ss.Namespace).Delete(ss.Name, nil); err != nil { + // Use OrphanDependents=false so it's deleted synchronously. + // We already made sure the Pods are gone inside Scale(). + if err := c.Apps().StatefulSets(ss.Namespace).Delete(ss.Name, &metav1.DeleteOptions{OrphanDependents: new(bool)}); err != nil { errList = append(errList, fmt.Sprintf("%v", err)) } }