diff --git a/pkg/controller/statefulset/BUILD b/pkg/controller/statefulset/BUILD index 29aa56d65d0..0eb20aca1aa 100644 --- a/pkg/controller/statefulset/BUILD +++ b/pkg/controller/statefulset/BUILD @@ -33,7 +33,6 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/api/errors", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/labels", - "//vendor:k8s.io/apimachinery/pkg/runtime/schema", "//vendor:k8s.io/apimachinery/pkg/util/errors", "//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/wait", diff --git a/pkg/controller/statefulset/stateful_set.go b/pkg/controller/statefulset/stateful_set.go index 8e7d64d6014..2c430ae9228 100644 --- a/pkg/controller/statefulset/stateful_set.go +++ b/pkg/controller/statefulset/stateful_set.go @@ -19,7 +19,6 @@ package statefulset import ( "fmt" "reflect" - "sort" "time" "k8s.io/apimachinery/pkg/api/errors" @@ -50,6 +49,9 @@ const ( statefulSetResyncPeriod = 30 * time.Second ) +// controllerKind contains the schema.GroupVersionKind for this controller type. +var controllerKind = apps.SchemeGroupVersion.WithKind("StatefulSet") + // StatefulSetController controls statefulsets. type StatefulSetController struct { // client interface @@ -158,15 +160,36 @@ func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) { func (ssc *StatefulSetController) addPod(obj interface{}) { pod := obj.(*v1.Pod) glog.V(4).Infof("Pod %s created, labels: %+v", pod.Name, pod.Labels) - set := ssc.getStatefulSetForPod(pod) - if set == nil { + + if pod.DeletionTimestamp != nil { + // on a restart of the controller manager, it's possible a new pod shows up in a state that + // is already pending deletion. Prevent the pod from being a creation observation. + ssc.deletePod(pod) return } - ssc.enqueueStatefulSet(set) + + // If it has a ControllerRef, that's all that matters. + if controllerRef := controller.GetControllerOf(pod); controllerRef != nil { + if controllerRef.Kind != controllerKind.Kind { + // It's controlled by a different type of controller. + return + } + set, err := ssc.setLister.StatefulSets(pod.Namespace).Get(controllerRef.Name) + if err != nil { + return + } + ssc.enqueueStatefulSet(set) + return + } + + // Otherwise, it's an orphan. Get a list of all matching controllers and sync + // them to see if anyone wants to adopt it. + for _, set := range ssc.getStatefulSetsForPod(pod) { + ssc.enqueueStatefulSet(set) + } } // updatePod adds the statefulset for the current and old pods to the sync queue. -// If the labels of the pod didn't change, this method enqueues a single statefulset. func (ssc *StatefulSetController) updatePod(old, cur interface{}) { curPod := cur.(*v1.Pod) oldPod := old.(*v1.Pod) @@ -175,15 +198,41 @@ func (ssc *StatefulSetController) updatePod(old, cur interface{}) { // Two different versions of the same pod will always have different RVs. return } - set := ssc.getStatefulSetForPod(curPod) - if set == nil { + glog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) + + labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels) + + curControllerRef := controller.GetControllerOf(curPod) + oldControllerRef := controller.GetControllerOf(oldPod) + controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef) + if controllerRefChanged && + oldControllerRef != nil && oldControllerRef.Kind == controllerKind.Kind { + // The ControllerRef was changed. Sync the old controller, if any. + set, err := ssc.setLister.StatefulSets(oldPod.Namespace).Get(oldControllerRef.Name) + if err == nil { + ssc.enqueueStatefulSet(set) + } + } + + // If it has a ControllerRef, that's all that matters. + if curControllerRef != nil { + if curControllerRef.Kind != controllerKind.Kind { + // It's controlled by a different type of controller. + return + } + set, err := ssc.setLister.StatefulSets(curPod.Namespace).Get(curControllerRef.Name) + if err != nil { + return + } + ssc.enqueueStatefulSet(set) return } - ssc.enqueueStatefulSet(set) - // TODO will we need this going forward with controller ref impl? - if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) { - if oldSet := ssc.getStatefulSetForPod(oldPod); oldSet != nil { - ssc.enqueueStatefulSet(oldSet) + + // Otherwise, it's an orphan. If anything changed, sync matching controllers + // to see if anyone wants to adopt it now. + if labelChanged || controllerRefChanged { + for _, set := range ssc.getStatefulSetsForPod(curPod) { + ssc.enqueueStatefulSet(set) } } } @@ -209,9 +258,22 @@ func (ssc *StatefulSetController) deletePod(obj interface{}) { } } glog.V(4).Infof("Pod %s/%s deleted through %v.", pod.Namespace, pod.Name, utilruntime.GetCaller()) - if set := ssc.getStatefulSetForPod(pod); set != nil { - ssc.enqueueStatefulSet(set) + + controllerRef := controller.GetControllerOf(pod) + if controllerRef == nil { + // No controller should care about orphans being deleted. + return } + if controllerRef.Kind != controllerKind.Kind { + // It's controlled by a different type of controller. + return + } + + set, err := ssc.setLister.StatefulSets(pod.Namespace).Get(controllerRef.Name) + if err != nil { + return + } + ssc.enqueueStatefulSet(set) } // getPodsForStatefulSet returns the Pods that a given StatefulSet should manage. @@ -232,12 +294,13 @@ func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet, s return isMemberOf(set, pod) } - cm := controller.NewPodControllerRefManager(ssc.podControl, set, selector, getSSKind()) + cm := controller.NewPodControllerRefManager(ssc.podControl, set, selector, controllerKind) return cm.ClaimPods(pods, filter) } -// getStatefulSetForPod returns the StatefulSet managing the given pod. -func (ssc *StatefulSetController) getStatefulSetForPod(pod *v1.Pod) *apps.StatefulSet { +// getStatefulSetsForPod returns a list of StatefulSets that potentially match +// a given pod. +func (ssc *StatefulSetController) getStatefulSetsForPod(pod *v1.Pod) []*apps.StatefulSet { sets, err := ssc.setLister.GetPodStatefulSets(pod) if err != nil { glog.V(4).Infof("No StatefulSets found for pod %v, StatefulSet controller will avoid syncing", pod.Name) @@ -245,24 +308,14 @@ func (ssc *StatefulSetController) getStatefulSetForPod(pod *v1.Pod) *apps.Statef } // More than one set is selecting the same Pod if len(sets) > 1 { + // ControllerRef will ensure we don't do anything crazy, but more than one + // item in this list nevertheless constitutes user error. utilruntime.HandleError( fmt.Errorf( "user error: more than one StatefulSet is selecting pods with labels: %+v", pod.Labels)) - // The timestamp sort should not be necessary because we will enforce the CreatedBy requirement by - // name - sort.Sort(overlappingStatefulSets(sets)) - // return the first created set for which pod is a member - for i := range sets { - if isMemberOf(sets[i], pod) { - return sets[i] - } - } - glog.V(4).Infof("No StatefulSets found for pod %v, StatefulSet controller will avoid syncing", pod.Name) - return nil } - return sets[0] - + return sets } // enqueueStatefulSet enqueues the given statefulset in the work queue. diff --git a/pkg/controller/statefulset/stateful_set_test.go b/pkg/controller/statefulset/stateful_set_test.go index 4069a80ac76..b996b6ca492 100644 --- a/pkg/controller/statefulset/stateful_set_test.go +++ b/pkg/controller/statefulset/stateful_set_test.go @@ -20,7 +20,6 @@ import ( "reflect" "sort" "testing" - "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -177,23 +176,60 @@ func TestStatefulSetControllerBlocksScaling(t *testing.T) { } } -func TestStateSetControllerAddPod(t *testing.T) { +func TestStatefulSetControllerAddPod(t *testing.T) { ssc, spc := newFakeStatefulSetController() - set := newStatefulSet(3) - pod := newStatefulSetPod(set, 0) - spc.setsIndexer.Add(set) - ssc.addPod(pod) + set1 := newStatefulSet(3) + set2 := newStatefulSet(3) + pod1 := newStatefulSetPod(set1, 0) + pod2 := newStatefulSetPod(set2, 0) + spc.setsIndexer.Add(set1) + spc.setsIndexer.Add(set2) + + ssc.addPod(pod1) key, done := ssc.queue.Get() if key == nil || done { - t.Error("Failed to enqueue StatefulSet") + t.Error("failed to enqueue StatefulSet") } else if key, ok := key.(string); !ok { - t.Error("Key is not a string") - } else if expectedKey, _ := controller.KeyFunc(set); expectedKey != key { - t.Errorf("Expected StatefulSet key %s found %s", expectedKey, key) + t.Error("key is not a string") + } else if expectedKey, _ := controller.KeyFunc(set1); expectedKey != key { + t.Errorf("expected StatefulSet key %s found %s", expectedKey, key) + } + ssc.queue.Done(key) + + ssc.addPod(pod2) + key, done = ssc.queue.Get() + if key == nil || done { + t.Error("failed to enqueue StatefulSet") + } else if key, ok := key.(string); !ok { + t.Error("key is not a string") + } else if expectedKey, _ := controller.KeyFunc(set2); expectedKey != key { + t.Errorf("expected StatefulSet key %s found %s", expectedKey, key) + } + ssc.queue.Done(key) +} + +func TestStatefulSetControllerAddPodOrphan(t *testing.T) { + ssc, spc := newFakeStatefulSetController() + set1 := newStatefulSet(3) + set2 := newStatefulSet(3) + set2.Name = "foo2" + set3 := newStatefulSet(3) + set3.Name = "foo3" + set3.Spec.Selector.MatchLabels = map[string]string{"foo3": "bar"} + pod := newStatefulSetPod(set1, 0) + spc.setsIndexer.Add(set1) + spc.setsIndexer.Add(set2) + spc.setsIndexer.Add(set3) + + // Make pod an orphan. Expect matching sets to be queued. + pod.OwnerReferences = nil + ssc.addPod(pod) + if got, want := ssc.queue.Len(), 2; got != want { + t.Errorf("queue.Len() = %v, want %v", got, want) } } -func TestStateSetControllerAddPodNoSet(t *testing.T) { +func TestStatefulSetControllerAddPodNoSet(t *testing.T) { ssc, _ := newFakeStatefulSetController() set := newStatefulSet(3) pod := newStatefulSetPod(set, 0) @@ -207,19 +243,36 @@ func TestStateSetControllerAddPodNoSet(t *testing.T) { func TestStatefulSetControllerUpdatePod(t *testing.T) { ssc, spc := newFakeStatefulSetController() - set := newStatefulSet(3) - pod := newStatefulSetPod(set, 0) - spc.setsIndexer.Add(set) - prev := *pod - fakeResourceVersion(pod) - ssc.updatePod(&prev, pod) + set1 := newStatefulSet(3) + set2 := newStatefulSet(3) + set2.Name = "foo2" + pod1 := newStatefulSetPod(set1, 0) + pod2 := newStatefulSetPod(set2, 0) + spc.setsIndexer.Add(set1) + spc.setsIndexer.Add(set2) + + prev := *pod1 + fakeResourceVersion(pod1) + ssc.updatePod(&prev, pod1) key, done := ssc.queue.Get() if key == nil || done { - t.Error("Failed to enqueue StatefulSet") + t.Error("failed to enqueue StatefulSet") } else if key, ok := key.(string); !ok { - t.Error("Key is not a string") - } else if expectedKey, _ := controller.KeyFunc(set); expectedKey != key { - t.Errorf("Expected StatefulSet key %s found %s", expectedKey, key) + t.Error("key is not a string") + } else if expectedKey, _ := controller.KeyFunc(set1); expectedKey != key { + t.Errorf("expected StatefulSet key %s found %s", expectedKey, key) + } + + prev = *pod2 + fakeResourceVersion(pod2) + ssc.updatePod(&prev, pod2) + key, done = ssc.queue.Get() + if key == nil || done { + t.Error("failed to enqueue StatefulSet") + } else if key, ok := key.(string); !ok { + t.Error("key is not a string") + } else if expectedKey, _ := controller.KeyFunc(set2); expectedKey != key { + t.Errorf("expected StatefulSet key %s found %s", expectedKey, key) } } @@ -250,53 +303,106 @@ func TestStatefulSetControllerUpdatePodWithSameVersion(t *testing.T) { } } -func TestStatefulSetControllerUpdatePodWithNewLabels(t *testing.T) { +func TestStatefulSetControllerUpdatePodOrphanWithNewLabels(t *testing.T) { ssc, spc := newFakeStatefulSetController() set := newStatefulSet(3) pod := newStatefulSetPod(set, 0) + pod.OwnerReferences = nil set2 := newStatefulSet(3) set2.Name = "foo2" - set2.Spec.Selector.MatchLabels = map[string]string{"foo2": "bar2"} - set2.Spec.Template.Labels = map[string]string{"foo2": "bar2"} spc.setsIndexer.Add(set) spc.setsIndexer.Add(set2) clone := *pod clone.Labels = map[string]string{"foo2": "bar2"} fakeResourceVersion(&clone) - ssc.updatePod(pod, &clone) - key, done := ssc.queue.Get() - if key == nil || done { - t.Error("Failed to enqueue StatefulSet") - } else if key, ok := key.(string); !ok { - t.Error("Key is not a string") - } else if expectedKey, _ := controller.KeyFunc(set2); expectedKey != key { - t.Errorf("Expected StatefulSet key %s found %s", expectedKey, key) + ssc.updatePod(&clone, pod) + if got, want := ssc.queue.Len(), 2; got != want { + t.Errorf("queue.Len() = %v, want %v", got, want) } - key, done = ssc.queue.Get() - if key == nil || done { - t.Error("Failed to enqueue StatefulSet") - } else if key, ok := key.(string); !ok { - t.Error("Key is not a string") - } else if expectedKey, _ := controller.KeyFunc(set); expectedKey != key { - t.Errorf("Expected StatefulSet key %s found %s", expectedKey, key) +} + +func TestStatefulSetControllerUpdatePodChangeControllerRef(t *testing.T) { + ssc, spc := newFakeStatefulSetController() + set := newStatefulSet(3) + set2 := newStatefulSet(3) + set2.Name = "foo2" + pod := newStatefulSetPod(set, 0) + pod2 := newStatefulSetPod(set2, 0) + spc.setsIndexer.Add(set) + spc.setsIndexer.Add(set2) + clone := *pod + clone.OwnerReferences = pod2.OwnerReferences + fakeResourceVersion(&clone) + ssc.updatePod(&clone, pod) + if got, want := ssc.queue.Len(), 2; got != want { + t.Errorf("queue.Len() = %v, want %v", got, want) + } +} + +func TestStatefulSetControllerUpdatePodRelease(t *testing.T) { + ssc, spc := newFakeStatefulSetController() + set := newStatefulSet(3) + set2 := newStatefulSet(3) + set2.Name = "foo2" + pod := newStatefulSetPod(set, 0) + spc.setsIndexer.Add(set) + spc.setsIndexer.Add(set2) + clone := *pod + clone.OwnerReferences = nil + fakeResourceVersion(&clone) + ssc.updatePod(pod, &clone) + if got, want := ssc.queue.Len(), 2; got != want { + t.Errorf("queue.Len() = %v, want %v", got, want) } } func TestStatefulSetControllerDeletePod(t *testing.T) { ssc, spc := newFakeStatefulSetController() - set := newStatefulSet(3) - pod := newStatefulSetPod(set, 0) - spc.setsIndexer.Add(set) - ssc.deletePod(pod) + set1 := newStatefulSet(3) + set2 := newStatefulSet(3) + set2.Name = "foo2" + pod1 := newStatefulSetPod(set1, 0) + pod2 := newStatefulSetPod(set2, 0) + spc.setsIndexer.Add(set1) + spc.setsIndexer.Add(set2) + + ssc.deletePod(pod1) key, done := ssc.queue.Get() if key == nil || done { - t.Error("Failed to enqueue StatefulSet") + t.Error("failed to enqueue StatefulSet") } else if key, ok := key.(string); !ok { - t.Error("Key is not a string") - } else if expectedKey, _ := controller.KeyFunc(set); expectedKey != key { - t.Errorf("Expected StatefulSet key %s found %s", expectedKey, key) + t.Error("key is not a string") + } else if expectedKey, _ := controller.KeyFunc(set1); expectedKey != key { + t.Errorf("expected StatefulSet key %s found %s", expectedKey, key) + } + + ssc.deletePod(pod2) + key, done = ssc.queue.Get() + if key == nil || done { + t.Error("failed to enqueue StatefulSet") + } else if key, ok := key.(string); !ok { + t.Error("key is not a string") + } else if expectedKey, _ := controller.KeyFunc(set2); expectedKey != key { + t.Errorf("expected StatefulSet key %s found %s", expectedKey, key) } } + +func TestStatefulSetControllerDeletePodOrphan(t *testing.T) { + ssc, spc := newFakeStatefulSetController() + set1 := newStatefulSet(3) + set2 := newStatefulSet(3) + set2.Name = "foo2" + pod1 := newStatefulSetPod(set1, 0) + spc.setsIndexer.Add(set1) + spc.setsIndexer.Add(set2) + + pod1.OwnerReferences = nil + ssc.deletePod(pod1) + if got, want := ssc.queue.Len(), 0; got != want { + t.Errorf("queue.Len() = %v, want %v", got, want) + } +} + func TestStatefulSetControllerDeletePodTombstone(t *testing.T) { ssc, spc := newFakeStatefulSetController() set := newStatefulSet(3) @@ -307,42 +413,26 @@ func TestStatefulSetControllerDeletePodTombstone(t *testing.T) { ssc.deletePod(tombstone) key, done := ssc.queue.Get() if key == nil || done { - t.Error("Failed to enqueue StatefulSet") + t.Error("failed to enqueue StatefulSet") } else if key, ok := key.(string); !ok { - t.Error("Key is not a string") + t.Error("key is not a string") } else if expectedKey, _ := controller.KeyFunc(set); expectedKey != key { - t.Errorf("Expected StatefulSet key %s found %s", expectedKey, key) + t.Errorf("expected StatefulSet key %s found %s", expectedKey, key) } } -func TestStatefulSetControllerGetStatefulSetForPod(t *testing.T) { +func TestStatefulSetControllerGetStatefulSetsForPod(t *testing.T) { ssc, spc := newFakeStatefulSetController() - set := newStatefulSet(3) - pod := newStatefulSetPod(set, 0) - spc.setsIndexer.Add(set) - spc.podsIndexer.Add(pod) - if set := ssc.getStatefulSetForPod(pod); set == nil { - t.Error("Failed to get StatefulSet for Pod ") - } -} - -func TestStatefulSetControllerGetStatefulSetForPodOverlapping(t *testing.T) { - ssc, spc := newFakeStatefulSetController() - set := newStatefulSet(3) - pod := newStatefulSetPod(set, 0) + set1 := newStatefulSet(3) set2 := newStatefulSet(3) set2.Name = "foo2" - set3 := newStatefulSet(3) - set3.Name = "foo3" - set3.CreationTimestamp.Add(1 * time.Second) - spc.setsIndexer.Add(set3) + pod := newStatefulSetPod(set1, 0) + spc.setsIndexer.Add(set1) spc.setsIndexer.Add(set2) - spc.setsIndexer.Add(set) spc.podsIndexer.Add(pod) - if found := ssc.getStatefulSetForPod(pod); found == nil { - t.Error("Failed to get StatefulSet for Pod") - } else if found.Name != set.Name { - t.Errorf("Returned wrong StatefulSet %s for Pod", set.Name) + sets := ssc.getStatefulSetsForPod(pod) + if got, want := len(sets), 2; got != want { + t.Errorf("len(sets) = %v, want %v", got, want) } } diff --git a/pkg/controller/statefulset/stateful_set_utils.go b/pkg/controller/statefulset/stateful_set_utils.go index aefa51b32ff..89bca6a60fa 100644 --- a/pkg/controller/statefulset/stateful_set_utils.go +++ b/pkg/controller/statefulset/stateful_set_utils.go @@ -22,7 +22,6 @@ import ( "strconv" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/kubernetes/pkg/api/v1" podapi "k8s.io/kubernetes/pkg/api/v1/pod" apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" @@ -232,16 +231,12 @@ func isHealthy(pod *v1.Pod) bool { return isRunningAndReady(pod) && !isTerminated(pod) } -func getSSKind() schema.GroupVersionKind { - return apps.SchemeGroupVersion.WithKind("StatefulSet") -} - // newControllerRef returns an ControllerRef pointing to a given StatefulSet. func newControllerRef(set *apps.StatefulSet) *metav1.OwnerReference { isController := true return &metav1.OwnerReference{ - APIVersion: apps.SchemeGroupVersion.String(), - Kind: "StatefulSet", + APIVersion: controllerKind.GroupVersion().String(), + Kind: controllerKind.Kind, Name: set.Name, UID: set.UID, Controller: &isController,