From e4f67c81707851fe82aa0277f19620963613a926 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Thu, 23 Feb 2017 16:27:20 -0800 Subject: [PATCH] StatefulSet: Use ControllerRefManager to adopt/orphan. --- pkg/controller/statefulset/BUILD | 2 + pkg/controller/statefulset/stateful_set.go | 37 +++++++-- .../statefulset/stateful_set_control.go | 1 + .../statefulset/stateful_set_test.go | 83 +++++++++++++++++++ .../statefulset/stateful_set_utils.go | 5 ++ 5 files changed, 121 insertions(+), 7 deletions(-) diff --git a/pkg/controller/statefulset/BUILD b/pkg/controller/statefulset/BUILD index 655ef260403..29aa56d65d0 100644 --- a/pkg/controller/statefulset/BUILD +++ b/pkg/controller/statefulset/BUILD @@ -32,6 +32,8 @@ go_library( "//vendor:github.com/golang/glog", "//vendor:k8s.io/apimachinery/pkg/api/errors", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", + "//vendor:k8s.io/apimachinery/pkg/labels", + "//vendor:k8s.io/apimachinery/pkg/runtime/schema", "//vendor:k8s.io/apimachinery/pkg/util/errors", "//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/wait", diff --git a/pkg/controller/statefulset/stateful_set.go b/pkg/controller/statefulset/stateful_set.go index 896731037ee..8e7d64d6014 100644 --- a/pkg/controller/statefulset/stateful_set.go +++ b/pkg/controller/statefulset/stateful_set.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" v1core "k8s.io/client-go/kubernetes/typed/core/v1" @@ -56,6 +57,8 @@ type StatefulSetController struct { // control returns an interface capable of syncing a stateful set. // Abstracted out for testing. control StatefulSetControlInterface + // podControl is used for patching pods. + podControl controller.PodControlInterface // podLister is able to list/get pods from a shared informer's store podLister corelisters.PodLister // podListerSynced returns true if the pod shared informer has synced at least once @@ -95,6 +98,7 @@ func NewStatefulSetController( ), pvcListerSynced: pvcInformer.Informer().HasSynced, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"), + podControl: controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder}, } podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -210,13 +214,26 @@ func (ssc *StatefulSetController) deletePod(obj interface{}) { } } -// getPodsForStatefulSets returns the pods that match the selectors of the given statefulset. -func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet) ([]*v1.Pod, error) { - sel, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) +// getPodsForStatefulSet returns the Pods that a given StatefulSet should manage. +// It also reconciles ControllerRef by adopting/orphaning. +// +// NOTE: Returned Pods are pointers to objects from the cache. +// If you need to modify one, you need to copy it first. +func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet, selector labels.Selector) ([]*v1.Pod, error) { + // List all pods to include the pods that don't match the selector anymore but + // has a ControllerRef pointing to this StatefulSet. + pods, err := ssc.podLister.Pods(set.Namespace).List(labels.Everything()) if err != nil { - return []*v1.Pod{}, err + return nil, err } - return ssc.podLister.Pods(set.Namespace).List(sel) + + filter := func(pod *v1.Pod) bool { + // Only claim if it matches our StatefulSet name. Otherwise release/ignore. + return isMemberOf(set, pod) + } + + cm := controller.NewPodControllerRefManager(ssc.podControl, set, selector, getSSKind()) + return cm.ClaimPods(pods, filter) } // getStatefulSetForPod returns the StatefulSet managing the given pod. @@ -298,11 +315,17 @@ func (ssc *StatefulSetController) sync(key string) error { return nil } if err != nil { - utilruntime.HandleError(fmt.Errorf("Unable to retrieve StatefulSet %v from store: %v", key, err)) + utilruntime.HandleError(fmt.Errorf("unable to retrieve StatefulSet %v from store: %v", key, err)) return err } + selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) + if err != nil { + utilruntime.HandleError(fmt.Errorf("error converting StatefulSet %v selector: %v", key, err)) + // This is a non-transient error, so don't retry. + return nil + } - pods, err := ssc.getPodsForStatefulSet(set) + pods, err := ssc.getPodsForStatefulSet(set, selector) if err != nil { return err } diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go index 86645d07b22..6389d79117a 100644 --- a/pkg/controller/statefulset/stateful_set_control.go +++ b/pkg/controller/statefulset/stateful_set_control.go @@ -74,6 +74,7 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p // if the ordinal is greater than the number of replicas add it to the condemned list condemned = append(condemned, pods[i]) } + // If the ordinal could not be parsed (ord < 0), ignore the Pod. } // for any empty indices in the sequence [0,set.Spec.Replicas) create a new Pod diff --git a/pkg/controller/statefulset/stateful_set_test.go b/pkg/controller/statefulset/stateful_set_test.go index 3d927e2609e..4069a80ac76 100644 --- a/pkg/controller/statefulset/stateful_set_test.go +++ b/pkg/controller/statefulset/stateful_set_test.go @@ -17,6 +17,7 @@ limitations under the License. package statefulset import ( + "reflect" "sort" "testing" "time" @@ -345,6 +346,88 @@ func TestStatefulSetControllerGetStatefulSetForPodOverlapping(t *testing.T) { } } +func TestGetPodsForStatefulSetAdopt(t *testing.T) { + ssc, spc := newFakeStatefulSetController() + set := newStatefulSet(5) + pod1 := newStatefulSetPod(set, 1) + // pod2 is an orphan with matching labels and name. + pod2 := newStatefulSetPod(set, 2) + pod2.OwnerReferences = nil + // pod3 has wrong labels. + pod3 := newStatefulSetPod(set, 3) + pod3.OwnerReferences = nil + pod3.Labels = nil + // pod4 has wrong name. + pod4 := newStatefulSetPod(set, 4) + pod4.OwnerReferences = nil + pod4.Name = "x" + pod4.Name + + spc.podsIndexer.Add(pod1) + spc.podsIndexer.Add(pod2) + spc.podsIndexer.Add(pod3) + spc.podsIndexer.Add(pod4) + selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) + if err != nil { + t.Fatal(err) + } + pods, err := ssc.getPodsForStatefulSet(set, selector) + if err != nil { + t.Fatalf("getPodsForStatefulSet() error: %v", err) + } + var got []string + for _, pod := range pods { + got = append(got, pod.Name) + } + + // pod2 should be claimed, pod3 and pod4 ignored + want := []string{pod1.Name, pod2.Name} + sort.Strings(got) + sort.Strings(want) + if !reflect.DeepEqual(got, want) { + t.Errorf("getPodsForStatefulSet() = %v, want %v", got, want) + } +} + +func TestGetPodsForStatefulSetRelease(t *testing.T) { + ssc, spc := newFakeStatefulSetController() + set := newStatefulSet(3) + pod1 := newStatefulSetPod(set, 1) + // pod2 is owned but has wrong name. + pod2 := newStatefulSetPod(set, 2) + pod2.Name = "x" + pod2.Name + // pod3 is owned but has wrong labels. + pod3 := newStatefulSetPod(set, 3) + pod3.Labels = nil + // pod4 is an orphan that doesn't match. + pod4 := newStatefulSetPod(set, 4) + pod4.OwnerReferences = nil + pod4.Labels = nil + + spc.podsIndexer.Add(pod1) + spc.podsIndexer.Add(pod2) + spc.podsIndexer.Add(pod3) + selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) + if err != nil { + t.Fatal(err) + } + pods, err := ssc.getPodsForStatefulSet(set, selector) + if err != nil { + t.Fatalf("getPodsForStatefulSet() error: %v", err) + } + var got []string + for _, pod := range pods { + got = append(got, pod.Name) + } + + // Expect only pod1 (pod2 and pod3 should be released, pod4 ignored). + want := []string{pod1.Name} + sort.Strings(got) + sort.Strings(want) + if !reflect.DeepEqual(got, want) { + t.Errorf("getPodsForStatefulSet() = %v, want %v", got, want) + } +} + func newFakeStatefulSetController() (*StatefulSetController, *fakeStatefulPodControl) { client := fake.NewSimpleClientset() informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) diff --git a/pkg/controller/statefulset/stateful_set_utils.go b/pkg/controller/statefulset/stateful_set_utils.go index 231ef945675..aefa51b32ff 100644 --- a/pkg/controller/statefulset/stateful_set_utils.go +++ b/pkg/controller/statefulset/stateful_set_utils.go @@ -22,6 +22,7 @@ import ( "strconv" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/kubernetes/pkg/api/v1" podapi "k8s.io/kubernetes/pkg/api/v1/pod" apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" @@ -231,6 +232,10 @@ func isHealthy(pod *v1.Pod) bool { return isRunningAndReady(pod) && !isTerminated(pod) } +func getSSKind() schema.GroupVersionKind { + return apps.SchemeGroupVersion.WithKind("StatefulSet") +} + // newControllerRef returns an ControllerRef pointing to a given StatefulSet. func newControllerRef(set *apps.StatefulSet) *metav1.OwnerReference { isController := true