Merge pull request #42080 from enisoc/controller-ref-ss

Automatic merge from submit-queue (batch tested with PRs 42080, 41653, 42598, 42555)

StatefulSet: Respect ControllerRef

**What this PR does / why we need it**:

This is part of the completion of the [ControllerRef](https://github.com/kubernetes/community/blob/master/contributors/design-proposals/controller-ref.md) proposal. It brings StatefulSet into full compliance with ControllerRef. See the individual commit messages for details.

**Which issue this PR fixes**:

Fixes #36859

**Special notes for your reviewer**:

**Release note**:

```release-note
StatefulSet now respects ControllerRef to avoid fighting over Pods. At the time of upgrade, **you must not have StatefulSets with selectors that overlap** with any other controllers (such as ReplicaSets), or else [ownership of Pods may change](https://github.com/kubernetes/community/blob/master/contributors/design-proposals/controller-ref.md#upgrading).
```
cc @erictune @kubernetes/sig-apps-pr-reviews
This commit is contained in:
Kubernetes Submit Queue 2017-03-06 17:16:10 -08:00 committed by GitHub
commit d50a59ec66
15 changed files with 547 additions and 129 deletions

View File

@ -35,7 +35,9 @@ type StatefulSetListerExpansion interface {
// StatefulSetNamespaeLister.
type StatefulSetNamespaceListerExpansion interface{}
// GetPodStatefulSets returns a list of StatefulSets managing a pod. Returns an error only if no matching StatefulSets are found.
// GetPodStatefulSets returns a list of StatefulSets that potentially match a pod.
// Only the one specified in the Pod's ControllerRef will actually manage it.
// Returns an error only if no matching StatefulSets are found.
func (s *statefulSetLister) GetPodStatefulSets(pod *api.Pod) ([]*apps.StatefulSet, error) {
var selector labels.Selector
var ps *apps.StatefulSet

View File

@ -35,7 +35,9 @@ type StatefulSetListerExpansion interface {
// StatefulSetNamespaeLister.
type StatefulSetNamespaceListerExpansion interface{}
// GetPodStatefulSets returns a list of StatefulSets managing a pod. Returns an error only if no matching StatefulSets are found.
// GetPodStatefulSets returns a list of StatefulSets that potentially match a pod.
// Only the one specified in the Pod's ControllerRef will actually manage it.
// Returns an error only if no matching StatefulSets are found.
func (s *statefulSetLister) GetPodStatefulSets(pod *v1.Pod) ([]*apps.StatefulSet, error) {
var selector labels.Selector
var ps *apps.StatefulSet

View File

@ -51,8 +51,8 @@ type baseControllerRefManager struct {
// claimObject tries to take ownership of an object for this controller.
//
// It will reconcile the following:
// * Adopt orphans if the selector matches.
// * Release owned objects if the selector no longer matches.
// * Adopt orphans if the match function returns true.
// * Release owned objects if the match function returns false.
//
// A non-nil error is returned if some form of reconciliation was attemped and
// failed. Usually, controllers should try again later in case reconciliation
@ -63,14 +63,14 @@ type baseControllerRefManager struct {
// own the object.
//
// No reconciliation will be attempted if the controller is being deleted.
func (m *baseControllerRefManager) claimObject(obj metav1.Object, adopt, release func(metav1.Object) error) (bool, error) {
func (m *baseControllerRefManager) claimObject(obj metav1.Object, match func(metav1.Object) bool, adopt, release func(metav1.Object) error) (bool, error) {
controllerRef := GetControllerOf(obj)
if controllerRef != nil {
if controllerRef.UID != m.controller.GetUID() {
// Owned by someone else. Ignore.
return false, nil
}
if m.selector.Matches(labels.Set(obj.GetLabels())) {
if match(obj) {
// We already own it and the selector matches.
// Return true (successfully claimed) before checking deletion timestamp.
// We're still allowed to claim things we already own while being deleted
@ -96,8 +96,7 @@ func (m *baseControllerRefManager) claimObject(obj metav1.Object, adopt, release
}
// It's an orphan.
if m.controller.GetDeletionTimestamp() != nil ||
!m.selector.Matches(labels.Set(obj.GetLabels())) {
if m.controller.GetDeletionTimestamp() != nil || !match(obj) {
// Ignore if we're being deleted or selector doesn't match.
return false, nil
}
@ -145,16 +144,32 @@ func NewPodControllerRefManager(
// * Adopt orphans if the selector matches.
// * Release owned objects if the selector no longer matches.
//
// Optional: If one or more filters are specified, a Pod will only be claimed if
// all filters return true.
//
// A non-nil error is returned if some form of reconciliation was attemped and
// failed. Usually, controllers should try again later in case reconciliation
// is still needed.
//
// If the error is nil, either the reconciliation succeeded, or no
// reconciliation was necessary. The list of Pods that you now own is returned.
func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod) ([]*v1.Pod, error) {
func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod, filters ...func(*v1.Pod) bool) ([]*v1.Pod, error) {
var claimed []*v1.Pod
var errlist []error
match := func(obj metav1.Object) bool {
pod := obj.(*v1.Pod)
// Check selector first so filters only run on potentially matching Pods.
if !m.selector.Matches(labels.Set(pod.Labels)) {
return false
}
for _, filter := range filters {
if !filter(pod) {
return false
}
}
return true
}
adopt := func(obj metav1.Object) error {
return m.AdoptPod(obj.(*v1.Pod))
}
@ -163,7 +178,7 @@ func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod) ([]*v1.Pod, error) {
}
for _, pod := range pods {
ok, err := m.claimObject(pod, adopt, release)
ok, err := m.claimObject(pod, match, adopt, release)
if err != nil {
errlist = append(errlist, err)
continue
@ -265,6 +280,9 @@ func (m *ReplicaSetControllerRefManager) ClaimReplicaSets(sets []*extensions.Rep
var claimed []*extensions.ReplicaSet
var errlist []error
match := func(obj metav1.Object) bool {
return m.selector.Matches(labels.Set(obj.GetLabels()))
}
adopt := func(obj metav1.Object) error {
return m.AdoptReplicaSet(obj.(*extensions.ReplicaSet))
}
@ -273,7 +291,7 @@ func (m *ReplicaSetControllerRefManager) ClaimReplicaSets(sets []*extensions.Rep
}
for _, rs := range sets {
ok, err := m.claimObject(rs, adopt, release)
ok, err := m.claimObject(rs, match, adopt, release)
if err != nil {
errlist = append(errlist, err)
continue

View File

@ -32,6 +32,7 @@ go_library(
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/api/errors",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/labels",
"//vendor:k8s.io/apimachinery/pkg/util/errors",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/wait",

View File

@ -19,11 +19,11 @@ package statefulset
import (
"fmt"
"reflect"
"sort"
"time"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
@ -49,6 +49,9 @@ const (
statefulSetResyncPeriod = 30 * time.Second
)
// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = apps.SchemeGroupVersion.WithKind("StatefulSet")
// StatefulSetController controls statefulsets.
type StatefulSetController struct {
// client interface
@ -56,6 +59,8 @@ type StatefulSetController struct {
// control returns an interface capable of syncing a stateful set.
// Abstracted out for testing.
control StatefulSetControlInterface
// podControl is used for patching pods.
podControl controller.PodControlInterface
// podLister is able to list/get pods from a shared informer's store
podLister corelisters.PodLister
// podListerSynced returns true if the pod shared informer has synced at least once
@ -95,6 +100,7 @@ func NewStatefulSetController(
),
pvcListerSynced: pvcInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"),
podControl: controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder},
}
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@ -153,16 +159,38 @@ func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) {
// addPod adds the statefulset for the pod to the sync queue
func (ssc *StatefulSetController) addPod(obj interface{}) {
pod := obj.(*v1.Pod)
glog.V(4).Infof("Pod %s created, labels: %+v", pod.Name, pod.Labels)
set := ssc.getStatefulSetForPod(pod)
if set == nil {
if pod.DeletionTimestamp != nil {
// on a restart of the controller manager, it's possible a new pod shows up in a state that
// is already pending deletion. Prevent the pod from being a creation observation.
ssc.deletePod(pod)
return
}
ssc.enqueueStatefulSet(set)
// If it has a ControllerRef, that's all that matters.
if controllerRef := controller.GetControllerOf(pod); controllerRef != nil {
set := ssc.resolveControllerRef(pod.Namespace, controllerRef)
if set == nil {
return
}
glog.V(4).Infof("Pod %s created, labels: %+v", pod.Name, pod.Labels)
ssc.enqueueStatefulSet(set)
return
}
// Otherwise, it's an orphan. Get a list of all matching controllers and sync
// them to see if anyone wants to adopt it.
sets := ssc.getStatefulSetsForPod(pod)
if len(sets) == 0 {
return
}
glog.V(4).Infof("Orphan Pod %s created, labels: %+v", pod.Name, pod.Labels)
for _, set := range sets {
ssc.enqueueStatefulSet(set)
}
}
// updatePod adds the statefulset for the current and old pods to the sync queue.
// If the labels of the pod didn't change, this method enqueues a single statefulset.
func (ssc *StatefulSetController) updatePod(old, cur interface{}) {
curPod := cur.(*v1.Pod)
oldPod := old.(*v1.Pod)
@ -171,15 +199,40 @@ func (ssc *StatefulSetController) updatePod(old, cur interface{}) {
// Two different versions of the same pod will always have different RVs.
return
}
set := ssc.getStatefulSetForPod(curPod)
if set == nil {
labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
curControllerRef := controller.GetControllerOf(curPod)
oldControllerRef := controller.GetControllerOf(oldPod)
controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
if controllerRefChanged && oldControllerRef != nil {
// The ControllerRef was changed. Sync the old controller, if any.
if set := ssc.resolveControllerRef(oldPod.Namespace, oldControllerRef); set != nil {
ssc.enqueueStatefulSet(set)
}
}
// If it has a ControllerRef, that's all that matters.
if curControllerRef != nil {
set := ssc.resolveControllerRef(curPod.Namespace, curControllerRef)
if set == nil {
return
}
glog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
ssc.enqueueStatefulSet(set)
return
}
ssc.enqueueStatefulSet(set)
// TODO will we need this going forward with controller ref impl?
if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) {
if oldSet := ssc.getStatefulSetForPod(oldPod); oldSet != nil {
ssc.enqueueStatefulSet(oldSet)
// Otherwise, it's an orphan. If anything changed, sync matching controllers
// to see if anyone wants to adopt it now.
if labelChanged || controllerRefChanged {
sets := ssc.getStatefulSetsForPod(curPod)
if len(sets) == 0 {
return
}
glog.V(4).Infof("Orphan Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
for _, set := range sets {
ssc.enqueueStatefulSet(set)
}
}
}
@ -204,48 +257,80 @@ func (ssc *StatefulSetController) deletePod(obj interface{}) {
return
}
}
controllerRef := controller.GetControllerOf(pod)
if controllerRef == nil {
// No controller should care about orphans being deleted.
return
}
set := ssc.resolveControllerRef(pod.Namespace, controllerRef)
if set == nil {
return
}
glog.V(4).Infof("Pod %s/%s deleted through %v.", pod.Namespace, pod.Name, utilruntime.GetCaller())
if set := ssc.getStatefulSetForPod(pod); set != nil {
ssc.enqueueStatefulSet(set)
}
ssc.enqueueStatefulSet(set)
}
// getPodsForStatefulSets returns the pods that match the selectors of the given statefulset.
func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet) ([]*v1.Pod, error) {
sel, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
// getPodsForStatefulSet returns the Pods that a given StatefulSet should manage.
// It also reconciles ControllerRef by adopting/orphaning.
//
// NOTE: Returned Pods are pointers to objects from the cache.
// If you need to modify one, you need to copy it first.
func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet, selector labels.Selector) ([]*v1.Pod, error) {
// List all pods to include the pods that don't match the selector anymore but
// has a ControllerRef pointing to this StatefulSet.
pods, err := ssc.podLister.Pods(set.Namespace).List(labels.Everything())
if err != nil {
return []*v1.Pod{}, err
return nil, err
}
return ssc.podLister.Pods(set.Namespace).List(sel)
filter := func(pod *v1.Pod) bool {
// Only claim if it matches our StatefulSet name. Otherwise release/ignore.
return isMemberOf(set, pod)
}
cm := controller.NewPodControllerRefManager(ssc.podControl, set, selector, controllerKind)
return cm.ClaimPods(pods, filter)
}
// getStatefulSetForPod returns the StatefulSet managing the given pod.
func (ssc *StatefulSetController) getStatefulSetForPod(pod *v1.Pod) *apps.StatefulSet {
// getStatefulSetsForPod returns a list of StatefulSets that potentially match
// a given pod.
func (ssc *StatefulSetController) getStatefulSetsForPod(pod *v1.Pod) []*apps.StatefulSet {
sets, err := ssc.setLister.GetPodStatefulSets(pod)
if err != nil {
glog.V(4).Infof("No StatefulSets found for pod %v, StatefulSet controller will avoid syncing", pod.Name)
return nil
}
// More than one set is selecting the same Pod
if len(sets) > 1 {
// ControllerRef will ensure we don't do anything crazy, but more than one
// item in this list nevertheless constitutes user error.
utilruntime.HandleError(
fmt.Errorf(
"user error: more than one StatefulSet is selecting pods with labels: %+v",
pod.Labels))
// The timestamp sort should not be necessary because we will enforce the CreatedBy requirement by
// name
sort.Sort(overlappingStatefulSets(sets))
// return the first created set for which pod is a member
for i := range sets {
if isMemberOf(sets[i], pod) {
return sets[i]
}
}
glog.V(4).Infof("No StatefulSets found for pod %v, StatefulSet controller will avoid syncing", pod.Name)
}
return sets
}
// resolveControllerRef returns the controller referenced by a ControllerRef,
// or nil if the ControllerRef could not be resolved to a matching controller
// of the corrrect Kind.
func (ssc *StatefulSetController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.StatefulSet {
// We can't look up by UID, so look up by Name and then verify UID.
// Don't even try to look up by Name if it's the wrong Kind.
if controllerRef.Kind != controllerKind.Kind {
return nil
}
return sets[0]
set, err := ssc.setLister.StatefulSets(namespace).Get(controllerRef.Name)
if err != nil {
return nil
}
if set.UID != controllerRef.UID {
// The controller we found with this Name is not the same one that the
// ControllerRef points to.
return nil
}
return set
}
// enqueueStatefulSet enqueues the given statefulset in the work queue.
@ -298,11 +383,17 @@ func (ssc *StatefulSetController) sync(key string) error {
return nil
}
if err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to retrieve StatefulSet %v from store: %v", key, err))
utilruntime.HandleError(fmt.Errorf("unable to retrieve StatefulSet %v from store: %v", key, err))
return err
}
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
utilruntime.HandleError(fmt.Errorf("error converting StatefulSet %v selector: %v", key, err))
// This is a non-transient error, so don't retry.
return nil
}
pods, err := ssc.getPodsForStatefulSet(set)
pods, err := ssc.getPodsForStatefulSet(set, selector)
if err != nil {
return err
}

View File

@ -74,6 +74,7 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p
// if the ordinal is greater than the number of replicas add it to the condemned list
condemned = append(condemned, pods[i])
}
// If the ordinal could not be parsed (ord < 0), ignore the Pod.
}
// for any empty indices in the sequence [0,set.Spec.Replicas) create a new Pod
@ -111,6 +112,12 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p
}
}
// If the StatefulSet is being deleted, don't do anything other than updating
// status.
if set.DeletionTimestamp != nil {
return nil
}
// Examine each replica with respect to its ordinal
for i := range replicas {
// delete and recreate failed pods

View File

@ -214,6 +214,69 @@ func TestStatefulSetControlReplacesPods(t *testing.T) {
}
}
func TestStatefulSetDeletionTimestamp(t *testing.T) {
set := newStatefulSet(5)
client := fake.NewSimpleClientset(set)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
stop := make(chan struct{})
defer close(stop)
informerFactory.Start(stop)
cache.WaitForCacheSync(
stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced,
)
// Bring up a StatefulSet.
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil {
t.Errorf("failed to turn up StatefulSet : %s", err)
}
var err error
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
t.Fatalf("error getting updated StatefulSet: %v", err)
}
if set.Status.Replicas != 5 {
t.Error("failed to scale statefulset to 5 replicas")
}
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
t.Error(err)
}
pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Error(err)
}
sort.Sort(ascendingOrdinal(pods))
// Mark the StatefulSet as being deleted.
set.DeletionTimestamp = new(metav1.Time)
// Delete the first pod.
spc.podsIndexer.Delete(pods[0])
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Error(err)
}
// The StatefulSet should update its replica count,
// but not try to fix it.
if err := ssc.UpdateStatefulSet(set, pods); err != nil {
t.Errorf("failed to update StatefulSet : %s", err)
}
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
t.Fatalf("error getting updated StatefulSet: %v", err)
}
if e, a := int32(4), set.Status.Replicas; e != a {
t.Errorf("expected to scale to %d, got %d", e, a)
}
}
func TestDefaultStatefulSetControlRecreatesFailedPod(t *testing.T) {
client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())

View File

@ -17,9 +17,9 @@ limitations under the License.
package statefulset
import (
"reflect"
"sort"
"testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -176,23 +176,60 @@ func TestStatefulSetControllerBlocksScaling(t *testing.T) {
}
}
func TestStateSetControllerAddPod(t *testing.T) {
func TestStatefulSetControllerAddPod(t *testing.T) {
ssc, spc := newFakeStatefulSetController()
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
spc.setsIndexer.Add(set)
ssc.addPod(pod)
set1 := newStatefulSet(3)
set2 := newStatefulSet(3)
pod1 := newStatefulSetPod(set1, 0)
pod2 := newStatefulSetPod(set2, 0)
spc.setsIndexer.Add(set1)
spc.setsIndexer.Add(set2)
ssc.addPod(pod1)
key, done := ssc.queue.Get()
if key == nil || done {
t.Error("Failed to enqueue StatefulSet")
t.Error("failed to enqueue StatefulSet")
} else if key, ok := key.(string); !ok {
t.Error("Key is not a string")
} else if expectedKey, _ := controller.KeyFunc(set); expectedKey != key {
t.Errorf("Expected StatefulSet key %s found %s", expectedKey, key)
t.Error("key is not a string")
} else if expectedKey, _ := controller.KeyFunc(set1); expectedKey != key {
t.Errorf("expected StatefulSet key %s found %s", expectedKey, key)
}
ssc.queue.Done(key)
ssc.addPod(pod2)
key, done = ssc.queue.Get()
if key == nil || done {
t.Error("failed to enqueue StatefulSet")
} else if key, ok := key.(string); !ok {
t.Error("key is not a string")
} else if expectedKey, _ := controller.KeyFunc(set2); expectedKey != key {
t.Errorf("expected StatefulSet key %s found %s", expectedKey, key)
}
ssc.queue.Done(key)
}
func TestStatefulSetControllerAddPodOrphan(t *testing.T) {
ssc, spc := newFakeStatefulSetController()
set1 := newStatefulSet(3)
set2 := newStatefulSet(3)
set2.Name = "foo2"
set3 := newStatefulSet(3)
set3.Name = "foo3"
set3.Spec.Selector.MatchLabels = map[string]string{"foo3": "bar"}
pod := newStatefulSetPod(set1, 0)
spc.setsIndexer.Add(set1)
spc.setsIndexer.Add(set2)
spc.setsIndexer.Add(set3)
// Make pod an orphan. Expect matching sets to be queued.
pod.OwnerReferences = nil
ssc.addPod(pod)
if got, want := ssc.queue.Len(), 2; got != want {
t.Errorf("queue.Len() = %v, want %v", got, want)
}
}
func TestStateSetControllerAddPodNoSet(t *testing.T) {
func TestStatefulSetControllerAddPodNoSet(t *testing.T) {
ssc, _ := newFakeStatefulSetController()
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
@ -206,19 +243,36 @@ func TestStateSetControllerAddPodNoSet(t *testing.T) {
func TestStatefulSetControllerUpdatePod(t *testing.T) {
ssc, spc := newFakeStatefulSetController()
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
spc.setsIndexer.Add(set)
prev := *pod
fakeResourceVersion(pod)
ssc.updatePod(&prev, pod)
set1 := newStatefulSet(3)
set2 := newStatefulSet(3)
set2.Name = "foo2"
pod1 := newStatefulSetPod(set1, 0)
pod2 := newStatefulSetPod(set2, 0)
spc.setsIndexer.Add(set1)
spc.setsIndexer.Add(set2)
prev := *pod1
fakeResourceVersion(pod1)
ssc.updatePod(&prev, pod1)
key, done := ssc.queue.Get()
if key == nil || done {
t.Error("Failed to enqueue StatefulSet")
t.Error("failed to enqueue StatefulSet")
} else if key, ok := key.(string); !ok {
t.Error("Key is not a string")
} else if expectedKey, _ := controller.KeyFunc(set); expectedKey != key {
t.Errorf("Expected StatefulSet key %s found %s", expectedKey, key)
t.Error("key is not a string")
} else if expectedKey, _ := controller.KeyFunc(set1); expectedKey != key {
t.Errorf("expected StatefulSet key %s found %s", expectedKey, key)
}
prev = *pod2
fakeResourceVersion(pod2)
ssc.updatePod(&prev, pod2)
key, done = ssc.queue.Get()
if key == nil || done {
t.Error("failed to enqueue StatefulSet")
} else if key, ok := key.(string); !ok {
t.Error("key is not a string")
} else if expectedKey, _ := controller.KeyFunc(set2); expectedKey != key {
t.Errorf("expected StatefulSet key %s found %s", expectedKey, key)
}
}
@ -249,53 +303,106 @@ func TestStatefulSetControllerUpdatePodWithSameVersion(t *testing.T) {
}
}
func TestStatefulSetControllerUpdatePodWithNewLabels(t *testing.T) {
func TestStatefulSetControllerUpdatePodOrphanWithNewLabels(t *testing.T) {
ssc, spc := newFakeStatefulSetController()
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
pod.OwnerReferences = nil
set2 := newStatefulSet(3)
set2.Name = "foo2"
set2.Spec.Selector.MatchLabels = map[string]string{"foo2": "bar2"}
set2.Spec.Template.Labels = map[string]string{"foo2": "bar2"}
spc.setsIndexer.Add(set)
spc.setsIndexer.Add(set2)
clone := *pod
clone.Labels = map[string]string{"foo2": "bar2"}
fakeResourceVersion(&clone)
ssc.updatePod(pod, &clone)
key, done := ssc.queue.Get()
if key == nil || done {
t.Error("Failed to enqueue StatefulSet")
} else if key, ok := key.(string); !ok {
t.Error("Key is not a string")
} else if expectedKey, _ := controller.KeyFunc(set2); expectedKey != key {
t.Errorf("Expected StatefulSet key %s found %s", expectedKey, key)
ssc.updatePod(&clone, pod)
if got, want := ssc.queue.Len(), 2; got != want {
t.Errorf("queue.Len() = %v, want %v", got, want)
}
key, done = ssc.queue.Get()
if key == nil || done {
t.Error("Failed to enqueue StatefulSet")
} else if key, ok := key.(string); !ok {
t.Error("Key is not a string")
} else if expectedKey, _ := controller.KeyFunc(set); expectedKey != key {
t.Errorf("Expected StatefulSet key %s found %s", expectedKey, key)
}
func TestStatefulSetControllerUpdatePodChangeControllerRef(t *testing.T) {
ssc, spc := newFakeStatefulSetController()
set := newStatefulSet(3)
set2 := newStatefulSet(3)
set2.Name = "foo2"
pod := newStatefulSetPod(set, 0)
pod2 := newStatefulSetPod(set2, 0)
spc.setsIndexer.Add(set)
spc.setsIndexer.Add(set2)
clone := *pod
clone.OwnerReferences = pod2.OwnerReferences
fakeResourceVersion(&clone)
ssc.updatePod(&clone, pod)
if got, want := ssc.queue.Len(), 2; got != want {
t.Errorf("queue.Len() = %v, want %v", got, want)
}
}
func TestStatefulSetControllerUpdatePodRelease(t *testing.T) {
ssc, spc := newFakeStatefulSetController()
set := newStatefulSet(3)
set2 := newStatefulSet(3)
set2.Name = "foo2"
pod := newStatefulSetPod(set, 0)
spc.setsIndexer.Add(set)
spc.setsIndexer.Add(set2)
clone := *pod
clone.OwnerReferences = nil
fakeResourceVersion(&clone)
ssc.updatePod(pod, &clone)
if got, want := ssc.queue.Len(), 2; got != want {
t.Errorf("queue.Len() = %v, want %v", got, want)
}
}
func TestStatefulSetControllerDeletePod(t *testing.T) {
ssc, spc := newFakeStatefulSetController()
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
spc.setsIndexer.Add(set)
ssc.deletePod(pod)
set1 := newStatefulSet(3)
set2 := newStatefulSet(3)
set2.Name = "foo2"
pod1 := newStatefulSetPod(set1, 0)
pod2 := newStatefulSetPod(set2, 0)
spc.setsIndexer.Add(set1)
spc.setsIndexer.Add(set2)
ssc.deletePod(pod1)
key, done := ssc.queue.Get()
if key == nil || done {
t.Error("Failed to enqueue StatefulSet")
t.Error("failed to enqueue StatefulSet")
} else if key, ok := key.(string); !ok {
t.Error("Key is not a string")
} else if expectedKey, _ := controller.KeyFunc(set); expectedKey != key {
t.Errorf("Expected StatefulSet key %s found %s", expectedKey, key)
t.Error("key is not a string")
} else if expectedKey, _ := controller.KeyFunc(set1); expectedKey != key {
t.Errorf("expected StatefulSet key %s found %s", expectedKey, key)
}
ssc.deletePod(pod2)
key, done = ssc.queue.Get()
if key == nil || done {
t.Error("failed to enqueue StatefulSet")
} else if key, ok := key.(string); !ok {
t.Error("key is not a string")
} else if expectedKey, _ := controller.KeyFunc(set2); expectedKey != key {
t.Errorf("expected StatefulSet key %s found %s", expectedKey, key)
}
}
func TestStatefulSetControllerDeletePodOrphan(t *testing.T) {
ssc, spc := newFakeStatefulSetController()
set1 := newStatefulSet(3)
set2 := newStatefulSet(3)
set2.Name = "foo2"
pod1 := newStatefulSetPod(set1, 0)
spc.setsIndexer.Add(set1)
spc.setsIndexer.Add(set2)
pod1.OwnerReferences = nil
ssc.deletePod(pod1)
if got, want := ssc.queue.Len(), 0; got != want {
t.Errorf("queue.Len() = %v, want %v", got, want)
}
}
func TestStatefulSetControllerDeletePodTombstone(t *testing.T) {
ssc, spc := newFakeStatefulSetController()
set := newStatefulSet(3)
@ -306,42 +413,108 @@ func TestStatefulSetControllerDeletePodTombstone(t *testing.T) {
ssc.deletePod(tombstone)
key, done := ssc.queue.Get()
if key == nil || done {
t.Error("Failed to enqueue StatefulSet")
t.Error("failed to enqueue StatefulSet")
} else if key, ok := key.(string); !ok {
t.Error("Key is not a string")
t.Error("key is not a string")
} else if expectedKey, _ := controller.KeyFunc(set); expectedKey != key {
t.Errorf("Expected StatefulSet key %s found %s", expectedKey, key)
t.Errorf("expected StatefulSet key %s found %s", expectedKey, key)
}
}
func TestStatefulSetControllerGetStatefulSetForPod(t *testing.T) {
func TestStatefulSetControllerGetStatefulSetsForPod(t *testing.T) {
ssc, spc := newFakeStatefulSetController()
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
spc.setsIndexer.Add(set)
spc.podsIndexer.Add(pod)
if set := ssc.getStatefulSetForPod(pod); set == nil {
t.Error("Failed to get StatefulSet for Pod ")
}
}
func TestStatefulSetControllerGetStatefulSetForPodOverlapping(t *testing.T) {
ssc, spc := newFakeStatefulSetController()
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
set1 := newStatefulSet(3)
set2 := newStatefulSet(3)
set2.Name = "foo2"
set3 := newStatefulSet(3)
set3.Name = "foo3"
set3.CreationTimestamp.Add(1 * time.Second)
spc.setsIndexer.Add(set3)
pod := newStatefulSetPod(set1, 0)
spc.setsIndexer.Add(set1)
spc.setsIndexer.Add(set2)
spc.setsIndexer.Add(set)
spc.podsIndexer.Add(pod)
if found := ssc.getStatefulSetForPod(pod); found == nil {
t.Error("Failed to get StatefulSet for Pod")
} else if found.Name != set.Name {
t.Errorf("Returned wrong StatefulSet %s for Pod", set.Name)
sets := ssc.getStatefulSetsForPod(pod)
if got, want := len(sets), 2; got != want {
t.Errorf("len(sets) = %v, want %v", got, want)
}
}
func TestGetPodsForStatefulSetAdopt(t *testing.T) {
ssc, spc := newFakeStatefulSetController()
set := newStatefulSet(5)
pod1 := newStatefulSetPod(set, 1)
// pod2 is an orphan with matching labels and name.
pod2 := newStatefulSetPod(set, 2)
pod2.OwnerReferences = nil
// pod3 has wrong labels.
pod3 := newStatefulSetPod(set, 3)
pod3.OwnerReferences = nil
pod3.Labels = nil
// pod4 has wrong name.
pod4 := newStatefulSetPod(set, 4)
pod4.OwnerReferences = nil
pod4.Name = "x" + pod4.Name
spc.podsIndexer.Add(pod1)
spc.podsIndexer.Add(pod2)
spc.podsIndexer.Add(pod3)
spc.podsIndexer.Add(pod4)
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
t.Fatal(err)
}
pods, err := ssc.getPodsForStatefulSet(set, selector)
if err != nil {
t.Fatalf("getPodsForStatefulSet() error: %v", err)
}
var got []string
for _, pod := range pods {
got = append(got, pod.Name)
}
// pod2 should be claimed, pod3 and pod4 ignored
want := []string{pod1.Name, pod2.Name}
sort.Strings(got)
sort.Strings(want)
if !reflect.DeepEqual(got, want) {
t.Errorf("getPodsForStatefulSet() = %v, want %v", got, want)
}
}
func TestGetPodsForStatefulSetRelease(t *testing.T) {
ssc, spc := newFakeStatefulSetController()
set := newStatefulSet(3)
pod1 := newStatefulSetPod(set, 1)
// pod2 is owned but has wrong name.
pod2 := newStatefulSetPod(set, 2)
pod2.Name = "x" + pod2.Name
// pod3 is owned but has wrong labels.
pod3 := newStatefulSetPod(set, 3)
pod3.Labels = nil
// pod4 is an orphan that doesn't match.
pod4 := newStatefulSetPod(set, 4)
pod4.OwnerReferences = nil
pod4.Labels = nil
spc.podsIndexer.Add(pod1)
spc.podsIndexer.Add(pod2)
spc.podsIndexer.Add(pod3)
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
t.Fatal(err)
}
pods, err := ssc.getPodsForStatefulSet(set, selector)
if err != nil {
t.Fatalf("getPodsForStatefulSet() error: %v", err)
}
var got []string
for _, pod := range pods {
got = append(got, pod.Name)
}
// Expect only pod1 (pod2 and pod3 should be released, pod4 ignored).
want := []string{pod1.Name}
sort.Strings(got)
sort.Strings(want)
if !reflect.DeepEqual(got, want) {
t.Errorf("getPodsForStatefulSet() = %v, want %v", got, want)
}
}

View File

@ -21,6 +21,7 @@ import (
"regexp"
"strconv"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/api/v1"
podapi "k8s.io/kubernetes/pkg/api/v1/pod"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
@ -230,9 +231,23 @@ func isHealthy(pod *v1.Pod) bool {
return isRunningAndReady(pod) && !isTerminated(pod)
}
// newControllerRef returns an ControllerRef pointing to a given StatefulSet.
func newControllerRef(set *apps.StatefulSet) *metav1.OwnerReference {
blockOwnerDeletion := true
isController := true
return &metav1.OwnerReference{
APIVersion: controllerKind.GroupVersion().String(),
Kind: controllerKind.Kind,
Name: set.Name,
UID: set.UID,
BlockOwnerDeletion: &blockOwnerDeletion,
Controller: &isController,
}
}
// newStatefulSetPod returns a new Pod conforming to the set's Spec with an identity generated from ordinal.
func newStatefulSetPod(set *apps.StatefulSet, ordinal int) *v1.Pod {
pod, _ := controller.GetPodFromTemplate(&set.Spec.Template, set, nil)
pod, _ := controller.GetPodFromTemplate(&set.Spec.Template, set, newControllerRef(set))
pod.Name = getPodName(set, ordinal)
updateIdentity(set, pod)
updateStorage(set, pod)

View File

@ -31,6 +31,7 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
podapi "k8s.io/kubernetes/pkg/api/v1/pod"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/controller"
)
func TestGetParentNameAndOrdinal(t *testing.T) {
@ -271,6 +272,30 @@ func TestOverlappingStatefulSets(t *testing.T) {
}
}
func TestNewPodControllerRef(t *testing.T) {
set := newStatefulSet(1)
pod := newStatefulSetPod(set, 0)
controllerRef := controller.GetControllerOf(pod)
if controllerRef == nil {
t.Fatalf("No ControllerRef found on new pod")
}
if got, want := controllerRef.APIVersion, apps.SchemeGroupVersion.String(); got != want {
t.Errorf("controllerRef.APIVersion = %q, want %q", got, want)
}
if got, want := controllerRef.Kind, "StatefulSet"; got != want {
t.Errorf("controllerRef.Kind = %q, want %q", got, want)
}
if got, want := controllerRef.Name, set.Name; got != want {
t.Errorf("controllerRef.Name = %q, want %q", got, want)
}
if got, want := controllerRef.UID, set.UID; got != want {
t.Errorf("controllerRef.UID = %q, want %q", got, want)
}
if got, want := *controllerRef.Controller, true; got != want {
t.Errorf("controllerRef.Controller = %v, want %v", got, want)
}
}
func newPVC(name string) v1.PersistentVolumeClaim {
return v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{

View File

@ -25,6 +25,7 @@ go_library(
"//vendor:k8s.io/apimachinery/pkg/util/validation/field",
"//vendor:k8s.io/apiserver/pkg/endpoints/request",
"//vendor:k8s.io/apiserver/pkg/registry/generic",
"//vendor:k8s.io/apiserver/pkg/registry/rest",
"//vendor:k8s.io/apiserver/pkg/storage",
"//vendor:k8s.io/apiserver/pkg/storage/names",
],
@ -40,6 +41,7 @@ go_test(
"//pkg/apis/apps:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apiserver/pkg/endpoints/request",
"//vendor:k8s.io/apiserver/pkg/registry/rest",
],
)

View File

@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/util/validation/field"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/names"
"k8s.io/kubernetes/pkg/api"
@ -42,6 +43,12 @@ type statefulSetStrategy struct {
// Strategy is the default logic that applies when creating and updating Replication StatefulSet objects.
var Strategy = statefulSetStrategy{api.Scheme, names.SimpleNameGenerator}
// DefaultGarbageCollectionPolicy returns Orphan because that was the default
// behavior before the server-side garbage collection was implemented.
func (statefulSetStrategy) DefaultGarbageCollectionPolicy() rest.GarbageCollectionPolicy {
return rest.OrphanDependents
}
// NamespaceScoped returns true because all StatefulSet' need to be within a namespace.
func (statefulSetStrategy) NamespaceScoped() bool {
return true

View File

@ -21,6 +21,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/apps"
)
@ -62,7 +63,7 @@ func TestStatefulSetStrategy(t *testing.T) {
}
errs := Strategy.Validate(ctx, ps)
if len(errs) != 0 {
t.Errorf("Unexpected error validating %v", errs)
t.Errorf("unexpected error validating %v", errs)
}
// Just Spec.Replicas is allowed to change
@ -77,14 +78,21 @@ func TestStatefulSetStrategy(t *testing.T) {
Strategy.PrepareForUpdate(ctx, validPs, ps)
errs = Strategy.ValidateUpdate(ctx, validPs, ps)
if len(errs) != 0 {
t.Errorf("Updating spec.Replicas is allowed on a statefulset: %v", errs)
t.Errorf("updating spec.Replicas is allowed on a statefulset: %v", errs)
}
validPs.Spec.Selector = &metav1.LabelSelector{MatchLabels: map[string]string{"a": "bar"}}
Strategy.PrepareForUpdate(ctx, validPs, ps)
errs = Strategy.ValidateUpdate(ctx, validPs, ps)
if len(errs) == 0 {
t.Errorf("Expected a validation error since updates are disallowed on statefulsets.")
t.Errorf("expected a validation error since updates are disallowed on statefulsets.")
}
// Make sure we correctly implement the interface.
// Otherwise a typo could silently change the default.
var gcds rest.GarbageCollectionDeleteStrategy = Strategy
if got, want := gcds.DefaultGarbageCollectionPolicy(), rest.OrphanDependents; got != want {
t.Errorf("DefaultGarbageCollectionPolicy() = %#v, want %#v", got, want)
}
}
@ -140,6 +148,6 @@ func TestStatefulSetStatusStrategy(t *testing.T) {
}
errs := StatusStrategy.ValidateUpdate(ctx, newPS, oldPS)
if len(errs) != 0 {
t.Errorf("Unexpected error %v", errs)
t.Errorf("unexpected error %v", errs)
}
}

View File

@ -35,7 +35,9 @@ type StatefulSetListerExpansion interface {
// StatefulSetNamespaeLister.
type StatefulSetNamespaceListerExpansion interface{}
// GetPodStatefulSets returns a list of StatefulSets managing a pod. Returns an error only if no matching StatefulSets are found.
// GetPodStatefulSets returns a list of StatefulSets that potentially match a pod.
// Only the one specified in the Pod's ControllerRef will actually manage it.
// Returns an error only if no matching StatefulSets are found.
func (s *statefulSetLister) GetPodStatefulSets(pod *v1.Pod) ([]*apps.StatefulSet, error) {
var selector labels.Selector
var ps *apps.StatefulSet

View File

@ -408,7 +408,9 @@ func DeleteAllStatefulSets(c clientset.Interface, ns string) {
}
sst.WaitForStatus(&ss, 0)
Logf("Deleting statefulset %v", ss.Name)
if err := c.Apps().StatefulSets(ss.Namespace).Delete(ss.Name, nil); err != nil {
// Use OrphanDependents=false so it's deleted synchronously.
// We already made sure the Pods are gone inside Scale().
if err := c.Apps().StatefulSets(ss.Namespace).Delete(ss.Name, &metav1.DeleteOptions{OrphanDependents: new(bool)}); err != nil {
errList = append(errList, fmt.Sprintf("%v", err))
}
}